Salom, men Dmitriy Logvinenko - "Vezet" kompaniyalar guruhi tahliliy bo'limining ma'lumotlar muhandisi.
Men sizga ETL jarayonlarini ishlab chiqish uchun ajoyib vosita - Apache Airflow haqida gapirib beraman. Ammo Airflow shu qadar ko'p qirrali va ko'p qirrali bo'lib, siz ma'lumotlar oqimlarida ishtirok etmasangiz ham, uni diqqat bilan ko'rib chiqishingiz kerak, lekin vaqti-vaqti bilan har qanday jarayonni ishga tushirish va ularning bajarilishini nazorat qilish zarurati bo'lsa.
Va ha, men nafaqat aytib beraman, balki ko'rsataman: dasturda juda ko'p kod, skrinshotlar va tavsiyalar mavjud.

Google'da Airflow / Wikimedia Commons so'zini ko'rganingizda odatda nima ko'rasiz
Mundarija
kirish
Apache Airflow xuddi Djangoga o'xshaydi:
- python tilida yozilgan
- ajoyib boshqaruv paneli mavjud,
- cheksiz kengaytirilishi mumkin
- faqat yaxshiroq va u butunlay boshqacha maqsadlarda qilingan, ya'ni (katdan oldin yozilganidek):
- cheksiz miqdordagi mashinalarda vazifalarni bajarish va nazorat qilish (ko'p Selderey / Kubernetes va sizning vijdoningiz sizga ruxsat berganidek)
- Python kodini yozish va tushunish juda oson bo'lgan dinamik ish oqimini yaratish bilan
- va tayyor komponentlar va uy qurilishi plaginlari yordamida har qanday ma'lumotlar bazalari va API-larni bir-biriga ulash imkoniyati (bu juda oddiy).
Biz Apache Airflow-dan quyidagicha foydalanamiz:
- biz DWH va ODS da (bizda Vertica va Clickhouse mavjud) turli manbalardan ma'lumotlarni yig'amiz (ko'plab SQL Server va PostgreSQL misollari, dastur ko'rsatkichlari bo'lgan turli xil APIlar, hatto 1C).
- qanchalik rivojlangan
cron, bu ODSda ma'lumotlarni birlashtirish jarayonlarini boshlaydi, shuningdek, ularga xizmat ko'rsatishni nazorat qiladi.
Yaqin vaqtgacha bizning ehtiyojlarimiz 32 yadroli va 50 Gb tezkor xotiraga ega bo'lgan bitta kichik server tomonidan qoplanar edi. Airflow'da bu ishlaydi:
- ko'proq 200 dag (aslida biz vazifalarni to'ldirgan ish oqimlari),
- har birida o'rtacha 70 ta vazifa,
- bu yaxshilik boshlanadi (shuningdek o'rtacha) soatiga bir marta.
Va qanday qilib kengaytirilganimiz haqida men quyida yozaman, lekin endi biz hal qiladigan über muammosini aniqlaymiz:
Har birida 50 ta ma'lumotlar bazasi bo'lgan uchta original SQL serveri mavjud - bitta loyihaning namunalari, mos ravishda ular bir xil tuzilishga ega (deyarli hamma joyda, mua-ha-ha), ya'ni har birida Buyurtmalar jadvali mavjud (xayriyatki, shunday jadval mavjud). ism har qanday biznesga kiritilishi mumkin). Biz xizmat maydonlarini (manba serveri, manba ma'lumotlar bazasi, ETL topshiriq identifikatori) qo'shish orqali ma'lumotlarni olamiz va ularni sodda tarzda, masalan, Vertica-ga tashlaymiz.
Yuringlar!
Asosiy qism, amaliy (va biroz nazariy)
Nega biz (va siz)
Daraxtlar katta bo'lganda va men oddiy edim SQL-schik bitta ruscha chakana savdoda biz mavjud bo'lgan ikkita vositadan foydalangan holda ETL jarayonlarini, ya'ni ma'lumotlar oqimlarini aldashga muvaffaq bo'ldik:
- Informatika quvvat markazi - juda keng tarqalgan tizim, juda samarali, o'ziga xos uskuna, o'z versiyasi. Men uning imkoniyatlaridan 1% xudo saqlasin foydalandim. Nega? Xo'sh, birinchi navbatda, 380-yillardagi bu interfeys bizga ruhiy bosim o'tkazdi. Ikkinchidan, bu kontraseptsiya juda chiroyli jarayonlar, g'azablangan komponentlarni qayta ishlatish va boshqa juda muhim korporativ fokuslar uchun mo'ljallangan. Uning Airbus AXNUMX qanoti kabi narxi haqida biz hech narsa demaymiz.
Ehtiyot bo'ling, skrinshot 30 yoshgacha bo'lgan odamlarga ozgina zarar etkazishi mumkin

- SQL Server integratsiya serveri - biz bu o'rtoqdan loyiha ichidagi oqimlarimizda foydalanganmiz. Xo'sh, aslida: biz allaqachon SQL Serverdan foydalanamiz va uning ETL vositalaridan foydalanmaslik qandaydir asossiz bo'lar edi. Unda hamma narsa yaxshi: interfeys ham chiroyli, taraqqiyot haqida hisobotlar... Lekin biz dasturiy mahsulotlarni nima uchun sevamiz, oh, buning uchun emas. Uning versiyasi
dtsx(qaysi tugunlarni saqlash paytida aralashtirilgan XML) biz qila olamiz, lekin buning nima keragi bor? Yuzlab jadvallarni bir serverdan ikkinchisiga sudrab o'tadigan vazifa paketini yaratish haqida nima deyish mumkin? Ha, yuzta, ko'rsatkich barmog'ingiz sichqoncha tugmasini bosgan holda yigirma bo'lakdan tushadi. Lekin, albatta, u yanada zamonaviy ko'rinadi:
Biz, albatta, chiqish yo'llarini qidirdik. Hatto hol deyarli o'z-o'zidan yozilgan SSIS paketi generatoriga keldi ...
... va keyin menga yangi ish topildi. Va Apache Airflow meni ortda qoldirdi.
ETL jarayonining tavsiflari oddiy Python kodi ekanligini bilganimda, men shunchaki xursandchilik uchun raqsga tushmadim. Ma'lumotlar oqimlarining versiyalari va farqlari shu tarzda amalga oshirildi va bitta tuzilishga ega jadvallarni yuzlab ma'lumotlar bazalaridan bitta nishonga quyish bir yarim yoki ikkita 13 "ekranda Python kodi masalasiga aylandi.
Klasterni yig'ish
Keling, butunlay bolalar bog'chasini tashkil qilmaylik va bu erda Airflow, siz tanlagan ma'lumotlar bazasi, Selderey va docklarda tasvirlangan boshqa holatlarni o'rnatish kabi mutlaqo aniq narsalar haqida gapirmaylik.
Biz darhol tajribalarni boshlashimiz uchun men eskiz qildim docker-compose.yml unda:
- Haqiqatan ham ko'taraylik Havo oqimi: Rejalashtiruvchi, veb-server. Selderey vazifalarini kuzatish uchun u erda gul ham aylanadi (chunki u allaqachon ichkariga kiritilgan
apache/airflow:1.10.10-python3.7, lekin biz qarshi emasmiz) - PostgreSQL, unda Airflow o'zining xizmat ma'lumotlarini (rejalashtiruvchi ma'lumotlari, ijro statistikasi va boshqalar) yozadi va Selderey bajarilgan vazifalarni belgilaydi;
- Redis, Selderey uchun vazifa brokeri vazifasini bajaradi;
- Selderey ishchi, vazifalarni bevosita bajarish bilan shug'ullanadi.
- Jildga
./dagsbiz fayllarimizni daglarning tavsifi bilan qo'shamiz. Ular pashshada olinadi, shuning uchun har bir aksirishdan keyin butun to'plamni jonglyor qilishning hojati yo'q.
Ba'zi joylarda misollardagi kod to'liq ko'rsatilmagan (matnni chalkashtirmaslik uchun), lekin biror joyda jarayonda o'zgartirilgan. To'liq ishchi kod misollarini omborda topish mumkin .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerRemarks:
- Kompozitsiyani yig'ishda men ko'p jihatdan taniqli tasvirga tayandim - albatta tekshirib ko'ring. Balki hayotingizda boshqa hech narsa kerak emasdir.
- Barcha havo oqimi sozlamalari nafaqat orqali mavjud
airflow.cfg, balki atrof-muhit o'zgaruvchilari orqali (ishlab chiquvchilarga rahmat), men undan yomon niyat bilan foydalanganman. - Tabiiyki, u ishlab chiqarishga tayyor emas: men ataylab yurak urishlarini idishlarga qo'ymadim, xavfsizlik bilan bezovta qilmadim. Ammo men eksperimentchilarimizga mos keladigan minimal narsani qildim.
- Eslab qoling:
- Dag papkasi ham rejalashtiruvchi, ham ishchilar uchun ochiq bo'lishi kerak.
- Xuddi shu narsa barcha uchinchi tomon kutubxonalariga tegishli - ularning barchasi rejalashtiruvchi va ishchilar bilan ishlaydigan mashinalarga o'rnatilishi kerak.
Xo'sh, endi hamma narsa oddiy:
$ docker-compose up --scale worker=3Har bir narsa ko'tarilgach, siz veb-interfeyslarga qarashingiz mumkin:
- Havo oqimi:
- Gul:
Asosiy tushunchalar
Agar siz ushbu "daglar" ning barchasida hech narsani tushunmagan bo'lsangiz, unda qisqacha lug'at:
- Taymer - Airflow-dagi eng muhim amaki, robotlar odam emas, qattiq ishlashini nazorat qiladi: jadvalni kuzatib boradi, daglarni yangilaydi, vazifalarni ishga tushiradi.
Umuman olganda, eski versiyalarda u xotira bilan bog'liq muammolarga duch keldi (yo'q, amneziya emas, balki oqmalar) va eski parametr hatto konfiguratsiyalarda ham saqlanib qolgan.
run_duration- uning qayta ishga tushirish oralig'i. Ammo hozir hammasi yaxshi. - DAG (aka "dag") - "yo'naltirilgan asiklik grafik", lekin bunday ta'rif kam odamga aytadi, lekin aslida bu bir-biri bilan o'zaro ta'sir qiladigan vazifalar uchun konteyner (pastga qarang) yoki SSIS va Informatica'dagi Workflow'dagi paketning analogidir. .
Daglardan tashqari, subdaglar hali ham bo'lishi mumkin, ammo biz ularga erisha olmaymiz.
- DAG Run - o'ziga xos bo'lgan ishga tushirilgan dag
execution_date. Xuddi shu dag'ning dagranlari parallel ravishda ishlashi mumkin (agar siz o'z vazifalaringizni idempotent qilgan bo'lsangiz, albatta). - Operator ma'lum bir harakatni bajarish uchun mas'ul bo'lgan kod qismlari. Operatorlarning uch turi mavjud:
- harakatbizning sevimlimiz kabi
PythonOperator, har qanday (to'g'ri) Python kodini bajarishi mumkin; - uzatishma'lumotlarni bir joydan boshqa joyga ko'chiradigan, aytaylik,
MsSqlToHiveTransfer; - Sensor boshqa tomondan, bu sizga biror voqea sodir bo'lgunga qadar munosabat bildirish yoki dagning keyingi bajarilishini sekinlashtirish imkonini beradi.
HttpSensorbelgilangan so'nggi nuqtani tortib olishi mumkin va kerakli javob kutilganda, uzatishni boshlangGoogleCloudStorageToS3Operator. Qiziquvchan aql so'raydi: "Nega? Axir siz operatorda takrorlashni amalga oshirishingiz mumkin! ” Va keyin, to'xtatilgan operatorlar bilan vazifalar havzasini to'sib qo'ymaslik uchun. Sensor keyingi urinishdan oldin ishga tushadi, tekshiradi va o'ladi.
- harakatbizning sevimlimiz kabi
- masala - turidan qat'iy nazar e'lon qilingan va dag'ga biriktirilgan operatorlar vazifa darajasiga ko'tariladi.
- vazifa misoli - bosh rejalashtiruvchi vazifalarni bajaruvchi-ishchilarga jangga yuborish vaqti keldi, deb qaror qilganda (agar biz foydalansak, darhol joyida)
LocalExecutoryoki holatda uzoq tugungaCeleryExecutor), ularga kontekstni tayinlaydi (ya'ni, o'zgaruvchilar to'plami - bajarilish parametrlari), buyruqlar yoki so'rovlar shablonlarini kengaytiradi va ularni birlashtiradi.
Biz vazifalar yaratamiz
Birinchidan, keling, dogimizning umumiy sxemasini belgilab olaylik, keyin biz tafsilotlarga ko'proq sho'ng'iymiz, chunki biz ba'zi bir ahamiyatsiz echimlarni qo'llaymiz.
Shunday qilib, eng oddiy shaklda bunday dag quyidagicha ko'rinadi:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Keling, buni aniqlaylik:
- Birinchidan, biz kerakli liblarni import qilamiz va boshqa bir narsa;
sql_server_dsIsmi?List[namedtuple[str, str]]Airflow Connections-dan ulanishlar nomlari va biz plitamizni oladigan ma'lumotlar bazalari bilan;dag- albatta bo'lishi kerak bo'lgan dagimizni e'lon qilishglobals(), aks holda Airflow uni topa olmaydi. Dug shuningdek aytishi kerak:- uning ismi nima
orders- keyin bu nom veb-interfeysda paydo bo'ladi, - u sakkizinchi iyulda yarim tundan boshlab ishlaydi,
- va u taxminan har 6 soatda ishlashi kerak (bu erda qattiq yigitlar uchun
timedelta()joizdircron-chiziq0 0 0/6 ? * * *, kamroq salqin uchun - kabi ifoda@daily);
- uning ismi nima
workflow()asosiy ishni bajaradi, lekin hozir emas. Hozircha biz kontekstimizni jurnalga joylashtiramiz.- Va endi vazifalarni yaratishning oddiy sehrlari:
- biz manbalarimiz orqali ishlaymiz;
- ishga tushirish
PythonOperator, bu bizning qo'g'irchoqimizni bajaradiworkflow(). Vazifaning noyob (dag ichida) nomini belgilashni va dagni o'zini bog'lashni unutmang. Bayroqprovide_contexto'z navbatida funktsiyaga qo'shimcha argumentlarni kiritadi, biz ularni diqqat bilan to'playmiz**context.
Hozircha hammasi shu. Bizda nima bor:
- veb-interfeysdagi yangi dag,
- parallel ravishda bajariladigan bir yarim yuzta vazifa (agar havo oqimi, selderey sozlamalari va server sig'imi ruxsat etilsa).
Xo'sh, deyarli oldim.

Bog'liqlarni kim o'rnatadi?
Bularning barchasini soddalashtirish uchun men o'girildim docker-compose.yml qayta ishlash requirements.txt barcha tugunlarda.
Endi u ketdi:

Kulrang kvadratlar - bu rejalashtiruvchi tomonidan qayta ishlanadigan topshiriq namunalari.
Biz biroz kutamiz, ishchilar vazifalarni hal qilishadi:

Yashillar, albatta, o'z ishlarini muvaffaqiyatli yakunladilar. Qizil ranglar unchalik muvaffaqiyatli emas.
Aytgancha, bizning mahsulotimizda papka yo'q
./dags, mashinalar o'rtasida sinxronizatsiya yo'q - barcha daglar yotadigitGitlab-da va Gitlab CI birlashganda mashinalarga yangilanishlarni tarqatadimaster.
Gul haqida bir oz
Ishchilar so‘rg‘ichlarimizni urayotganda, keling, bizga nimanidir ko‘rsata oladigan yana bir vositani eslaylik – Gul.
Ishchi tugunlari haqida qisqacha ma'lumotga ega bo'lgan birinchi sahifa:

Ishga tushgan vazifalar bilan eng qizg'in sahifa:

Brokerimiz maqomi bilan eng zerikarli sahifa:

Eng yorqin sahifa vazifa holati grafiklari va ularning bajarilish vaqti:

Biz kam yuklangan narsalarni yuklaymiz
Shunday qilib, barcha vazifalar bajarildi, siz yaradorlarni olib ketishingiz mumkin.

Va yaradorlar ko'p edi - u yoki bu sababga ko'ra. Havo oqimidan to'g'ri foydalanilganda, bu kvadratlar ma'lumotlar aniq kelmaganligini ko'rsatadi.
Jurnalni tomosha qilishingiz va tushib qolgan topshiriq namunalarini qayta ishga tushirishingiz kerak.
Har qanday kvadratni bosish orqali biz mavjud amallarni ko'ramiz:

Siz yiqilganlarni olib, Clear qilishingiz mumkin. Ya'ni, biz u erda biror narsa muvaffaqiyatsiz bo'lganini unutamiz va xuddi shu misol vazifasi rejalashtiruvchiga o'tadi.

Buni barcha qizil kvadratlar bilan sichqoncha bilan qilish unchalik insonparvar emasligi aniq - bu biz Airflow-dan kutgan narsa emas. Tabiiyki, bizda ommaviy qirg'in qurollari bor: Browse/Task Instances

Keling, bir vaqtning o'zida hamma narsani tanlaymiz va nolga qaytaramiz, to'g'ri elementni bosing:

Tozalashdan keyin bizning taksilarimiz quyidagicha ko'rinadi (ular allaqachon rejalashtiruvchi ularni rejalashtirishini kutishmoqda):

Ulanishlar, kancalar va boshqa o'zgaruvchilar
Keyingi DAGga qarash vaqti keldi, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Hamma hisobotni yangilaganmi? Bu yana u: ma'lumotlarni qaerdan olish mumkin bo'lgan manbalar ro'yxati mavjud; qaerga qo'yish kerakligi ro'yxati mavjud; hamma narsa sodir bo'lganda yoki buzilganda qo'ng'iroq qilishni unutmang (yaxshi, bu biz haqimizda emas, yo'q).
Keling, yana faylni ko'rib chiqamiz va yangi tushunarsiz narsalarni ko'rib chiqamiz:
from commons.operators import TelegramBotSendMessage- bizni o'z operatorlarimizni yaratishga hech narsa to'sqinlik qilmaydi, biz Unblocked-ga xabarlarni yuborish uchun kichik o'ram yasash orqali foyda oldik. (Biz quyida ushbu operator haqida ko'proq gaplashamiz);default_args={}- dag bir xil argumentlarni barcha operatorlariga tarqatishi mumkin;to='{{ var.value.all_the_kings_men }}'- maydontobiz qattiq kodlangan emas, balki Jinja yordamida dinamik ravishda yaratilgan va men diqqat bilan kiritgan elektron pochta ro'yxati bilan o'zgaruvchiga ega bo'lamiz.Admin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— operatorni ishga tushirish sharti. Bizning holatlarimizda, agar barcha bog'liqliklar ishlagan bo'lsa, xat xo'jayinlarga uchib ketadi muvaffaqiyatli;tg_bot_conn_id='tg_main'- argumentlarconn_idbiz yaratgan ulanish identifikatorlarini qabul qilingAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- Telegram-dagi xabarlar faqat topshiriqlar bajarilmagan taqdirdagina uchib ketadi;task_concurrency=1- biz bir vazifaning bir nechta topshiriq nusxalarini bir vaqtning o'zida ishga tushirishni taqiqlaymiz. Aks holda, biz bir vaqtning o'zida bir nechtasini ishga tushiramizVerticaOperator(bitta stolga qarab);report_update >> [email, tg]- barchasiVerticaOperatorquyidagi kabi xat va xabarlarni yuborishda birlashing:

Ammo bildirishnoma operatorlari turli ishga tushirish shartlariga ega bo'lganligi sababli, faqat bittasi ishlaydi. Daraxt ko'rinishida hamma narsa biroz kamroq ko'rinadi:

haqida bir necha so'z aytaman makroslar va ularning do'stlari - o'zgaruvchilar.
Makroslar Jinja to'ldiruvchilari bo'lib, ular turli xil foydali ma'lumotlarni operator argumentlariga almashtira oladi. Masalan, bu kabi:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} kontekst o'zgaruvchisi mazmuniga kengayadi execution_date formatida YYYY-MM-DD: 2020-07-14. Eng yaxshi tomoni shundaki, kontekst o'zgaruvchilari ma'lum bir vazifa misoliga (Daraxt ko'rinishidagi kvadrat) mixlangan va qayta ishga tushirilganda, to'ldiruvchilar bir xil qiymatlarga kengayadi.
Belgilangan qiymatlarni har bir topshiriq misolidagi Renderlangan tugmasi yordamida ko'rish mumkin. Xat yuborish vazifasi quyidagicha:

Va shuning uchun xabar yuborish vazifasida:

Eng soʻnggi versiya uchun oʻrnatilgan makroslarning toʻliq roʻyxati bu yerda mavjud:
Bundan tashqari, plaginlar yordamida biz o'z makroslarimizni e'lon qilishimiz mumkin, ammo bu boshqa voqea.
Oldindan belgilangan narsalarga qo'shimcha ravishda, biz o'zgaruvchilarimizning qiymatlarini almashtirishimiz mumkin (men buni yuqoridagi kodda allaqachon ishlatganman). Keling, yarataylik Admin/Variables bir nechta narsa:

Siz foydalanishingiz mumkin bo'lgan hamma narsa:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Qiymat skaler bo'lishi mumkin yoki JSON ham bo'lishi mumkin. JSON holatida:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}faqat kerakli kalitga yo'lni ishlating: {{ var.json.bot_config.bot.token }}.
Men tom ma'noda bitta so'z aytaman va bitta skrinshotni ko'rsataman ulanishlar. Bu erda hamma narsa oddiy: sahifada Admin/Connections biz ulanishni yaratamiz, u erda loginlar / parollar va aniqroq parametrlarni qo'shamiz. Mana bunday:

Parollar shifrlangan bo'lishi mumkin (standartdan ko'ra chuqurroq) yoki siz ulanish turini qoldirishingiz mumkin (men uchun qilganim kabi) tg_main) - Gap shundaki, turlar ro'yxati Airflow modellarida mahkamlangan va uni manba kodlariga kirmasdan kengaytirib bo'lmaydi (agar men birdaniga Google'da biror narsa topmagan bo'lsam, meni to'g'rilang), lekin hech narsa bizga kredit olishimizga to'sqinlik qilmaydi. nomi.
Bundan tashqari, bir xil nom bilan bir nechta ulanishlarni amalga oshirishingiz mumkin: bu holda, usul BaseHook.get_connection(), qaysi bizga nomi bilan bog'lanishlarni oladi, beradi tasodifiy bir nechta nomlardan (Round Robin qilish mantiqiyroq bo'lar edi, lekin keling, buni Airflow ishlab chiqaruvchilarining vijdoniga qoldiraylik).
O‘zgaruvchilar va ulanishlar, albatta, ajoyib vositalardir, lekin muvozanatni yo‘qotmaslik muhim: oqimlaringizning qaysi qismlarini kodning o‘zida saqlaysiz va qaysi qismlarni saqlash uchun Airflow’ga berasiz. Bir tomondan, UI orqali qiymatni, masalan, pochta qutisini tezda o'zgartirish qulay bo'lishi mumkin. Boshqa tomondan, bu hali ham biz (men) qutulmoqchi bo'lgan sichqonchani bosishga qaytish.
Bog'lanishlar bilan ishlash vazifalardan biridir ilgaklar. Umuman olganda, havo oqimi kancalari uni uchinchi tomon xizmatlari va kutubxonalariga ulash uchun nuqtalardir. Masalan, JiraHook Jira bilan ishlashimiz uchun mijoz ochadi (siz vazifalarni oldinga va orqaga ko'chirishingiz mumkin) va SambaHook mahalliy faylni surish mumkin smb-nuqta.
Maxsus operatorni tahlil qilish
Va biz uning qanday yaratilganini ko'rib chiqishga yaqinlashdik TelegramBotSendMessage
da commons/operators.py haqiqiy operator bilan:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Bu erda, Airflow-dagi hamma narsa kabi, hamma narsa juda oddiy:
- dan meros
BaseOperator, bu havo oqimiga xos bo'lgan bir nechta narsalarni amalga oshiradi (bo'sh vaqtingizga qarang) - E'lon qilingan maydonlar
template_fields, unda Jinja ishlov berish uchun makrolarni qidiradi. - uchun to'g'ri dalillarni tashkil qildi
__init__(), kerak bo'lganda standart sozlamalarni o'rnating. - Biz ajdodni ishga tushirishni ham unutmadik.
- Tegishli kancani ochdi
TelegramBotHookundan mijoz ob'ektini oldi. - Qayta belgilangan (qayta belgilangan) usul
BaseOperator.execute(), qaysi Airfow operatorni ishga tushirish vaqti kelganda silkitadi - unda biz tizimga kirishni unutib, asosiy harakatni amalga oshiramiz. (Aytgancha, biz tizimga kiramizstdoutиstderr- Havo oqimi hamma narsani ushlab turadi, uni chiroyli tarzda o'rab oladi va kerak bo'lganda parchalaydi.)
Keling, bizda nima borligini ko'rib chiqaylik commons/hooks.py. Faylning birinchi qismi, kancaning o'zi bilan:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientBu erda nima tushuntirishni ham bilmayman, faqat muhim fikrlarni ta'kidlayman:
- Biz meros olamiz, argumentlar haqida o'ylaymiz - aksariyat hollarda bu bitta bo'ladi:
conn_id; - Standart usullarni bekor qilish: men o'zimni chekladim
get_conn(), unda men ulanish parametrlarini nom bilan olaman va faqat bo'limni olamanextra(bu JSON maydoni), men (o'zimning ko'rsatmalarimga ko'ra!) Telegram bot tokenini joylashtirdim:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Men bizning misolimizni yarataman
TelegramBot, unga aniq belgi berish.
Ana xolos. Siz mijozni kancadan foydalanib olishingiz mumkin TelegramBotHook().clent yoki TelegramBotHook().get_conn().
Va faylning ikkinchi qismi, unda men Telegram REST API uchun mikroo'ram yasayman, xuddi shunday sudrab ketmaslik uchun. bitta usul uchun sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))To'g'ri yo'l hammasini qo'shishdir:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- plaginda umumiy omborga qo'ying va uni Open Source-ga bering.
Bularning barchasini o'rganayotganimizda, bizning hisobot yangilanishlarimiz muvaffaqiyatsiz tugadi va menga kanalda xato xabari yubordi. Noto'g'ri yoki yo'qligini tekshirib ko'raman...

