Hi, I'm Dmitry Logvinenko, a Data Engineer in the Analytics Department of the Vezet group of companies.
I'll tell you about a wonderful tool for developing ETL processes: Apache Airflow. Airflow is so versatile and multifaceted that it deserves a closer look even if you are not involved in data flows at all but simply need to launch some processes periodically and monitor their execution.
And yes, I won't just tell, I'll also show: the article has plenty of code, screenshots and recommendations.

What you usually see when you google the word Airflow / Wikimedia Commons
Introduction
Apache Airflow is just like Django:
- written in Python,
- has a great admin panel,
- endlessly extensible,
only better, and made for completely different purposes, namely (as it says before the cut):
- running and monitoring tasks on an unlimited number of machines (as many as Celery / Kubernetes and your conscience will allow),
- dynamic workflow generation from Python code that is very easy to write and understand,
- the ability to connect any databases and APIs to each other using both ready-made components and home-made plugins (which is extremely simple).
We use Apache Airflow like this:
- we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into the DWH and ODS (ours are Vertica and Clickhouse);
- as a very advanced cron, which starts data consolidation processes on the ODS and also keeps an eye on their maintenance.
Until recently, our needs were covered by one small server with 32 cores and 50 GB of RAM. In Airflow, this runs:
- more than 200 dags (actually workflows, into which we stuffed the tasks),
- each with 70 tasks on average,
- all this goodness starts (also on average) once an hour.
How we scaled out I'll describe below, but for now let's define the über-problem we are going to solve:
There are three source SQL Servers, each with 50 databases. These are instances of one project, so they all have the same structure (almost everywhere, mua-ha-ha), which means each has an Orders table (fortunately, a table with that name can be pushed into any business). We take the data, add service fields (source server, source database, ETL task ID) and naively throw it into, say, Vertica.
Let's go!
The main part, practical (and a little theoretical)
Why we (and you) need it
When the trees were big and I was a simple SQL guy in a Russian retail company, we ran ETL processes, aka data flows, using the two tools available to us:
- Informatica Power Center - an extremely sprawling system, extremely productive, with its own hardware and its own versioning. I used, God forbid, 1% of its capabilities. Why? First, the interface, from somewhere in the 2000s, put psychological pressure on us. Second, this contraption is designed for extremely fancy processes, furious component reuse and other very-important-enterprise tricks. As for the fact that it costs about as much as the wing of an Airbus A380 per year, we won't say anything.
Beware, the screenshot may slightly hurt people under 30

- SQL Server Integration Services (SSIS) - we used this comrade in our intra-project flows. Well, really: we already use SQL Server, and it would be somehow unreasonable not to use its ETL tools. Everything about it is good: the interface is pretty, and so are the progress reports... But that is not why we love software products, oh, not for that. We can version dtsx (which is XML with its nodes shuffled on every save), but what's the point? How about building a package of tasks that drags hundreds of tables from one server to another? Never mind a hundred, your index finger will fall off after twenty, clicking the mouse button. But it certainly looks more fashionable:
We certainly looked for ways out. It even almost came down to a self-written SSIS package generator...
...and then a new job found me. And Apache Airflow caught up with me there.
When I found out that ETL process descriptions are plain Python code, I all but danced for joy. That is how data flows became versioned and diffable, and pouring tables with a single structure from hundreds of databases into one target became a matter of Python code that fits on one and a half or two 13" screens.
Assembling the cluster
Let's not turn this into a complete kindergarten and talk about entirely obvious things like installing Airflow, your database of choice, Celery and other cases described in the docs.
So that we can start experimenting right away, I sketched a docker-compose.yml in which:
- We actually bring up Airflow: Scheduler, Webserver. Flower will also be spinning there to monitor Celery tasks (because it has already been baked into apache/airflow:1.10.10-python3.7, and we don't mind);
- PostgreSQL, in which Airflow will write its service information (scheduler data, execution statistics, etc.), and Celery will mark completed tasks;
- Redis, which will act as the task broker for Celery;
- Celery worker, which will do the direct execution of tasks.
- Into the folder ./dags we will put our files with the dag descriptions. They are picked up on the fly, so there is no need to restart the whole stack after every sneeze.
In some places the code in the examples is not shown in full (so as not to clutter the text), and in others it is modified along the way. Complete working code examples can be found in the repository.
docker-compose.yml
version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base
    environment:
      <<: *airflow-config
      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8
      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
    depends_on:
      - airflow-db
      - broker
    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"
    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base
    environment:
      <<: *airflow-config
    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"
    depends_on:
      - airflow
      - airflow-db
      - broker
Notes:
- When assembling the composition I leaned heavily on a well-known image - be sure to check it out. Maybe you don't need anything else in your life.
- All Airflow settings are available not only through airflow.cfg but also through environment variables (thanks to the developers), which I maliciously took advantage of.
- Naturally, it is not production-ready: I deliberately did not put healthchecks on the containers and did not bother with security. But I did the minimum that suits us experimenters.
- Note that:
  - The dag folder must be accessible to both the scheduler and the workers.
  - The same applies to all third-party libraries - they must all be installed on the machines with the scheduler and the workers.
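The variable names in the compose file follow Airflow's AIRFLOW__{SECTION}__{KEY} naming convention, where the section and key correspond to entries in airflow.cfg. Here is a small sketch of how such a name maps back to a config section and key. The helper parse_airflow_env and the compose_environment dictionary are my own illustration, not part of Airflow.

```python
AIRFLOW_PREFIX = "AIRFLOW__"


def parse_airflow_env(name):
    """Split 'AIRFLOW__CORE__EXECUTOR' into ('core', 'executor')."""
    if not name.startswith(AIRFLOW_PREFIX):
        raise ValueError(f"not an Airflow setting: {name}")
    # The first double underscore after the prefix separates section and key.
    section, separator, key = name[len(AIRFLOW_PREFIX):].partition("__")
    if not (section and separator and key):
        raise ValueError(f"expected AIRFLOW__SECTION__KEY, got: {name}")
    return section.lower(), key.lower()


# A few of the variables from the compose file above.
compose_environment = {
    "AIRFLOW__CORE__EXECUTOR": "CeleryExecutor",
    "AIRFLOW__CORE__DAGS_FOLDER": "/dags",
    "AIRFLOW__CELERY__BROKER_URL": "redis://broker:6379/0",
}

overrides = {parse_airflow_env(name): value
             for name, value in compose_environment.items()}

for (section, key), value in sorted(overrides.items()):
    print(f"[{section}] {key} = {value}")
```

So AIRFLOW__CELERY__BROKER_URL is the same setting as broker_url in the [celery] section of airflow.cfg.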
Well, now it's simple:
$ docker-compose up --scale worker=3
Once everything has come up, you can look at the web interfaces:
- Airflow: on port 8080 (as published in docker-compose.yml)
- Flower: on port 5555 (as published in docker-compose.yml)
Basic concepts
If all these "dags" meant nothing to you, here is a short glossary:
- Scheduler - the most important guy in Airflow, who makes sure that the robots do the hard work, not a human: it watches the schedule, refreshes the dags, launches tasks.
In older versions it had memory problems (no, not amnesia, leaks), and a legacy parameter, run_duration (its restart interval), even survived in the configs. But now everything is fine.
- DAG (aka "dag") - a "directed acyclic graph", but that definition tells most people little; in essence it is a container for tasks that interact with each other (see below), or an analogue of a Package in SSIS and a Workflow in Informatica.
In addition to dags there can also be subdags, but we most likely won't get to them.
- DAG Run - an initialized dag that is assigned its own execution_date. Dagruns of the same dag can run in parallel (if you made your tasks idempotent, of course).
- Operator - a piece of code responsible for performing a specific action. There are three types of operators:
  - action, like our favorite PythonOperator, which can execute any (valid) Python code;
  - transfer, which moves data from place to place, say, MsSqlToHiveTransfer;
  - sensor, which lets you react to an event or hold back further execution of the dag until the event occurs. HttpSensor can poll the specified endpoint and, once the desired response arrives, start the transfer GoogleCloudStorageToS3Operator. An inquisitive mind will ask: "Why? After all, you can do the retries right in the operator!" The answer: so as not to clog the task pool with suspended operators. The sensor starts, checks and dies before the next attempt.
- Task - declared operators, regardless of type, become tasks once they are attached to a dag.
- Task instance - when the general-scheduler decides it is time to send tasks into battle on the performer-workers (right on the spot if we use LocalExecutor, or on a remote node in the case of CeleryExecutor), it assigns them a context (i.e. a set of variables, the execution parameters), expands the command or query templates and puts them into the pool.
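To make the "directed acyclic graph" part of the glossary a little more tangible, here is a minimal sketch with nothing but the standard library (graphlib, so Python 3.9+ is assumed). The task names are invented for the illustration, and this is not how Airflow's scheduler is implemented. It only shows that a set of dependencies without cycles always has a valid execution order, and that a cycle makes such an order impossible.

```python
from graphlib import CycleError, TopologicalSorter

# Each task maps to the set of tasks it depends on (its upstream tasks).
# The task names are made up for this example.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "notify": {"load"},
}

# static_order() yields the tasks so that every task comes after its upstreams.
order = list(TopologicalSorter(dependencies).static_order())
print(order)


def has_cycle(graph):
    """Return True if the dependency graph cannot be ordered."""
    try:
        list(TopologicalSorter(graph).static_order())
    except CycleError:
        return True
    return False


print(has_cycle(dependencies))
print(has_cycle({"a": {"b"}, "b": {"a"}}))
```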
Generating tasks
First, let's outline the general scheme of our dag, and then dive deeper and deeper into the details, because we apply some non-trivial solutions.
So, in its simplest form, such a dag will look like this:
from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))


def workflow(**context):
    print(context)


for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)
Let's figure it out:
- First, we import the necessary libs and something else;
- sql_server_ds is a List[namedtuple[str, str]] with the names of the connections from Airflow Connections and the databases from which we will take our table;
- dag is the declaration of our dag, which must be in globals(), otherwise Airflow won't find it. The dag also needs to be told:
  - that its name is orders - this name will then appear in the web interface,
  - that it will run starting from midnight on the eighth of July,
  - and that it should run approximately every 6 hours (for tough guys, instead of timedelta() a cron line 0 0 0/6 ? * * * is acceptable here; for the less cool, an expression like @daily);
- workflow() will do the main job, but not now. For now, we'll just dump our context into the log.
- And now the simple magic of creating tasks:
  - we run through our sources;
  - we initialize PythonOperator, which will execute our dummy workflow(). Don't forget to specify a unique (within the dag) task name and to attach the dag itself. The flag provide_context, in turn, pours additional arguments into the function, which we carefully collect using **context.
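The article does not show commons/datasources.py itself, so here is a guess at what such a module could look like, under the setup described earlier: three source servers with 50 databases each. The DataSource type and the mssql_N / project_N_MM naming are my assumptions for the illustration. The point is that the list yields (conn_id, schema) pairs and that every schema, which the loop uses as task_id, is unique within the dag.

```python
from collections import namedtuple

# Hypothetical shape of commons/datasources.py; the names are invented.
DataSource = namedtuple("DataSource", "conn_id schema")

SERVERS = 3
DATABASES_PER_SERVER = 50

sql_server_ds = [
    DataSource(conn_id=f"mssql_{server}",
               schema=f"project_{server}_{database:02d}")
    for server in range(SERVERS)
    for database in range(DATABASES_PER_SERVER)
]

# The dag loop unpacks each entry exactly like this.
task_ids = [schema for conn_id, schema in sql_server_ds]

# Airflow requires task_id to be unique within a dag, so check it up front.
assert len(task_ids) == len(set(task_ids)), "duplicate task_id"
print(len(sql_server_ds), "sources, first:", sql_server_ds[0])
```

If two servers could host a database with the same name, the schema alone would no longer be a safe task_id, and something like f"{conn_id}__{schema}" would be a safer choice.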
That's all for now. What we got:
- a new dag in the web interface,
- a hundred and fifty tasks that will be executed in parallel (if the Airflow and Celery settings and the server capacity allow it).
Well, almost got it.

Who will install the dependencies?
To simplify the whole thing, I wired the processing of requirements.txt on all nodes into docker-compose.yml.
Now we're off:

Gray squares are task instances processed by the scheduler.
We wait a bit, and the tasks are snapped up by the workers:

The green ones, of course, have completed their work successfully. The red ones were not very successful.
By the way, there is no ./dags folder on our prod and no synchronization between machines - all dags live in git on our Gitlab, and Gitlab CI distributes updates to the machines on merge into master.
A little about Flower
While the workers are thrashing our dummies, let's recall another tool that can show us something - Flower.
The very first page, with summary information on the worker nodes:

The most intense page, with the tasks that went to work:

The most boring page, with the status of our broker:

The brightest page, with task status graphs and their execution times:

Loading what was underloaded
So, all the tasks have run, and the wounded can be carried away.

And there were many wounded - for one reason or another. When Airflow is used correctly, these very squares mean that the data definitely did not arrive.
You need to look at the log and restart the fallen task instances.
By clicking on any square, we will see the actions available to us:

You can go ahead and Clear the fallen. That is, we forget that something failed there, and the same task instance goes back to the scheduler.

Clearly, doing this with the mouse for every red square is not very humane - this is not what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances

Let's select everything at once, reset it to zero and click the right item:

After the cleanup, our tasks look like this (they are already waiting for the scheduler to schedule them):

Connections, hooks and other variables
It's time to look at the next DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Gentlemen, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Natasha, wake up, we dropped {{ dag.dag_id }}
         """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]
Has everyone done a report update at some point? Here it is again: there is a list of sources to take the data from; there is a list of where to put it; and don't forget to honk when everything has happened or broken (well, that's not about us, no).
Let's go through the file again and look at the new obscure stuff:
- from commons.operators import TelegramBotSendMessage - nothing prevents us from making our own operators, which we took advantage of by making a small wrapper for sending messages to Telegram. (We will talk more about this operator below);
- default_args={} - a dag can hand out the same arguments to all of its operators;
- to='{{ var.value.all_the_kings_men }}' - the to field is not hardcoded but generated dynamically using Jinja and a variable with a list of emails, which I carefully put into Admin/Variables;
- trigger_rule=TriggerRule.ALL_SUCCESS - the condition for starting the operator. In our case, the letter flies off to the bosses only if all dependencies have completed successfully;
- tg_bot_conn_id='tg_main' - conn_id arguments accept the IDs of connections that we create in Admin/Connections;
- trigger_rule=TriggerRule.ONE_FAILED - messages in Telegram fly off only if there are fallen tasks;
- task_concurrency=1 - we forbid several task instances of one task from running simultaneously. Otherwise we would get several VerticaOperator instances running at once (all looking at one table);
- report_update >> [email, tg] - all the VerticaOperator tasks converge on sending the letter and the message, like this:

But since the notification operators have different trigger rules, only one of them will fire. In the Tree View everything looks a little less clear:

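Here is a deliberately simplified model of the two trigger rules used in this dag, in plain Python. The rule names all_success and one_failed are the string values behind TriggerRule.ALL_SUCCESS and TriggerRule.ONE_FAILED. The should_run function is my own sketch and ignores details of the real scheduler, such as the upstream_failed and skipped states and the fact that one_failed does not wait for all upstream tasks to finish.

```python
def should_run(trigger_rule, upstream_states):
    """Decide whether a task fires, given the final states of its upstreams."""
    if trigger_rule == "all_success":
        return all(state == "success" for state in upstream_states)
    if trigger_rule == "one_failed":
        return any(state == "failed" for state in upstream_states)
    raise ValueError(f"unsupported trigger rule: {trigger_rule}")


def notifications(upstream_states):
    """Return which of the two notifier tasks would fire."""
    return {
        "email_success": should_run("all_success", upstream_states),
        "telegram_fail": should_run("one_failed", upstream_states),
    }


good_day = ["success"] * 5
bad_day = ["success", "success", "failed", "success", "success"]

print(notifications(good_day))
print(notifications(bad_day))
```

With these two rules the notifiers are mutually exclusive once all report updates have finished: either everything succeeded, or at least one task failed.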
I will say a few words about macros and their friends - variables.
Macros are Jinja placeholders that can substitute various useful information into operator arguments. For example, like this:
SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} expands to the contents of the context variable execution_date in the format YYYY-MM-DD: 2020-07-14. The best part is that context variables are nailed to a specific task instance (a square in the Tree View), so on restart the placeholders expand to the same values.
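Why this matters for restarts can be shown with a toy renderer. Real Airflow hands templates to Jinja. The render function below is only my stand-in that handles the single {{ ds }} macro, so treat it as an illustration of the idea rather than of the API. The rendered query depends only on the execution_date of the task instance, not on the moment the task actually runs.

```python
from datetime import datetime

QUERY = (
    "SELECT id, payment_dtm FROM orders.payments "
    "WHERE payment_dtm::DATE = '{{ ds }}'::DATE"
)


def render(template, execution_date):
    """Substitute only the `{{ ds }}` macro (Airflow itself uses Jinja)."""
    return template.replace("{{ ds }}", execution_date.strftime("%Y-%m-%d"))


execution_date = datetime(2020, 7, 14, 6, 0)

first_run = render(QUERY, execution_date)
# A restart days later still gets the execution_date of the same task instance,
# so it asks the source for exactly the same slice of data.
rerun = render(QUERY, execution_date)

print(first_run)
print(first_run == rerun)
```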
The assigned values can be viewed with the Rendered button on each task instance. This is what it looks like for the task that sends the letter:

And this is for the task that sends the message:

A complete list of built-in macros for the latest available version is available here:
Moreover, with the help of plugins we can declare our own macros, but that's another story.
In addition to the predefined things, we can substitute the values of our own variables (I already used this in the code above). Let's create a couple of things in Admin/Variables:

That's it, you can use them:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')
The value can be a scalar, or it can be JSON. In the case of JSON:
bot_config
{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}
just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
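What the dotted path does can be reproduced with the standard json module. The lookup helper below is my own illustration of the idea, not Airflow's implementation: the stored text is parsed as JSON, and each part of the path selects one nested key. It also shows why the token has to be a quoted string, since an unquoted 881hskdfASDA16641 would not parse as JSON at all.

```python
import json
from functools import reduce

# The text stored in the bot_config variable.
raw_value = """
{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}
"""


def lookup(json_text, dotted_path):
    """Resolve a path like 'bot.token' inside a JSON document."""
    document = json.loads(json_text)
    return reduce(lambda node, key: node[key], dotted_path.split("."), document)


token = lookup(raw_value, "bot.token")
service = lookup(raw_value, "service")
print(token, service)
```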
I will say literally one word and show one screenshot about connections. Everything here is elementary: on the page Admin/Connections we create a connection and add our logins / passwords and more specific parameters there. Like this:

Passwords can be encrypted (more thoroughly than by default), and you can leave out the connection type (as I did for tg_main). The thing is that the list of types is hardwired into the Airflow models and cannot be extended without digging into the source code (if I simply failed to google something, please correct me), but nothing stops us from getting the credentials just by name.
You can also create several connections with the same name: in this case the method BaseHook.get_connection(), which fetches connections by name, will return a random one of the several namesakes (it would be more logical to do Round Robin, but let's leave that on the conscience of the Airflow developers).
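The "random namesake" behavior is easy to picture with a toy registry. Everything below (the Connection tuple, the registry, the host names) is invented for the illustration and only mimics the described behavior of BaseHook.get_connection(), namely picking one of the connections that share a name at random.

```python
import random
from collections import defaultdict, namedtuple

# A stand-in for Airflow's connection records; the fields are simplified.
Connection = namedtuple("Connection", "conn_id host")

registry = defaultdict(list)
for connection in [
    Connection("mssql_ro", "replica-1"),
    Connection("mssql_ro", "replica-2"),
    Connection("dwh", "vertica-main"),
]:
    registry[connection.conn_id].append(connection)


def get_connection(conn_id, rng=random):
    """Return a random connection among those registered under conn_id."""
    candidates = registry.get(conn_id)
    if not candidates:
        raise KeyError(f"connection {conn_id!r} is not defined")
    return rng.choice(candidates)


picked = get_connection("mssql_ro")
print(picked.host)
```

This gives a crude form of load spreading across replicas, but no guarantee of even distribution, which is exactly the Round Robin complaint above.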
Variables and Connections are certainly cool tools, but it is important not to lose the balance: which parts of your flows you keep in the code itself, and which parts you hand over to Airflow for storage. On the one hand, it can be convenient to quickly change a value, for example a mailing address, through the UI. On the other hand, this is still a return to mouse clicking, which we (I) wanted to get rid of.
Working with connections is one of the jobs of hooks. In general, Airflow hooks are the points where it plugs into third-party services and libraries. For example, JiraHook opens a client for us to interact with Jira (you can move tasks back and forth), and with the help of SambaHook you can push a local file to an smb share.
Dissecting the custom operator
And now we have come close to looking at how TelegramBotSendMessage is made.
The code of commons/operators.py with the operator itself:
from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot


class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)
Here, like everything else in Airflow, everything is very simple:
- We inherited from BaseOperator, which implements quite a few Airflow-specific things (look at it at your leisure).
- We declared the fields in template_fields, in which Jinja will look for macros to process.
- We arranged the right arguments for __init__() and set defaults where necessary.
- We didn't forget about initializing the ancestor either.
- We opened the corresponding hook, TelegramBotHook, and got a client object from it.
- We overrode the method BaseOperator.execute(), which Airflow will call when the time comes to run the operator - in it we implement the main action, not forgetting to log. (We log, by the way, straight to stdout and stderr - Airflow will intercept everything, wrap it up nicely and put it where it belongs.)
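The order of events for such an operator (construct, render the fields listed in template_fields, then call execute()) can be imitated without Airflow at all. In the sketch below, MiniOperator and render_template_fields are my own stand-ins. The real BaseOperator renders with Jinja and a much richer context, so this only models the sequence, not the API.

```python
class MiniOperator:
    """A toy operator: only the fields named here get rendered."""
    template_fields = ("chat_id", "message")

    def __init__(self, chat_id, message):
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        return f'Send "{self.message}" to the chat {self.chat_id}'


def render_template_fields(operator, context):
    """Replace `{{ name }}` placeholders in every templated string field."""
    for field in operator.template_fields:
        value = getattr(operator, field)
        if not isinstance(value, str):
            continue
        for name, replacement in context.items():
            value = value.replace("{{ " + name + " }}", str(replacement))
        setattr(operator, field, value)


operator = MiniOperator(chat_id="{{ failures_chat }}",
                        message="{{ dag_id }} failed :(")
context = {"failures_chat": "-100123", "dag_id": "update_reports"}

# Rendering happens after __init__ and right before execute.
render_template_fields(operator, context)
result = operator.execute(context)
print(result)
```

One consequence of this order: inside __init__ the templated arguments still contain the raw placeholders, so anything that needs the final values belongs in execute().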
Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:
from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession


class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client
I don't even know what to explain here, so I'll just note the important points:
- We inherit and think about the arguments - in most cases there will be just one: conn_id;
- We override the standard methods: I limited myself to get_conn(), in which I fetch the connection parameters by name and simply take the extra section (this is a JSON field), where I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
- I create an instance of our TelegramBot, giving it the specific token.
That's all. You can get the client from the hook using TelegramBotHook().client or TelegramBotHook().get_conn().
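What the hook does with the Extra field boils down to two steps, which can be tried without Airflow: parse the JSON text and format the token into the API base URL. The function base_url_from_extra is my own illustration. In the hook itself the parsing is done by the connection's extra_dejson property.

```python
import json

API_ENDPOINT = "https://api.telegram.org/bot{}/"


def base_url_from_extra(extra_text):
    """Build the bot API base URL from a connection's Extra JSON text."""
    extra = json.loads(extra_text) if extra_text else {}
    try:
        token = extra["bot_token"]
    except KeyError:
        raise ValueError('Extra must look like {"bot_token": "..."}') from None
    return API_ENDPOINT.format(token)


url = base_url_from_extra('{"bot_token": "YOuRAwEsomeBOtToKen"}')
print(url)
```

Failing early with a readable message when bot_token is missing is a small addition of mine. The hook in the article would raise a bare KeyError in that case.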
And the second part of the file, in which I make a micro-wrapper for the Telegram REST API, so as not to drag in a whole library for the sake of one method, sendMessage.
class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)


class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))
The right way is to put it all together: TelegramBotSendMessage, TelegramBotHook, TelegramBot - into a plugin, place it in a public repository and give it to Open Source.
While we were studying all this, our report updates managed to fail successfully and send me an error message in the channel. I'll go check whether that's wrong...

Something broke in our dag! Isn't that what we were expecting? Exactly!
So, are you going to pour?
Do you feel I missed something? It seems he promised to transfer data from SQL Server to Vertica, and then he just wandered off the topic, the scoundrel!
This atrocity was intentional: I simply had to decipher some terminology for you. Now we can go further.
Our plan was this:
- Make a dag
- Generate tasks
- See how beautiful everything is
- Assign session numbers to the loads
- Get data from SQL Server
- Put data into Vertica
- Collect statistics
So, to get all this up and running, I made a small addition to our docker-compose.yml:
docker-compose.db.yml
version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py
There we bring up:
- Vertica as the host dwh with the most default settings,
- three instances of SQL Server,
- and we fill the databases in the latter with some data (do not look into mssql_init.py under any circumstances!)
We launch all this goodness with a slightly more complicated command than last time:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
What our miracle randomizer generated can be viewed using the item Data Profiling/Ad Hoc Query:

The main thing is not to show it to the analysts
I won't dwell on ETL sessions, everything is trivial there: we make a database, there is a table in it, we wrap everything in a context manager, and now we do this:
with Session(connection, task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15
session.py
from sys import stderr


class Session:
    """ETL workflow session

    Example:
        with Session(connection, task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # any() takes a single iterable, so the three values go in a tuple
        if any((exc_type, exc_val, exc_tb)):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} '
                f'id={self.id} '
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,

                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)


class SessionError(Exception):
    pass


class SessionClosedError(SessionError):
    pass
The time has come to collect our data from our hundred and fifty tables. Let's do it with some very unpretentious lines:
# Imports needed by this fragment
import pandas as pd
from airflow.hooks.mssql_hook import MsSqlHook

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
- With the help of the hook we get a pymssql connection from Airflow.
- We substitute a restriction in the form of a date into the query - the template engine will pass it into the function.
- We feed our query to pandas, which gets us a DataFrame - it will come in handy later.
I use the substitution {dt} instead of a query parameter %s not because I'm an evil Pinocchio, but because pandas can't cope with pymssql and slips it params: List, although pymssql really wants a tuple.
Also note that the developer of pymssql decided not to support it anymore, and it's time to move to pyodbc.
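For comparison, this is what a parameterized version of the same filter looks like when the driver cooperates. I use sqlite3 from the standard library purely as a stand-in: its placeholder is ? rather than %s, and the table and rows are made up. The important detail is the same as in the pymssql story above, namely that the parameters are passed as a tuple, separately from the SQL text.

```python
import sqlite3

connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE orders (id INTEGER, start_time TEXT)")
connection.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, "2020-07-14 10:00:00"),
     (2, "2020-07-15 09:30:00"),
     (3, "2020-07-14 23:59:59")],
)

dt = "2020-07-14"

# The value travels separately from the SQL text: no quoting, no injection.
rows = connection.execute(
    "SELECT id FROM orders WHERE date(start_time) = ? ORDER BY id",
    (dt,),  # note the trailing comma: a one-element tuple
).fetchall()

print(rows)
connection.close()
```

In the article's case dt comes from Airflow's own template engine rather than from user input, which is what makes the f-string substitution tolerable there.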
Ake sibone ukuthi i-Airflow ifake ini izingxabano zemisebenzi yethu:

If there is no data, there is no point in continuing. But counting the load as successful would also be strange. And yet it is not an error either. A-a-ah, what to do?! Here is what:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException tells Airflow that there is no error, but that we are skipping the task. In the interface the square will be neither green nor red, but pink.
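To make the three outcomes explicit, here is a small sketch of how a skip differs from a failure. The run_task function is a hypothetical mini-runner written for illustration, not Airflow's executor, and the fallback exception class is a stand-in so the snippet also runs where Airflow is not installed.

```python
try:
    from airflow.exceptions import AirflowSkipException
except ImportError:
    # Stand-in so the sketch runs without Airflow installed.
    class AirflowSkipException(Exception):
        pass


def check_rows(rows):
    """Raise a skip signal for an empty extract, otherwise return the row count."""
    if not rows:
        raise AirflowSkipException('No rows to load')
    return len(rows)


def run_task(rows):
    """Hypothetical mini-runner that maps the outcome to a task state name."""
    try:
        check_rows(rows)
    except AirflowSkipException:
        return 'skipped'   # neither success nor failure
    except Exception:
        return 'failed'
    return 'success'
```

The order of the except clauses matters: the skip signal is itself an exception, so it has to be caught before the generic handler.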
Let's add a few columns to our data:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Namely:
- the database we took the orders from,
- the ID of our loading session (it will differ for every task),
- a hash of the source and the order ID, so that the final database (where everything is poured into one table) has a unique order ID.
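The idea behind the hash column can be sketched without pandas. Note that this swaps in a different technique: it uses hashlib rather than hash_pandas_object, and the order_key helper and the source names are hypothetical. As far as I know, hash_pandas_object yields unsigned 64-bit values, so the sketch also shows one way to stay inside a signed 64-bit integer column.

```python
import hashlib


def order_key(source, order_id):
    """Derive a signed 64-bit integer key from (source, order_id)."""
    payload = f'{source}|{order_id}'.encode('utf-8')
    digest = hashlib.blake2b(payload, digest_size=8).digest()
    # signed=True keeps the value inside the range of a signed 64-bit column.
    return int.from_bytes(digest, byteorder='big', signed=True)


key_a = order_key('shop_a', 42)
key_b = order_key('shop_b', 42)
```

The same order ID from two different sources produces two different keys, and the key for a given pair is stable across runs, which is what makes reloading a day safe.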
The penultimate step remains: pour everything into Vertica. And, oddly enough, one of the most spectacular and efficient ways to do this is via CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

# Join the column names; interpolating the list itself would emit Python brackets and quotes
copy_stmt = f"""
    COPY {target_table}({', '.join(df.columns)})
    FROM STDIN
    DELIMITER '|'
    ENCLOSED '"'
    ABORT ON ERROR
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)

- We make a special receiver, StringIO.
- pandas kindly puts our DataFrame into it as CSV lines.
- We open a connection to our beloved Vertica with a hook.
- And now, with the help of copy(), we send our data straight to Vertica!
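The shape of the buffer and of the COPY statement is easier to see on a tiny example. This is a sketch under assumptions: it uses the standard-library csv module in place of DataFrame.to_csv, and rows_to_csv_buffer, build_copy_statement, and the staging.orders table name are hypothetical names introduced for illustration.

```python
import csv
from io import StringIO


def rows_to_csv_buffer(rows, null_token='NUL'):
    """Write rows into an in-memory, pipe-delimited CSV buffer."""
    buffer = StringIO()
    writer = csv.writer(buffer, delimiter='|', quoting=csv.QUOTE_MINIMAL,
                        doublequote=False, escapechar='\\', lineterminator='\n')
    for row in rows:
        # Replace missing values with the token that COPY is told to read as NULL.
        writer.writerow([null_token if value is None else value for value in row])
    buffer.seek(0)  # rewind so the consumer reads from the start
    return buffer


def build_copy_statement(table, columns):
    """Build a COPY statement with a plain comma-separated column list."""
    return (f"COPY {table} ({', '.join(columns)}) "
            "FROM STDIN DELIMITER '|' ENCLOSED '\"' ABORT ON ERROR NULL 'NUL'")


buffer = rows_to_csv_buffer([(1, 'a|b', None), (2, 'plain', 3.5)])
statement = build_copy_statement('staging.orders', ['id', 'data', 'value'])
```

A field that contains the delimiter is wrapped in double quotes, which is why the COPY statement declares ENCLOSED '"'; the missing value is written as NUL, matching NULL 'NUL'.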
From the driver we take the number of rows that were loaded and tell the session manager that everything is OK:

session.loaded_rows = cursor.rowcount
session.successful = True

That's all.
In production we create the target table manually. Here I allowed myself a little automation:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Using VerticaOperator(), I create the database schema and the table (if they do not exist yet, of course). The main thing is to set up the dependencies correctly:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Summing up
"Well," said the little mouse, "isn't it true that now
you are convinced I am the scariest animal in the forest?"
Julia Donaldson, The Gruffalo
I think that if my colleagues and I held a competition to see who could build and launch an ETL process from scratch faster, they with their SSIS and a mouse and I with Airflow... And then we also compared ease of maintenance... Wow, I think you will agree that I would beat them on all fronts!
On a slightly more serious note, Apache Airflow, by describing processes as program code, has made my work much more comfortable and enjoyable.
Its unlimited extensibility, both in terms of plugins and in its predisposition to scaling, lets you use Airflow in almost any area: in the full cycle of collecting, preparing, and processing data, or even in launching rockets (to Mars, of course).
Final part: reference and information
The rakes we have collected for you
- start_date. Yes, this is already a local meme. Everyone trips over the DAG's main argument, start_date. In short: if you set start_date to the current date and schedule_interval to one day, the DAG will start tomorrow at the earliest.
    start_date = datetime(2020, 7, 7, 0, 1, 2)
  And no more problems.
  There is one more runtime error associated with it: Task is missing the start_date parameter, which most often means that you forgot to bind the DAG to the operator.
- Everything on one machine. Yes, the databases (Airflow's own and the one for our own layer on top), the web server, the scheduler, and the workers. And it even worked. But over time the number of tasks for the services grew, and when PostgreSQL started answering an index lookup in 20 s instead of 5 ms, we picked it up and moved it away.
- LocalExecutor. Yes, we are still sitting on it, and we have already come to the edge of the abyss. LocalExecutor has been enough for us so far, but now it is time to expand by at least one worker, and we will have to work hard to move to CeleryExecutor. And since you can work with it on a single machine, nothing stops you from using Celery even on a server that "of course will never go into production, honest!"
- Not using the built-in tools:
  - Connections to store service credentials,
  - SLA Misses to react to tasks that did not finish on time,
  - XCom to exchange metadata (I said metadata!) between the tasks of a DAG.
- Mail abuse. Well, what can I say? Alerts were set up for every retry of a failed task. Now my work Gmail holds more than 90k emails from Airflow, and the webmail interface refuses to select and delete more than 100 at a time.
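The start_date item at the top of this list comes down to simple arithmetic: a run covers one schedule interval and is triggered only after that interval has ended. A minimal sketch of that rule, where first_trigger_time is a hypothetical helper and not an Airflow function:

```python
from datetime import datetime, timedelta


def first_trigger_time(start_date, schedule_interval):
    """Earliest moment the first DAG run can be triggered.

    The first run covers [start_date, start_date + schedule_interval)
    and is triggered only once that interval has ended.
    """
    return start_date + schedule_interval


first = first_trigger_time(datetime(2020, 7, 7), timedelta(days=1))
```

With a start_date of today and a daily interval, the result is tomorrow, which matches the behaviour described above. A fixed start_date in the past avoids the wait.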
Tools for even more automation
So that we can work even more with our heads and not our hands, Airflow has prepared the following for us:
- REST API: it still has Experimental status, which does not prevent it from working. With it you can not only get information about DAGs and tasks, but also stop/start a DAG, create a DAG Run, or create a pool.
- CLI: many tools are available through the command line that are not just inconvenient to use through the WebUI but are absent from it altogether. For example:
  - backfill is needed to re-run task instances. For example, the analysts come and say: "Comrade, you have nonsense in the data from January 1 to 13! Fix it, fix it, fix it, fix it!" And you go, just like that:
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
  - Database maintenance: initdb, resetdb, upgradedb, checkdb.
  - run lets you run a single task instance and even ignore all of its dependencies. Moreover, you can run it via LocalExecutor even if you have a Celery cluster.
  - test does much the same, except that it also writes nothing to the databases.
  - connections allows bulk creation of connections from the shell.
- Python API: a rather hardcore way of interacting, intended for plugins rather than for poking around in it by hand. But who is going to stop us from going to /home/airflow/dags, running ipython, and starting to mess around? You can, for example, export all connections with the following code:

    from airflow import settings
    from airflow.models import Connection

    fields = 'conn_id conn_type host port schema login password extra'.split()

    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
        d = {field: getattr(conn, field) for field in fields}
        print(conn.conn_id, '=', d)

- Connecting to the Airflow metadata database. I do not recommend writing to it, but pulling task states for various specific metrics can be much faster and easier than through any of the APIs.
Say, not all of our tasks are idempotent, and they can sometimes fail, which is normal. But several failures are already suspicious and need to be checked.
Beware, SQL!

WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        row_number() OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE execution_date > now() - INTERVAL '2' DAY
),
failed AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
            THEN TRUE END AS last_fail_seq
    FROM last_executions
    WHERE state IN ('failed', 'up_for_retry')
)
SELECT
    task_id,
    dag_id,
    count(last_fail_seq) AS unsuccessful,
    count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed,
    count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY task_id, dag_id
HAVING count(last_fail_seq) > 0

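What the query computes per task is the length of the most recent unbroken run of failed or up_for_retry executions: a row counts only while its rank among all executions equals its rank among the failed ones. The same logic on an in-memory list, as a sketch with a hypothetical trailing_failures helper and made-up history data:

```python
from itertools import takewhile

FAILED_STATES = {'failed', 'up_for_retry'}


def trailing_failures(executions):
    """Count the most recent unbroken run of failed executions.

    `executions` is a list of (execution_date, state) pairs for one task.
    """
    newest_first = sorted(executions, key=lambda e: e[0], reverse=True)
    # Stop at the first execution that did not fail.
    streak = takewhile(lambda e: e[1] in FAILED_STATES, newest_first)
    return sum(1 for _ in streak)


history = [
    ('2020-01-10', 'success'),
    ('2020-01-11', 'failed'),
    ('2020-01-12', 'success'),
    ('2020-01-13', 'up_for_retry'),
    ('2020-01-14', 'failed'),
]
streak_length = trailing_failures(history)
```

The failure on 2020-01-11 is not counted, because a successful run on 2020-01-12 separates it from the latest streak.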
Source: www.habr.com




