Merhaba, ben Dmitry Logvinenko - Vezet şirketler grubunun Analitik Departmanında Veri Mühendisi.
Size ETL süreçleri geliştirmek için harika bir araçtan bahsedeceğim - Apache Airflow. Ancak Airflow o kadar çok yönlü ve çok yönlüdür ki, veri akışlarına dahil olmasanız, ancak herhangi bir işlemi periyodik olarak başlatmanız ve yürütmelerini izlemeniz gerekse bile, ona daha yakından bakmalısınız.
Ve evet, sadece anlatmakla kalmayacağım, aynı zamanda göstereceğim: programın birçok kodu, ekran görüntüsü ve önerisi var.
Google'da Airflow / Wikimedia Commons kelimesini arattığınızda genellikle ne görürsünüz?
- sadece daha iyisi ve tamamen farklı amaçlar için yapıldı, yani (kata'dan önce yazıldığı gibi):
sınırsız sayıda makinede görevleri çalıştırma ve izleme (birçok Kereviz / Kubernetes ve vicdanınızın size izin verdiği kadar)
Yazması ve anlaması çok kolay Python kodundan dinamik iş akışı oluşturma ile
ve hem hazır bileşenleri hem de ev yapımı eklentileri (son derece basit) kullanarak herhangi bir veritabanını ve API'yi birbirine bağlama yeteneği.
Apache Airflow'u şu şekilde kullanıyoruz:
DWH ve ODS'de (Vertica ve Clickhouse'a sahibiz) çeşitli kaynaklardan (birçok SQL Server ve PostgreSQL örneği, uygulama metriklerine sahip çeşitli API'ler, hatta 1C) veri topluyoruz.
ne kadar gelişmiş cronODS üzerinde veri konsolidasyon süreçlerini başlatan ve bakımlarını da denetleyen.
Yakın zamana kadar ihtiyaçlarımız 32 çekirdekli ve 50 GB RAM'e sahip küçük bir sunucu tarafından karşılanıyordu. Airflow'da bu şu şekilde çalışır:
daha fazla 200 han (aslında görevleri doldurduğumuz iş akışları),
ortalama olarak her birinde 70 görev,
bu iyilik başlar (ortalama olarak da) saatte bir.
Ve nasıl genişlediğimizi aşağıya yazacağım ama şimdi çözeceğimiz über problemini tanımlayalım:
Her biri 50 veritabanına sahip üç orijinal SQL Sunucusu vardır - sırasıyla bir projenin örnekleri, aynı yapıya sahiptirler (neredeyse her yerde, mua-ha-ha), bu da her birinin bir Siparişler tablosuna (neyse ki, buna sahip bir tablo) sahip olduğu anlamına gelir. adı herhangi bir işletmeye itilebilir). Verileri hizmet alanları (kaynak sunucu, kaynak veritabanı, ETL görev kimliği) ekleyerek alıyoruz ve saf bir şekilde örneğin Vertica'ya atıyoruz.
Hadi gidelim!
Ana kısım, pratik (ve biraz da teorik)
Neden biz (ve siz)
Ağaçlar büyükken ben basittim SQL-schik, bir Rus perakende satışında, bize sunulan iki aracı kullanarak veri akışları olarak da bilinen ETL işlemlerini dolandırdık:
Bilişim Güç Merkezi - kendi donanımına, kendi versiyonuna sahip, son derece üretken, son derece yaygın bir sistem. Tanrı korusun yeteneklerinin% 1'ini kullandım. Neden? Her şeyden önce, 380'li yıllardan kalma bu arayüz, zihinsel olarak üzerimizde baskı oluşturuyor. İkinci olarak, bu mekanizma son derece süslü süreçler, öfkeli bileşen yeniden kullanımı ve diğer çok önemli kurumsal hileler için tasarlanmıştır. Airbus AXNUMX / yıl kanadı gibi maliyeti hakkında hiçbir şey söylemeyeceğiz.
Dikkat, ekran görüntüsü 30 yaşın altındaki insanları biraz incitebilir
SQL Server Entegrasyon Sunucusu - bu yoldaşı proje içi akışlarımızda kullandık. Aslında, zaten SQL Server kullanıyoruz ve ETL araçlarını kullanmamak bir şekilde mantıksız olurdu. İçindeki her şey iyi: hem arayüz güzel, hem de ilerleme raporları ... Ama bu yüzden yazılım ürünlerini sevmiyoruz, ah, bunun için değil. sürüm dtsx (bu, kaydetme sırasında karıştırılan düğümleri olan XML'dir) yapabiliriz, ama amaç ne? Yüzlerce tabloyu bir sunucudan diğerine sürükleyecek bir görev paketi yapmaya ne dersiniz? Evet, ne yüz, işaret parmağınız fare düğmesine tıklayarak yirmi parçadan düşecek. Ama kesinlikle daha moda görünüyor:
Elbette çıkış yolları aradık. Hatta durum neredeyse kendi kendine yazılmış bir SSIS paket oluşturucuya geldi ...
…ve sonra yeni bir iş beni buldu. Ve Apache Airflow beni bu konuda geride bıraktı.
ETL işlem açıklamalarının basit Python kodu olduğunu öğrendiğimde, sadece neşe için dans etmedim. Veri akışları bu şekilde versiyonlanıp farklılaştırıldı ve yüzlerce veri tabanından tek yapıya sahip tabloları tek bir hedefe dökmek, bir buçuk veya iki 13” ekranda Python kodu meselesi haline geldi.
Kümeyi birleştirme
Tamamen bir anaokulu düzenlemeyelim ve burada Airflow'u yüklemek, seçtiğiniz veritabanı, Kereviz ve rıhtımda açıklanan diğer durumlar gibi tamamen bariz şeylerden bahsetmeyelim.
Deneylere hemen başlayabilmemiz için eskizini çizdim. docker-compose.yml hangisinde:
Aslında yükseltelim Hava akışı: Zamanlayıcı, Web sunucusu. Flower ayrıca Kereviz görevlerini izlemek için orada dönüyor olacak (çünkü zaten apache/airflow:1.10.10-python3.7, ama umursamıyoruz)
PostgreSQLAirflow'un hizmet bilgilerini (zamanlayıcı verileri, yürütme istatistikleri vb.) yazacağı ve Celery'nin tamamlanan görevleri işaretleyeceği;
Redis, Kereviz için bir görev komisyoncusu olarak hareket edecek;
kereviz işçisi, görevlerin doğrudan yerine getirilmesiyle meşgul olacak.
Klasöre ./dags dosyalarımızı dags açıklaması ile ekleyeceğiz. Anında alınacaklar, bu nedenle her hapşırmadan sonra tüm yığınla hokkabazlık yapmaya gerek yok.
Bazı yerlerde, örneklerdeki kod tam olarak gösterilmez (metni karıştırmamak için), ancak işlem sırasında bir yerde değiştirilir. Eksiksiz çalışan kod örnekleri depoda bulunabilir https://github.com/dm-logv/airflow-tutorial.
Kompozisyonun montajında, büyük ölçüde iyi bilinen görüntüye güvendim. puckel/docker-hava akımı - kontrol ettiğinizden emin olun. Belki de hayatında başka hiçbir şeye ihtiyacın yoktur.
Tüm Hava Akışı ayarları yalnızca airflow.cfg, ama aynı zamanda kötü niyetli olarak yararlandığım ortam değişkenleri (geliştiriciler sayesinde) aracılığıyla.
Doğal olarak üretime hazır değil: Kasten kaplara kalp atışı koymadım, güvenlikle uğraşmadım. Ama deneycilerimiz için en uygun olanı yaptım.
Dikkat:
Dag klasörü hem zamanlayıcı hem de çalışanlar tarafından erişilebilir olmalıdır.
Aynısı, tüm üçüncü taraf kitaplıkları için de geçerlidir - bunların tümü, bir zamanlayıcı ve çalışanlara sahip makinelere kurulmalıdır.
Eh, şimdi çok basit:
$ docker-compose up --scale worker=3
Her şey yükseldikten sonra web arayüzlerine bakabilirsiniz:
Tüm bu "günlüklerden" hiçbir şey anlamadıysanız, işte kısa bir sözlük:
Zamanlayıcı - Airflow'daki en önemli amca, bir kişinin değil, robotların sıkı çalışmasını kontrol eder: programı izler, günlükleri günceller, görevleri başlatır.
Genel olarak, eski sürümlerde bellekle ilgili sorunları vardı (hayır, amnezi değil, sızıntılar) ve hatta eski parametre yapılandırmalarda kaldı run_duration — yeniden başlatma aralığı. Ama şimdi her şey yolunda.
DAG (namı diğer "dag") - "yönlendirilmiş asiklik grafik", ancak böyle bir tanım çok az kişiye söyleyecektir, ancak aslında birbiriyle etkileşime giren görevler için bir kapsayıcıdır (aşağıya bakın) veya SSIS'deki Paket ve Informatica'daki Workflow'un bir benzeridir. .
Günlere ek olarak, hala alt günler olabilir, ancak büyük olasılıkla onlara ulaşamayacağız.
DAG Koşusu - kendi atanan başlatılmış gün execution_date. Aynı günün Dagran'ları paralel olarak çalışabilir (eğer görevlerinizi önemsiz yaptıysanız, elbette).
Kullanım belirli bir eylemi gerçekleştirmekten sorumlu kod parçalarıdır. Üç tür operatör vardır:
aksiyonbizim favorimiz gibi PythonOperatorherhangi bir (geçerli) Python kodunu çalıştırabilen;
transfer, verileri bir yerden bir yere taşıyan, diyelim ki, MsSqlToHiveTransfer;
algılayıcı Öte yandan, bir olay meydana gelene kadar tepki vermenize veya dag'ın daha fazla yürütülmesini yavaşlatmanıza izin verecektir. HttpSensor belirtilen uç noktayı çekebilir ve istenen yanıt beklerken aktarımı başlatır GoogleCloudStorageToS3Operator. Meraklı bir zihin soracaktır: “neden? Ne de olsa tekrarları doğrudan operatörde yapabilirsiniz!” Ve sonra, görev havuzunu askıya alınmış operatörlerle tıkamamak için. Sensör başlar, kontrol eder ve bir sonraki denemeden önce ölür.
Görev - türü ne olursa olsun ve günlüğe eklenmiş beyan edilen operatörler, görev sıralamasına yükseltilir.
görev örneği - genel planlamacı, icracı-işçiler üzerinde görevleri savaşa gönderme zamanının geldiğine karar verdiğinde (eğer kullanırsak, hemen yerinde) LocalExecutor veya şu durumda uzak bir düğüme CeleryExecutor), onlara bir bağlam atar (yani, bir dizi değişken - yürütme parametreleri), komut veya sorgu şablonlarını genişletir ve bunları havuzlar.
Görevler üretiyoruz
İlk olarak, doug'ımızın genel şemasını çizelim ve ardından önemsiz olmayan bazı çözümler uyguladığımız için ayrıntılara giderek daha fazla dalacağız.
Yani, en basit haliyle, böyle bir dag şöyle görünecektir:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Hadi çözelim:
İlk olarak, gerekli lib'leri içe aktarıyoruz ve başka bir şey;
sql_server_ds - Mı List[namedtuple[str, str]] Airflow Connections'tan bağlantıların isimleri ve plakamızı alacağımız veritabanları ile;
dag - mutlaka içinde olması gereken günümüzün duyurusu globals(), aksi takdirde Airflow onu bulamaz. Doug'ın ayrıca şunları söylemesi gerekiyor:
onun adı ne orders - bu ad daha sonra web arayüzünde görünecektir,
XNUMX Temmuz gece yarısından itibaren çalışacağını,
ve yaklaşık olarak her 6 saatte bir çalışmalıdır (buradaki sert adamlar için timedelta() kabul edilebilir cron-astar 0 0 0/6 ? * * *, daha az havalı - gibi bir ifade @daily);
workflow() ana işi yapacak, ama şimdi değil. Şimdilik, içeriğimizi günlüğe aktaracağız.
Ve şimdi görev yaratmanın basit büyüsü:
kaynaklarımızı gözden geçiriyoruz;
başlatmak PythonOperatorkuklamızı yürütecek olan workflow(). Görevin benzersiz (günlük içinde) bir adını belirtmeyi ve günlüğün kendisini bağlamayı unutmayın. bayrak provide_context sırayla, işleve, kullanarak dikkatlice toplayacağımız ek argümanlar ekleyecektir. **context.
Şimdilik hepsi bu. Elimizde ne var:
web arayüzünde yeni gün,
paralel olarak yürütülecek bir buçuk yüz görev (Airflow, Kereviz ayarları ve sunucu kapasitesi izin veriyorsa).
Neredeyse anladım.
Bağımlılıkları kim kuracak?
Her şeyi basitleştirmek için, batırdım docker-compose.yml işleme requirements.txt tüm düğümlerde.
Şimdi gitti:
Gri kareler, zamanlayıcı tarafından işlenen görev örnekleridir.
Biraz bekleriz, görevler işçiler tarafından kapılır:
Yeşil olanlar elbette işlerini başarıyla tamamladılar. Kırmızılar pek başarılı değil.
Bu arada, ürünümüzde klasör yok. ./dags, makineler arasında senkronizasyon yoktur - tüm günlükler git Gitlab'ımızda ve Gitlab CI, birleştirme sırasında güncellemeleri makinelere dağıtır master.
Çiçek hakkında biraz
İşçiler emziklerimizi harmanlarken, bize bir şeyler gösterebilecek başka bir aracı hatırlayalım - Çiçek.
Çalışan düğümleri hakkında özet bilgiler içeren ilk sayfa:
İşe yarayan görevlerle en yoğun sayfa:
Brokerimizin statüsüne sahip en sıkıcı sayfa:
En parlak sayfa, görev durumu grafikleri ve yürütme süreleriyle birliktedir:
Eksik olanı yüklüyoruz
Böylece tüm görevler yerine getirildi, yaralıları götürebilirsiniz.
Ve şu ya da bu nedenle çok sayıda yaralı vardı. Airflow'un doğru kullanılması durumunda, bu tam kareler, verilerin kesinlikle ulaşmadığını gösterir.
Günlüğü izlemeniz ve düşen görev örneklerini yeniden başlatmanız gerekir.
Herhangi bir kareye tıklayarak, bize sunulan eylemleri göreceğiz:
Düşmüşleri alıp temizleyebilirsiniz. Yani, orada bir şeyin başarısız olduğunu unutuyoruz ve aynı örnek görev zamanlayıcıya gidecek.
Bunu fareyle tüm kırmızı karelerle yapmanın çok insancıl olmadığı açık - Airflow'dan beklediğimiz bu değil. Doğal olarak kitle imha silahlarımız var: Browse/Task Instances
Her şeyi bir kerede seçelim ve sıfıra sıfırlayalım, doğru öğeye tıklayın:
Temizlikten sonra taksilerimiz şöyle görünür (zaten zamanlayıcının onları programlamasını bekliyorlar):
Bağlantılar, kancalar ve diğer değişkenler
Bir sonraki DAG'a bakma zamanı, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Herkes hiç rapor güncellemesi yaptı mı? Bu yine o: verilerin nereden alınabileceğine dair bir kaynak listesi var; nereye koyacağınıza dair bir liste var; her şey olduğunda veya bozulduğunda korna çalmayı unutma (pekala, bu bizimle ilgili değil, hayır).
Dosyayı tekrar gözden geçirelim ve yeni belirsiz şeylere bakalım:
from commons.operators import TelegramBotSendMessage - Unblocked'a mesaj göndermek için küçük bir sarmalayıcı yaparak yararlandığımız kendi operatörlerimizi oluşturmamızı hiçbir şey engellemiyor. (Aşağıda bu operatör hakkında daha fazla konuşacağız);
default_args={} - dag aynı bağımsız değişkenleri tüm işleçlerine dağıtabilir;
to='{{ var.value.all_the_kings_men }}' - alan to sabit kodlu değil, Jinja ve e-posta listeli bir değişken kullanarak dinamik olarak oluşturacağız; Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS — operatörü başlatmak için koşul. Bizim durumumuzda, mektup yalnızca tüm bağımlılıklar işe yaradığında patronlara uçacaktır. başarılı olarak;
tg_bot_conn_id='tg_main' - argümanlar conn_id oluşturduğumuz bağlantı kimliklerini kabul edin Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - Telegram'daki mesajlar, yalnızca düşen görevler varsa uçar;
task_concurrency=1 - bir görevin birkaç görev örneğinin aynı anda başlatılmasını yasaklıyoruz. Aksi takdirde, birkaçının eşzamanlı lansmanını alacağız. VerticaOperator (bir masaya bakarak);
report_update >> [email, tg] - hepsi VerticaOperator aşağıdaki gibi mektuplar ve mesajlar gönderme konusunda birleşin:
Ancak, bildirim operatörlerinin farklı başlatma koşulları olduğundan, yalnızca biri çalışacaktır. Ağaç Görünümünde her şey biraz daha az görsel görünür:
hakkında birkaç söz söyleyeceğim makrolar ve arkadaşları - değişkenler.
Makrolar, çeşitli faydalı bilgileri operatör bağımsız değişkenlerine ikame edebilen Jinja yer tutucularıdır. Örneğin, bunun gibi:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} bağlam değişkeninin içeriğine genişleyecek execution_date biçiminde YYYY-MM-DD: 2020-07-14. En iyi yanı, bağlam değişkenlerinin belirli bir görev örneğine (Ağaç Görünümünde bir kare) sabitlenmiş olması ve yeniden başlatıldığında yer tutucuların aynı değerlere genişleyecek olmasıdır.
Atanan değerler, her görev örneğinde İşlenen düğmesi kullanılarak görüntülenebilir. Mektup gönderme görevi şu şekildedir:
Ve böylece bir mesaj gönderme görevinde:
Mevcut en son sürüm için yerleşik makroların tam listesi burada mevcuttur: makro referansı
Üstelik eklentilerin yardımıyla kendi makrolarımızı ilan edebiliriz, ama bu başka bir hikaye.
Önceden tanımlanmış şeylere ek olarak, değişkenlerimizin değerlerini değiştirebiliriz (bunu zaten yukarıdaki kodda kullandım). içinde oluşturalım Admin/Variables birkaç şey:
sadece istenen anahtarın yolunu kullanın: {{ var.json.bot_config.bot.token }}.
Kelimenin tam anlamıyla bir kelime söyleyeceğim ve hakkında bir ekran görüntüsü göstereceğim bağlantı. Burada her şey temel: sayfada Admin/Connections bir bağlantı oluşturuyoruz, girişlerimizi / şifrelerimizi ve daha spesifik parametreleri oraya ekliyoruz. Bunun gibi:
Parolalar şifrelenebilir (varsayılandan daha ayrıntılı olarak) veya bağlantı türünü (benim yaptığım gibi) dışarıda bırakabilirsiniz. tg_main) - gerçek şu ki, Airflow modellerinde tür listesi fiziksel olarak bağlanmıştır ve kaynak kodlarına girmeden genişletilemez (birdenbire google'da bir şey aramadıysam, lütfen beni düzeltin), ancak hiçbir şey kredi almamızı engelleyemez. isim.
Aynı ada sahip birkaç bağlantı da kurabilirsiniz: bu durumda, yöntem BaseHook.get_connection()bize adıyla bağlantı sağlayan , verecek rasgele birkaç isimden (Round Robin yapmak daha mantıklı olurdu, ancak bunu Airflow geliştiricilerinin vicdanına bırakalım).
Değişkenler ve Bağlantılar kesinlikle harika araçlardır, ancak dengeyi kaybetmemek önemlidir: akışlarınızın hangi kısımlarını kodun kendisinde saklarsınız ve hangi kısımlarını saklaması için Airflow'a verirsiniz. Bir yandan, örneğin bir posta kutusu gibi bir değeri kullanıcı arabirimi aracılığıyla hızlı bir şekilde değiştirmek uygun olabilir. Öte yandan, bu yine de (ben) kurtulmak istediğimiz fare tıklamasına bir dönüş.
Bağlantılarla çalışmak görevlerden biridir kancalar. Genel olarak, Airflow kancaları, onu üçüncü taraf hizmetlere ve kitaplıklara bağlamak için kullanılan noktalardır. Örneğin, JiraHook Jira ile etkileşim kurmamız için bir müşteri açacak (görevleri ileri geri hareket ettirebilirsin) ve yardımıyla SambaHook yerel bir dosyayı şuraya gönderebilirsiniz: smb-nokta.
Özel operatörü ayrıştırma
Ve nasıl yapıldığına bakmaya yaklaştık TelegramBotSendMessage
Kod commons/operators.py gerçek operatör ile:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Airflow'daki diğer her şey gibi burada da her şey çok basit:
miras alınan BaseOperator, Airflow'a özgü epeyce şey uygulayan (boş zamanlarınıza bakın)
Bildirilen alanlar template_fields, burada Jinja işlenecek makroları arayacaktır.
için doğru argümanları düzenledi. __init__(), gerektiğinde varsayılanları ayarlayın.
Atanın başlatılmasını da unutmadık.
İlgili kancayı açtı TelegramBotHookondan bir istemci nesnesi aldı.
Geçersiz kılınan (yeniden tanımlanmış) yöntem BaseOperator.execute(), operatörü başlatma zamanı geldiğinde Airfow'un seğireceği - içinde oturum açmayı unutarak ana eylemi uygulayacağız. (Bu arada giriş yapıyoruz, hemen stdout и stderr - Hava akışı her şeyi kesecek, güzelce saracak, gerektiğinde ayrıştıracaktır.)
bakalım elimizde ne var commons/hooks.py. Kancanın kendisi ile dosyanın ilk kısmı:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Burada ne anlatacağımı bile bilmiyorum, sadece önemli noktaları not edeceğim:
Miras alıyoruz, argümanları düşünün - çoğu durumda bir tane olacak: conn_id;
Standart yöntemleri geçersiz kılmak: Kendimi sınırladım get_conn(), bağlantı parametrelerini ada göre aldığım ve sadece bölümü aldığım extra (bu bir JSON alanıdır), içine (kendi talimatlarıma göre!) Telegram bot belirtecini koydum: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Bizim bir örneğini oluşturuyorum TelegramBot, ona belirli bir belirteç veriyor.
Bu kadar. Kullanarak bir kancadan bir müşteri alabilirsiniz. TelegramBotHook().clent veya TelegramBotHook().get_conn().
Ve aynı şeyi sürüklememek için Telegram REST API için bir mikro sarmalayıcı yaptığım dosyanın ikinci kısmı python-telegram-bot bir yöntem için sendMessage.
Doğru yol, hepsini eklemektir: TelegramBotSendMessage, TelegramBotHook, TelegramBot - eklentide, halka açık bir depo koyun ve Açık Kaynağa verin.
Tüm bunları incelerken, rapor güncellemelerimiz başarılı bir şekilde başarısız oldu ve kanalda bana bir hata mesajı gönderdi. Yanlış mı diye kontrol edeceğim...
Köpeğimizde bir şey kırıldı! Beklediğimiz bu değil miydi? Kesinlikle!
dökecek misin?
Bir şeyi kaçırdığımı mı düşünüyorsun? Görünüşe göre SQL Server'dan Vertica'ya veri aktarmaya söz verdi ve sonra onu aldı ve konudan uzaklaştı, alçak!
Bu vahşet kasıtlıydı, sadece sizin için bazı terminolojileri deşifre etmem gerekiyordu. Şimdi daha ileri gidebilirsiniz.
Planımız şuydu:
gün yap
Görev oluştur
Her şeyin ne kadar güzel olduğunu görün
Dolgulara oturum numaraları atama
SQL Server'dan veri alma
Vertica'ya veri koyun
İstatistik topla
Bu yüzden, hepsini halletmek ve çalıştırmak için, dosyamıza küçük bir ekleme yaptım. docker-compose.yml:
Ev sahibi olarak Vertica dwh en varsayılan ayarlarla,
SQL Server'ın üç örneği,
ikincisindeki veritabanlarını bazı verilerle dolduruyoruz (hiçbir durumda mssql_init.py!)
Geçen seferkinden biraz daha karmaşık bir komutun yardımıyla tüm iyi şeyleri başlatıyoruz:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Mucize rasgele oluşturucumuzun ürettiği şeyi, öğeyi kullanabilirsiniz Data Profiling/Ad Hoc Query:
Ana şey analistlere göstermemek
üzerinde durmak ETL oturumları Yapmayacağım, orada her şey önemsiz: bir temel oluşturuyoruz, içinde bir işaret var, her şeyi bir içerik yöneticisi ile sarıyoruz ve şimdi şunu yapıyoruz:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
oturum.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Zaman geldi verilerimizi topla bir buçuk yüz masamızdan. Bunu çok iddiasız satırların yardımıyla yapalım:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Airflow'dan aldığımız bir kanca yardımıyla pymssql-bağlamak
Talebin yerine tarih biçiminde bir kısıtlama koyalım - bu, şablon motoru tarafından işleve atılacaktır.
İsteğimizi beslemek pandasbizi kim alacak DataFrame - gelecekte bizim için faydalı olacak.
ikame kullanıyorum {dt} istek parametresi yerine %s kötü bir Pinokyo olduğum için değil, çünkü pandas baş edememek pymssql ve sonuncusu kayar params: Listgerçekten istemesine rağmen tuple.
Ayrıca, geliştiricinin pymssql artık onu desteklememeye karar verdi ve taşınmanın zamanı geldi pyodbc.
Veri yoksa, devam etmenin bir anlamı yoktur. Ancak dolgunun başarılı olduğunu düşünmek de garip. Ama bu bir hata değil. A-ah-ah, ne yapmalı?! Ve işte ne:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException Airflow'a hata olmadığını söyler, ancak görevi atlarız. Arayüzde yeşil veya kırmızı bir kare değil, pembe olacaktır.
Taşma oturumumuzun kimliği (farklı olacak her görev için),
Kaynaktan ve sipariş kimliğinden bir karma - böylece nihai veritabanında (her şeyin tek bir tabloya döküldüğü yer) benzersiz bir sipariş kimliğimiz olur.
Sondan bir önceki adım kalır: her şeyi Vertica'ya dökün. Ve garip bir şekilde, bunu yapmanın en muhteşem ve verimli yollarından biri de CSV!
Satışta hedef plakasını manuel olarak oluşturuyoruz. Burada kendime küçük bir makineye izin verdim:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
Ben kullanıyorum VerticaOperator() Bir veritabanı şeması ve bir tablo oluşturuyorum (tabii ki mevcut değillerse). Ana şey, bağımlılıkları doğru bir şekilde düzenlemektir:
- Şey, - dedi küçük fare, - değil mi, şimdi
Ormandaki en korkunç hayvan olduğuma inanıyor musun?
Julia Donaldson, Gruffalo
Meslektaşlarım ve benim bir rekabetimiz olsaydı: Kim bir ETL sürecini sıfırdan hızlı bir şekilde oluşturacak ve başlatacak: onlar SSIS'leri ve bir fare ve ben Airflow ile ... Ve sonra bakım kolaylığını da karşılaştırırdık ... Vay canına, sanırım onları her cephede yeneceğimi kabul edeceksin!
Biraz daha ciddiyse, Apache Airflow - süreçleri program kodu biçiminde tanımlayarak - işimi yaptı daha daha rahat ve keyifli.
Hem eklentiler hem de ölçeklenebilirliğe yatkınlık açısından sınırsız genişletilebilirliği, Airflow'u hemen hemen her alanda kullanma fırsatı verir: hatta tüm veri toplama, hazırlama ve işleme döngüsünde, hatta roket fırlatmada bile (Mars'a, Mars'a). kurs).
Bölüm finali, referans ve bilgiler
Sizin için topladığımız tırmık
start_date. Evet, bu zaten yerel bir mem. Doug'ın ana argümanı aracılığıyla start_date tamamı bitti. Kısaca belirtirseniz start_date geçerli tarih ve schedule_interval - bir gün, o zaman DAG yarın daha erken başlayacak.
start_date = datetime(2020, 7, 7, 0, 1, 2)
Ve artık sorun yok.
Bununla ilişkili başka bir çalışma zamanı hatası var: Task is missing the start_date parameter, bu genellikle dag operatörüne bağlanmayı unuttuğunuzu gösterir.
Hepsi bir makinede. Evet ve tabanlar (Airflow'un kendisi ve kaplamamız) ve bir web sunucusu, bir zamanlayıcı ve çalışanlar. Ve hatta işe yaradı. Ancak zamanla hizmetler için görev sayısı arttı ve PostgreSQL dizine 20 ms yerine 5 s'de yanıt vermeye başlayınca onu aldık ve taşıdık.
LocalExecutor. Evet, hala üzerinde oturuyoruz ve çoktan uçurumun kenarına geldik. LocalExecutor şu ana kadar bizim için yeterliydi ama şimdi en az bir çalışanla genişleme zamanı ve CeleryExecutor'a geçmek için çok çalışmamız gerekecek. Ve onunla tek bir makinede çalışabileceğiniz gerçeği göz önüne alındığında, "dürüst olmak gerekirse, elbette asla üretime geçmeyecek!"
kullanılmama yerleşik araçlar:
Bağlantılar hizmet kimlik bilgilerini saklamak için,
SLA'yı Kaçıranlar Zamanında yapılmayan işlere cevap vermek,
xcom meta veri değişimi için (dedim metaveri!) günlük görevler arasında.
Posta kötüye kullanımı. Ne diyebilirim ki? Düşen görevlerin tüm tekrarları için uyarılar ayarlandı. Artık benim iş Gmail'de Airflow'dan gelen 90'den fazla e-posta var ve web posta ağzı aynı anda 100'den fazla e-postayı alıp silmeyi reddediyor.
Ellerimizle değil de kafalarımızla daha fazla çalışabilmemiz için Airflow bizim için şunu hazırladı:
REST API - hala, çalışmasına engel olmayan Deneysel statüsüne sahiptir. Bununla birlikte, yalnızca günler ve görevler hakkında bilgi almakla kalmaz, aynı zamanda bir günü durdurabilir/başlatabilir, bir DAG Çalıştırması veya bir havuz oluşturabilirsiniz.
CLI - WebUI aracılığıyla kullanımı elverişsiz olmakla kalmayan, ancak genellikle bulunmayan birçok araç komut satırı aracılığıyla kullanılabilir. Örneğin:
backfill görev örneklerini yeniden başlatmak için gereklidir.
Örneğin analistler geldi ve şöyle dedi: “Ve sen yoldaş, 1'den 13 Ocak'a kadar olan verilerde saçmalık var! Düzelt, düzelt, düzelt, düzelt!" Ve sen tam bir ocaksın:
Temel hizmet: initdb, resetdb, upgradedb, checkdb.
run, bu da bir örnek görevi çalıştırmanıza ve hatta tüm bağımlılıklarda puan almanıza olanak tanır. Ayrıca, aracılığıyla çalıştırabilirsiniz LocalExecutor, bir Kereviz kümeniz olsa bile.
hemen hemen aynı şeyi yapar test, sadece bazlarda da hiçbir şey yazmaz.
connections kabuktan bağlantıların toplu olarak oluşturulmasına izin verir.
Python API - eklentiler için tasarlanmış ve küçük ellerle dolup taşmayan oldukça sert bir etkileşim yolu. Ama gitmemize kim engel olacak? /home/airflow/dags, koşmak ipython ve ortalığı karıştırmaya başla? Örneğin, tüm bağlantıları aşağıdaki kodla dışa aktarabilirsiniz:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Airflow meta veritabanına bağlanma. Ona yazmayı önermiyorum, ancak çeşitli özel ölçümler için görev durumlarını almak, herhangi bir API'yi kullanmaktan çok daha hızlı ve kolay olabilir.
Diyelim ki tüm görevlerimiz önemsiz değil, ancak bazen düşebiliyor ve bu normal. Ancak birkaç blokaj zaten şüpheli ve kontrol edilmesi gerekecek.
SQL'e dikkat!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
referanslar
Ve tabii ki, Google'ın yayınından ilk on bağlantı, yer işaretlerimdeki Airflow klasörünün içeriğidir.
Python ve Apache Airflow'un Zen'i - örtük DAG iletme, işlevlerde bağlam atma, yine bağımlılıklar hakkında ve ayrıca görev başlatmalarını atlama hakkında.