Dogimizda nimadir buzildi! Bu biz kutgan narsa emasmi? Aynan!
Siz quymoqchimisiz?
Nimanidir sog'indim deb o'ylaysizmi? U SQL Serverdan Vertica ga ma'lumotlarni o'tkazishga va'da bergan shekilli, keyin uni olib, mavzudan uzoqlashdi, harom!
Bu vahshiylik qasddan qilingan, men siz uchun ba'zi atamalarni tushunishim kerak edi. Endi siz uzoqroqqa borishingiz mumkin.
Bizning rejamiz shunday edi:
- Dog
- Vazifalarni yaratish
- Hammasi qanchalik go'zal ekanligini ko'ring
- To'ldirish uchun seans raqamlarini belgilang
- SQL Serverdan ma'lumotlarni oling
- Vertica-ga ma'lumotlarni joylashtiring
- Statistikani yig'ish
Shunday qilib, bularning barchasini ishga tushirish uchun men kichik bir qo'shimcha qildim docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyU erda biz ko'taramiz:
- Vertica xost sifatida
dwheng standart sozlamalar bilan, - SQL Serverning uchta nusxasi,
- ikkinchisidagi ma'lumotlar bazalarini ba'zi ma'lumotlar bilan to'ldiramiz (hech qanday holatda qaramang
mssql_init.py!)
Biz barcha yaxshi narsalarni o'tgan safarga qaraganda biroz murakkabroq buyruq yordamida ishga tushiramiz:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Bizning mo''jizaviy tasodifiy yaratgan narsangizdan foydalanishingiz mumkin Data Profiling/Ad Hoc Query:

Asosiysi, buni tahlilchilarga ko'rsatmaslik
batafsil bayon qiling ETL seanslari Men buni qilmayman, u erda hamma narsa ahamiyatsiz: biz poydevor yaratamiz, unda belgi bor, biz hamma narsani kontekst menejeri bilan o'rab olamiz va endi buni qilamiz:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passVaqt keldi ma'lumotlarimizni yig'ish bir yarim yuz stolimizdan. Keling, buni juda oddiy chiziqlar yordamida qilaylik:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Kanca yordamida biz havo oqimidan olamiz
pymssql-ulanmoq - Keling, so'rovga sana ko'rinishidagi cheklovni almashtiramiz - u shablon mexanizmi tomonidan funktsiyaga tashlanadi.
- Bizning so'rovimizni oziqlantirish
pandasbizni kim oladiDataFrame- kelajakda biz uchun foydali bo'ladi.
Men almashtirishdan foydalanaman
{dt}so'rov parametri o'rniga%sMen yovuz Pinokkio ekanligim uchun emas, balkipandaschiday olmaymanpymssqlva oxirgisini siljitadiparams: ListGarchi u haqiqatan ham xohlasatuple.
Bundan tashqari, ishlab chiquvchi ekanligini unutmangpymssqlendi uni qo'llab-quvvatlamaslikka qaror qildi va ketish vaqti keldipyodbc.
Keling, Airflow bizning funktsiyalarimizning argumentlarini nima bilan to'ldirganini ko'rib chiqaylik:

Agar ma'lumot bo'lmasa, unda davom etishning ma'nosi yo'q. Ammo to'ldirishni muvaffaqiyatli deb hisoblash ham g'alati. Lekin bu xato emas. A-a-a, nima qilish kerak?! Va mana nima:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException Airflow-ga hech qanday xatolik yo'qligini aytadi, lekin biz vazifani o'tkazib yuboramiz. Interfeys yashil yoki qizil kvadratga ega bo'lmaydi, lekin pushti.
Keling, ma'lumotlarimizni tashlaymiz bir nechta ustunlar:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])Aynan:
- Biz buyurtma olgan ma'lumotlar bazasi,
- Bizning suv toshqini seansining identifikatori (bu boshqacha bo'ladi har bir vazifa uchun),
- Manba va buyurtma identifikatoridan olingan xesh - yakuniy ma'lumotlar bazasida (hamma narsa bitta jadvalga quyiladi) bizda noyob buyurtma identifikatori mavjud.
Oxirgi qadam qoladi: hamma narsani Vertica-ga quying. Va, g'alati, buni qilishning eng ajoyib va samarali usullaridan biri bu CSV orqali!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Biz maxsus qabul qilgich yasaymiz
StringIO. pandasmarhamat bilan qo'yamizDataFramesifatidaCSV-chiziqlar.- Keling, sevimli Vertica-ga kanca yordamida ulanishni ochaylik.
- Va endi yordam bilan
copy()ma'lumotlarimizni to'g'ridan-to'g'ri Vertikaga yuboring!
Biz haydovchidan qancha qator to'ldirilganligini olamiz va seans menejeriga hammasi joyida ekanligini aytamiz:
session.loaded_rows = cursor.rowcount
session.successful = TrueHammasi shu.
Savdoda biz maqsadli plastinkani qo'lda yaratamiz. Bu erda men o'zimga kichik mashinaga ruxsat berdim:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)Men foydalanaman
VerticaOperator()Men ma'lumotlar bazasi sxemasini va jadvalni yarataman (agar ular allaqachon mavjud bo'lmasa, albatta). Asosiysi, bog'liqliklarni to'g'ri tartibga solish:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadXulosa
- Xo'sh, - dedi kichkina sichqon, - shunday emasmi, endi
Men o'rmondagi eng dahshatli hayvon ekanligimga ishonchingiz komilmi?
Julia Donaldson, Gruffalo
O'ylaymanki, agar mening hamkasblarim va men raqobatdosh bo'lsak: kim tezda ETL jarayonini noldan yaratadi va ishga tushiradi: ular SSIS va sichqoncha bilan va men Airflow bilan ... Va keyin biz texnik xizmat ko'rsatish qulayligini ham solishtiramiz ... Voy, men ularni barcha jabhalarda mag'lub etishimga rozi bo'lasiz deb o'ylayman!
Agar biroz jiddiyroq bo'lsa, unda Apache Airflow - jarayonlarni dastur kodi ko'rinishida tasvirlab, mening vazifamni bajardi. juda ko'p yanada qulay va yoqimli.
Uning cheksiz kengaytirilishi, ham plaginlar, ham miqyoslilikka moyilligi sizga Airflow-dan deyarli har qanday sohada foydalanish imkoniyatini beradi: hatto ma'lumotlarni to'plash, tayyorlash va qayta ishlashning to'liq siklida, hattoki raketalarni (Marsga, XNUMX-yilgacha) uchirishda ham. kurs).
Yakuniy qism, ma'lumotnoma va ma'lumot
Biz siz uchun to'plagan rake
start_date. Ha, bu allaqachon mahalliy mem. Dagning asosiy argumenti orqalistart_datehammasi o'tadi. Qisqacha aytganda, agar siz belgilasangizstart_datejoriy sana vaschedule_interval- bir kun, keyin DAG ertaga erta boshlanadi.start_date = datetime(2020, 7, 7, 0, 1, 2)Va boshqa muammolar yo'q.
U bilan bog'liq yana bir ish vaqti xatosi mavjud:
Task is missing the start_date parameter, bu ko'pincha dag operatoriga ulanishni unutganingizni ko'rsatadi.- Hammasi bitta mashinada. Ha, va bazalar (Airflow o'zi va bizning qoplamamiz) va veb-server, rejalashtiruvchi va ishchilar. Va hatto ishladi. Ammo vaqt o'tishi bilan xizmatlar uchun vazifalar soni ko'paydi va PostgreSQL indeksga 20 ms o'rniga 5 soniyada javob bera boshlaganida, biz uni olib, olib ketdik.
- Mahalliy ijrochi. Ha, biz hali ham uning ustida o'tiribmiz va biz allaqachon tubsizlik chetiga etib kelganmiz. LocalExecutor biz uchun hozirgacha yetarli edi, lekin endi kamida bitta ishchi bilan kengaytirish vaqti keldi va biz CeleryExecutor-ga o'tish uchun ko'p mehnat qilishimiz kerak. Va siz u bilan bitta mashinada ishlashingiz mumkinligini hisobga olsak, Seldereyni hatto serverda ham ishlatishingizga hech narsa to'sqinlik qilmaydi, bu "albatta, hech qachon ishlab chiqarishga kirmaydi!"
- Foydalanilmaslik o'rnatilgan asboblar:
- Aloqalar xizmat ma'lumotlarini saqlash uchun,
- SLA Misses o'z vaqtida bajarilmagan vazifalarga javob berish;
- xcom metadata almashinuvi uchun (men aytdim metama'lumotlar!) dag vazifalari orasida.
- Pochtani suiiste'mol qilish. Xo'sh, nima deyishim mumkin? Tushgan vazifalarning barcha takrorlanishi uchun ogohlantirishlar o'rnatildi. Endi mening ishim Gmail-da Airflow-dan 90 mingdan ortiq elektron pochta xabarlari bor va veb-pochta tumshug'i bir vaqtning o'zida 100 dan ortiq xatlarni olish va o'chirishni rad etadi.
Ko'proq tuzoqlar:
Ko'proq avtomatlashtirish vositalari
Biz qo'llarimiz bilan emas, balki boshimiz bilan ko'proq ishlashimiz uchun Airflow biz uchun quyidagilarni tayyorladi:
- - u hali ham Eksperimental maqomiga ega, bu uning ishlashiga to'sqinlik qilmaydi. Uning yordamida siz nafaqat daglar va vazifalar haqida ma'lumot olishingiz, balki dagni to'xtatish/boshlash, DAG Run yoki hovuz yaratishingiz mumkin.
- - Buyruqlar qatori orqali ko'plab vositalar mavjud bo'lib, ular nafaqat WebUI orqali foydalanish uchun noqulay, balki umuman yo'q. Masalan:
backfillvazifa misollarini qayta ishga tushirish uchun kerak.
Masalan, tahlilchilar kelib: “Siz, o‘rtoq, 1-yanvardan 13-yanvargacha bo‘lgan ma’lumotlarda bema’ni gaplar bor! To‘g‘rila, tuzat, tuzat, tuzat!” Va siz shunday pechkasiz:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Asosiy xizmat:
initdb,resetdb,upgradedb,checkdb. run, bu sizga bitta misol topshirig'ini bajarishga va hatto barcha bog'liqliklar bo'yicha ball olishga imkon beradi. Bundan tashqari, siz uni orqali ishga tushirishingiz mumkinLocalExecutor, Agar sizda Selderey klasteri bo'lsa ham.- Deyarli bir xil ishni qiladi
test, faqat bazalarda ham hech narsa yozmaydi. connectionsqobiqdan ulanishlarni ommaviy yaratish imkonini beradi.
- - plaginlar uchun mo'ljallangan va kichik qo'llar bilan aralashmaslik uchun o'zaro ta'sir qilishning juda qiyin usuli. Ammo bizni borishimizga kim to'sqinlik qiladi
/home/airflow/dags, yugurishipythonva chalkashishni boshlaysizmi? Siz, masalan, quyidagi kod bilan barcha ulanishlarni eksport qilishingiz mumkin:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Airflow metama'lumotlar bazasiga ulanish. Men unga yozishni tavsiya etmayman, lekin har qanday maxsus ko'rsatkichlar uchun vazifa holatlarini olish har qanday API-dan foydalanishdan ko'ra tezroq va osonroq bo'lishi mumkin.
Aytaylik, bizning barcha vazifalarimiz idempotent emas, lekin ular ba'zan tushishi mumkin va bu normal holat. Ammo bir nechta blokirovkalar allaqachon shubhali va tekshirish kerak bo'ladi.
SQLdan ehtiyot bo'ling!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
Manbalar
Va, albatta, Google tomonidan chiqarilgan dastlabki o'nta havola - bu mening xatcho'plarimdagi Airflow papkasining mazmuni.
- - Albatta, biz ofisdan boshlashimiz kerak. hujjatlar, lekin ko'rsatmalarni kim o'qiydi?
- - Xo'sh, hech bo'lmaganda ijodkorlarning tavsiyalarini o'qing.
- - eng boshlanishi: rasmlardagi foydalanuvchi interfeysi
- - asosiy tushunchalar yaxshi tasvirlangan, agar (to'satdan!) Mendan biror narsani tushunmadingiz.
- - havo oqimi klasterini o'rnatish uchun qisqacha qo'llanma.
- - deyarli bir xil qiziqarli maqola, ehtimol ko'proq rasmiyatchilik va kamroq misollardan tashqari.
- - Selderey bilan birgalikda ishlash haqida.
- - vazifalarning noaniqligi, sana o'rniga ID tomonidan yuklash, transformatsiya, fayl tuzilishi va boshqa qiziqarli narsalar haqida.
- - men faqat o'tishda aytib o'tgan vazifalar va Trigger qoidasiga bog'liqlik.
- - rejalashtiruvchida ba'zi "ishlarni mo'ljallanganidek" qanday engish, yo'qolgan ma'lumotlarni yuklash va vazifalarni ustuvorlashtirish.
- - havo oqimi metama'lumotlariga foydali SQL so'rovlari.
- - maxsus sensorni yaratish haqida foydali bo'lim mavjud.
- — Data Science uchun AWS-da infratuzilmani yaratish haqida qiziqarli qisqacha eslatma.
- - keng tarqalgan xatolar (kimdir hali ham ko'rsatmalarni o'qimasa).
- - odamlar parollarni saqlashga qanday tabassum qilyaptilar, garchi siz shunchaki ulanishlardan foydalanishingiz mumkin.
- - yashirin DAG yo'nalishi, funktsiyalarni kontekstga kiritish, yana bog'liqliklar haqida, shuningdek, topshiriqlarni ishga tushirishni o'tkazib yuborish haqida.
- - foydalanish haqida
default argumentsиparamsshablonlarda, shuningdek, o'zgaruvchilar va ulanishlarda. - - rejalashtiruvchi Airflow 2.0 ga qanday tayyorlanayotgani haqidagi hikoya.
- - bizning klasterimizni joylashtirish haqida biroz eskirgan maqola
docker-compose. - - andozalar va kontekstni yo'naltirish yordamida dinamik vazifalar.
- — pochta va Slack orqali standart va moslashtirilgan bildirishnomalar.
- - Tarmoqli vazifalar, makroslar va XCom.
Va maqolada ishlatiladigan havolalar:
- - shablonlarda foydalanish mumkin bo'lgan to'ldiruvchilar.
- - Daglarni yaratishda keng tarqalgan xatolar.
- -
docker-composetajriba, disk raskadrovka va boshqalar uchun. - — Telegram REST API uchun Python o'rami.
Manba: www.habr.com




