Hello, I'm Dmitry Logvinenko, a Data Engineer in the Analytics Department of the Vezet group of companies.
I'm going to tell you about a wonderful tool for developing ETL processes: Apache Airflow. Airflow is so versatile and multifaceted that it deserves a closer look even if you are not involved in data flows at all, but need to launch some processes periodically and monitor how they run.
And yes, I will not only tell but also show: the article contains a lot of code, screenshots and recommendations.
What you usually see when you google the word Airflow / Wikimedia Commons
Contents
- Introduction
- Main part, practical (and a little theoretical)
- Final part, reference and informational
- Links
Introduction
Apache Airflow is just like Django:
- written in Python,
- has a great admin panel,
- infinitely extensible,
only better, and it was made for completely different purposes, namely (as the teaser before the cut says):
- running and monitoring tasks on an unlimited number of machines (as many as Celery / Kubernetes and your conscience will allow),
- with dynamic workflow generation from Python code that is very easy to write and understand,
- and the ability to connect any databases and APIs to each other using both ready-made components and home-made plugins (which are extremely simple to write).
We use Apache Airflow like this:
- we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into the DWH and ODS (we have Vertica and Clickhouse);
- as an advanced cron, which starts the data consolidation processes on the ODS and also monitors their maintenance.
Until recently, our needs were covered by one small server with 32 cores and 50 GB of RAM. In Airflow, this runs:
- more than 200 DAGs (actually workflows, into which we stuffed tasks),
- with 70 tasks in each on average,
- and all this goodness starts (also on average) once an hour.
I will write below about how we expanded, but for now let's define the über-problem that we are going to solve:
There are three source SQL Servers, each with 50 databases: instances of one project, so they have the same structure (almost everywhere, mua-ha-ha), which means that each one has an Orders table (fortunately, a table with that name can be pushed into any business). We take the data, add service fields (source server, source database, ETL task ID) and naively throw it into, say, Vertica.
Let's go!
Main part, practical (and a little theoretical)
Why we (and you) need it
When the trees were big and I was a simple SQL developer at a Russian retail company, we ran ETL processes, aka data flows, using the two tools available to us:
- Informatica Power Center: an extremely widespread system, extremely productive, with its own hardware and its own versioning. I used, God forbid, 1% of its capabilities. Why? Well, first of all, this interface, from somewhere in the 2000s, put psychological pressure on us. Secondly, this contraption is designed for extremely fancy processes, furious component reuse and other very-important-enterprise tricks. About the fact that it costs about as much as the wing of an Airbus A380 per year, we will say nothing.
Beware, the screenshot may slightly hurt people under 30
- SQL Server Integration Services: we used this comrade in our intra-project flows. Well, really: we already use SQL Server, and it would be somehow unreasonable not to use its ETL tools. Everything in it is good: the interface is beautiful, and so are the progress reports... But that is not why we love software products, oh no. We could version dtsx (which is XML with nodes shuffled on every save), but what's the point? How about making a task package that drags hundreds of tables from one server to another? Forget a hundred: your index finger will fall off after twenty, clicking the mouse button. But it definitely looks more fashionable:
We certainly looked for ways out. We even almost came to a self-written SSIS package generator...
...and then a new job found me. And Apache Airflow caught up with me there.
When I found out that ETL process descriptions are simple Python code, I all but danced for joy. This is how data flows became versioned and diffable, and pouring tables with a single structure from hundreds of databases into one target became a matter of Python code that fits on one and a half or two 13" screens.
Assembling the cluster
Let's not arrange a complete kindergarten and talk about completely obvious things here, like installing Airflow, your chosen database, Celery and other cases described in the docs.
So that we can start experimenting right away, I sketched a docker-compose.yml in which:
- We raise Airflow itself: Scheduler and Webserver. Flower will also spin there to monitor Celery tasks (because it is already bundled into apache/airflow:1.10.10-python3.7, and we don't mind);
- PostgreSQL, in which Airflow will write its service information (scheduler data, execution statistics, etc.), and Celery will mark completed tasks;
- Redis, which will act as a task broker for Celery;
- Celery worker, which will do the direct execution of tasks.
- Into the folder ./dags we will put our files with the DAG descriptions. They are picked up on the fly, so there is no need to restart the whole stack after every sneeze.
In some places the code in the examples is not shown completely (so as not to clutter the text), and in others it is modified along the way. Complete working code examples can be found in the repository https://github.com/dm-logv/airflow-tutorial .
docker-compose.yml
version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base
    environment:
      <<: *airflow-config
      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8
      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
    depends_on:
      - airflow-db
      - broker
    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"
    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base
    environment:
      <<: *airflow-config
    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"
    depends_on:
      - airflow
      - airflow-db
      - broker
Notes:
- When assembling the composition, I relied heavily on the well-known image puckel/docker-airflow, so be sure to check it out. Maybe you don't need anything else in your life.
- All Airflow settings are available not only through airflow.cfg, but also through environment variables (thanks to the developers), which I shamelessly took advantage of.
- Naturally, it is not production-ready: I deliberately did not put heartbeats on the containers and did not bother with security. But I did the minimum suitable for our experiments.
- Note that:
  - The DAG folder must be accessible to both the scheduler and the workers.
  - The same applies to all third-party libraries: they must all be installed on the machines with the scheduler and the workers.
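To make the environment-variable note concrete, here is a minimal sketch of the naming convention visible in the compose file above: a setting from section `[core]` with key `executor` becomes `AIRFLOW__CORE__EXECUTOR` (double underscores, upper case). The two helper functions are hypothetical illustrations and are not part of Airflow.

```python
def airflow_env_name(section: str, key: str) -> str:
    """Build the environment variable name for an airflow.cfg [section] key."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def split_airflow_env_name(name: str):
    """Inverse mapping: 'AIRFLOW__CORE__DAGS_FOLDER' -> ('core', 'dags_folder')."""
    prefix, section, key = name.split("__", 2)
    if prefix != "AIRFLOW":
        raise ValueError(f"not an Airflow setting: {name}")
    return section.lower(), key.lower()


print(airflow_env_name("core", "executor"))
print(split_airflow_env_name("AIRFLOW__CELERY__BROKER_URL"))
```

The same pattern explains every `AIRFLOW__...` line in docker-compose.yml, so a setting can be moved between airflow.cfg and the environment without guessing its name.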
Well, now it's simple:
$ docker-compose up --scale worker=3
After everything is up, you can look at the web interfaces:
- Airflow: http://127.0.0.1:8080/admin/
- Flower: http://127.0.0.1:5555/dashboard
Basic concepts
If you didn't understand anything in all these "DAGs", here is a short glossary:
- Scheduler: the most important guy in Airflow, who makes sure that the robots work hard, and not a person: it monitors the schedule, updates DAGs and launches tasks.
In older versions it had memory problems (no, not amnesia, but leaks), and the legacy parameter run_duration, its restart interval, even remained in the configs. But now everything is fine.
- DAG (aka "dag"): "directed acyclic graph", but such a definition tells few people anything. In essence it is a container for tasks that interact with each other (see below), or an analogue of a Package in SSIS and a Workflow in Informatica.
In addition to DAGs, there may also be subdags, but we most likely won't get to them.
- DAG Run: an initialized DAG, which is assigned its own execution_date. DAG runs of the same DAG can work in parallel (if you made your tasks idempotent, of course).
- Operator: a piece of code responsible for performing a specific action. There are three types of operators:
  - action, like our favorite PythonOperator, which can execute any (valid) Python code;
  - transfer, which moves data from place to place, say, MsSqlToHiveTransfer;
  - sensor, which lets you react to an event or hold back further execution of the DAG until it occurs. HttpSensor can poll the specified endpoint and, once the desired response arrives, start the transfer GoogleCloudStorageToS3Operator. An inquisitive mind will ask: "Why? After all, you can do the retries right in the operator!" The answer: in order not to clog the task pool with suspended operators. The sensor starts, checks and dies until the next attempt.
- Task: declared operators, regardless of type, once attached to a DAG, are promoted to the rank of task.
- Task instance: when the general-scheduler decides that it is time to send tasks into battle on the performer-workers (right on the spot if we use LocalExecutor, or on a remote node in the case of CeleryExecutor), it assigns them a context (i.e. a set of variables, the execution parameters), expands the command or query templates, and puts them into the pool.
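The "directed acyclic graph" part of the glossary can be illustrated without Airflow at all. The sketch below is a plain-Python stand-in (not Airflow's scheduler): it takes a mapping from each task to the tasks it depends on and returns an order in which the tasks could run, failing if the dependencies form a cycle. The task names are hypothetical, and every task must appear as a key of the mapping.

```python
from collections import deque


def execution_order(upstream):
    """Return tasks in an order that respects dependencies (Kahn's algorithm).

    `upstream` maps each task name to the set of tasks it depends on.
    """
    pending = {task: set(deps) for task, deps in upstream.items()}
    ready = deque(sorted(task for task, deps in pending.items() if not deps))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        # Release every task that was waiting only for the one just finished
        for other in sorted(pending):
            deps = pending[other]
            if task in deps:
                deps.remove(task)
                if not deps:
                    ready.append(other)
    if len(order) != len(pending):
        raise ValueError("dependencies contain a cycle, so this is not a DAG")
    return order


tasks = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "notify": {"load"},
}
print(execution_order(tasks))
```

A cycle such as `{"a": {"b"}, "b": {"a"}}` raises `ValueError`, which is the "acyclic" requirement in practice: there must always be some task that can start first.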
Generating tasks
First, let's outline the general scheme of our DAG, and then we will dive deeper and deeper into the details, because we use some non-trivial solutions.
So, in its simplest form, such a DAG looks like this:
from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))


def workflow(**context):
    print(context)


for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)
Let's figure it out:
- First, we import the necessary libs and something else;
- sql_server_ds is a List[namedtuple[str, str]] with the names of the connections from Airflow Connections and the databases from which we will take our table;
- dag is the declaration of our DAG, which must end up in globals(), otherwise Airflow will not find it. The DAG also needs to be told:
  - that its name is orders; this name will then appear in the web interface,
  - that it will work starting from midnight on the eighth of July,
  - and that it should run roughly every 6 hours (for the tough guys, instead of timedelta() a cron line 0 0 0/6 ? * * * is allowed; for the less cool, an expression like @daily);
- workflow() will do the main job, but not now. For now, we just dump our context into the log.
- And now the simple magic of creating tasks:
  - we run through our sources;
  - we initialize a PythonOperator, which will execute our dummy workflow(). Do not forget to specify a unique (within the DAG) task name and to attach the DAG itself. The flag provide_context, in turn, pours additional arguments into the function, which we carefully collect using **context.
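The module commons.datasources is not shown in the article, so the following is only a hypothetical sketch of what `sql_server_ds` could look like: a list of named tuples, one per source database, for three servers with 50 databases each. The connection IDs (`mssql_0` ...) and the schema naming pattern are assumptions made for illustration.

```python
from collections import namedtuple

# Hypothetical record type: an Airflow connection ID plus a database name
DataSource = namedtuple('DataSource', 'conn_id schema')

# Assumed names; the real connection IDs live in Admin/Connections
SERVERS = ['mssql_0', 'mssql_1', 'mssql_2']
DATABASES_PER_SERVER = 50

sql_server_ds = [
    DataSource(conn_id=server, schema=f'{server}_db_{n:02d}')
    for server in SERVERS
    for n in range(DATABASES_PER_SERVER)
]

# The schema is used as task_id, so it has to be unique within the DAG
task_ids = [schema for conn_id, schema in sql_server_ds]
print(len(task_ids), len(set(task_ids)))
```

Because the loop in the DAG unpacks each item as `conn_id, schema`, any iterable of two-field tuples would work; the named tuple only makes the fields self-describing.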
For now, that's all. What we got:
- a new DAG in the web interface,
- one and a half hundred tasks that will be executed in parallel (if the Airflow and Celery settings and the server capacity allow it).
Well, almost got it.
Who will install the dependencies?
To simplify this whole thing, I screwed the processing of requirements.txt into docker-compose.yml on all nodes.
Now off we go:
Gray squares are task instances processed by the scheduler.
We wait a bit, and the tasks are snapped up by the workers:
The green ones, of course, have successfully completed their work. The red ones are not so successful.
By the way, there is no ./dags folder on our prod and no synchronization between machines: all DAGs live in git on our Gitlab, and Gitlab CI distributes updates to the machines on merge into master.
A little about Flower
While the workers are thrashing our dummies, let's remember another tool that can show us something: Flower.
The very first page, with summary information on the worker nodes:
The most intense page, with the tasks that went to work:
The most boring page, with the status of our broker:
The brightest page, with task status graphs and their execution time:
Loading the underloaded
So, all the tasks have run, and you can carry away the wounded.
And there were many wounded, for one reason or another. When Airflow is used correctly, these very squares indicate that the data definitely did not arrive.
You need to look at the log and restart the fallen task instances.
By clicking on any square, we see the actions available to us:
You can take the fallen and Clear them. That is, we forget that something failed there, and the same task instance goes back to the scheduler.
Clearly, doing this with the mouse for all the red squares is not very humane, and it is not what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances
Let's select everything at once and reset it to zero by clicking the right item:
After the cleanup, our tasks look like this (they are already waiting for the scheduler to schedule them):
Connections, hooks and other variables
It's time to look at the next DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Good gentlemen, the reports are updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
        Natasha, wake up, we dropped {{ dag.dag_id }}
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]
Has everyone done a report update at some point? Here it is again: there is a list of sources to get the data from; there is a list of places to put it; and don't forget to honk when everything has happened or broken (well, that's not about us, no).
Let's go through the file again and look at the new obscure things:
- from commons.operators import TelegramBotSendMessage: nothing prevents us from making our own operators, which we took advantage of by making a small wrapper for sending messages to the Unblocked one, that is, Telegram. (We will talk more about this operator below);
- default_args={}: the DAG can hand the same arguments to all of its operators;
- to='{{ var.value.all_the_kings_men }}': the field to will not be hardcoded, but generated dynamically using Jinja and a variable with a list of emails, which I carefully put in Admin/Variables;
- trigger_rule=TriggerRule.ALL_SUCCESS: the condition for starting the operator. In our case, the letter will fly to the bosses only if all dependencies have finished successfully;
- tg_bot_conn_id='tg_main': the conn_id arguments accept the IDs of connections that we create in Admin/Connections;
- trigger_rule=TriggerRule.ONE_FAILED: messages in Telegram will fly out only if there are fallen tasks;
- task_concurrency=1: we forbid the simultaneous launch of several task instances of one task. Otherwise, we would get several VerticaOperator instances running at once (against the same table);
- report_update >> [email, tg]: all VerticaOperator tasks converge on sending letters and messages, like this:
But since the notifier operators have different launch conditions, only one of them will fire. In the Tree View, everything looks a little less visual:
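Why only one of the two notifiers fires can be shown with a deliberately simplified model of the two trigger rules. This is not Airflow's implementation: it only looks at the final states of the upstream tasks and ignores retries, skipped tasks and the `upstream_failed` state. The rule names match the string values of `TriggerRule.ALL_SUCCESS` and `TriggerRule.ONE_FAILED`.

```python
def should_run(trigger_rule, upstream_states):
    """Decide whether a task fires, given the final states of its upstream tasks."""
    if trigger_rule == 'all_success':
        return all(state == 'success' for state in upstream_states)
    if trigger_rule == 'one_failed':
        return any(state == 'failed' for state in upstream_states)
    raise ValueError(f'rule not covered by this sketch: {trigger_rule}')


all_good = ['success'] * 5
one_broken = ['success', 'success', 'failed', 'success', 'success']

for states in (all_good, one_broken):
    print('email:', should_run('all_success', states),
          'telegram:', should_run('one_failed', states))
```

With five successful report updates the email goes out and Telegram stays silent; with a single failure it is the other way round, which is exactly the behavior the DAG relies on.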
I will say a few words about macros and their friends, variables.
Macros are Jinja placeholders that can substitute various useful information into operator arguments. For example, like this:
SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} expands to the contents of the context variable execution_date in the format YYYY-MM-DD: 2020-07-14. The best part is that context variables are nailed to a specific task instance (a square in the Tree View), and when it is restarted, the placeholders expand to the same values.
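The effect of `{{ ds }}` can be imitated in a few lines. The sketch below is a toy stand-in for Jinja (plain string replacement, one variable only) and the `render` helper is hypothetical; it only demonstrates that the rendered value is derived from the task instance's execution date, so a re-run of the same instance yields the same query.

```python
from datetime import datetime


def render(template: str, execution_date: datetime) -> str:
    """Toy replacement for Jinja rendering: supports only the {{ ds }} macro."""
    context = {'ds': execution_date.strftime('%Y-%m-%d')}
    for name, value in context.items():
        template = template.replace('{{ ' + name + ' }}', value)
    return template


query = "WHERE payment_dtm::DATE = '{{ ds }}'::DATE"

# The execution date belongs to the task instance, not to the wall clock,
# so clearing and re-running the instance renders the same text again.
first_run = render(query, datetime(2020, 7, 14, 6, 0))
re_run = render(query, datetime(2020, 7, 14, 6, 0))
print(first_run)
```

This is what makes reloads safe: a task instance cleared a week later still selects the rows for its own date, not for the day it happened to be restarted.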
The substituted values can be viewed using the Rendered button on each task instance. This is how it looks for the task that sends a letter:
And this is how it looks for the task that sends a message:
A complete list of built-in macros for the latest available version can be found in the macros reference of the Airflow documentation.
Moreover, with the help of plugins we can declare our own macros, but that's another story.
In addition to the predefined things, we can substitute the values of our own variables (I already used this in the code above). Let's create a couple of things in Admin/Variables:
That's it, you can use them:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')
The value can be a scalar, or it can be JSON. In the case of JSON:
bot_config
{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}
just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
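What the dotted path after `var.json.bot_config` does can be sketched with the standard library alone. This is an illustration of the idea, not Airflow's code: the stored value is parsed as JSON and the path is followed key by key. The `resolve` helper is hypothetical.

```python
import json

# The raw text as it would be stored in the bot_config variable
raw_value = '''
{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}
'''


def resolve(raw_json: str, dotted_path: str):
    """Parse the JSON text and walk it along a path like 'bot.token'."""
    node = json.loads(raw_json)
    for key in dotted_path.split('.'):
        node = node[key]
    return node


print(resolve(raw_value, 'bot.token'))
print(resolve(raw_value, 'service'))
```

Note that the value only parses if it is valid JSON, so string values such as the token need double quotes.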
I will say literally one word and show one screenshot about connections. Everything is elementary here: on the page Admin/Connections we create a connection and add our logins / passwords and more specific parameters there. Like this:
Passwords can be encrypted (more thoroughly than by default), and you can leave out the connection type (as I did for tg_main). The fact is that the list of types is hardwired into the Airflow models and cannot be extended without getting into the source code (if I simply failed to google something, please correct me), but nothing stops us from getting the credentials just by name.
You can also create several connections with the same name: in this case, the method BaseHook.get_connection(), which fetches connections by name, will return a random one of the namesakes (it would be more logical to do Round Robin, but let's leave that on the conscience of the Airflow developers).
Variables and Connections are certainly cool tools, but it is important not to lose the balance: which parts of your flows you keep in the code itself, and which parts you hand over to Airflow for storage. On the one hand, it can be convenient to quickly change a value, for example a mailbox, through the UI. On the other hand, this is still a return to the mouse click, which we (I) wanted to get rid of.
Working with connections is one of the jobs of hooks. In general, Airflow hooks are points for connecting it to third-party services and libraries. For example, JiraHook will open a client for us to interact with Jira (you can move tasks back and forth), and with the help of SambaHook you can push a local file to an smb share.
Dissecting the custom operator
And now we have come close to looking at how TelegramBotSendMessage is made.
The code of commons/operators.py with the operator itself:
from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot


class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)
Here, like everything else in Airflow, everything is very simple:
- We inherited from BaseOperator, which implements quite a few Airflow-specific things (take a look at your leisure).
- We declared the fields template_fields, in which Jinja will look for macros to process.
- We arranged the right arguments for __init__() and set defaults where necessary.
- We didn't forget to initialize the ancestor either.
- We opened the corresponding hook TelegramBotHook and received a client object from it.
- We overrode the method BaseOperator.execute(), which Airflow will call when the time comes to launch the operator. In it we implement the main action, not forgetting to log. (We log, by the way, straight to stdout and stderr: Airflow will intercept everything, wrap it nicely and put it where it belongs.)
Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:
from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession


class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client
I don't even know what to explain here, so I'll just note the important points:
- We inherit and think about the arguments; in most cases there will be just one: conn_id;
- We override the standard methods: I limited myself to get_conn(), in which I get the connection parameters by name and just take the extra section (this is a JSON field), where I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
- I create an instance of our TelegramBot, giving it that specific token.
That's all. You can get a client from the hook using TelegramBotHook().client or TelegramBotHook().get_conn().
And the second part of the file, in which I make a micro-wrapper for the Telegram REST API, so as not to drag in the whole python-telegram-bot for the sake of a single method, sendMessage.
class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)


class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))
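One practical detail of `parse_mode: MarkdownV2`: according to the Telegram Bot API formatting rules, a set of punctuation characters (including `_`, `(`, `)`, `.` and `!`) must be escaped with a backslash when they are meant literally. The helper below is a hypothetical addition, not part of the original wrapper, and is a minimal sketch for plain-text messages only; it would also escape characters you intended as real Markdown markup.

```python
# Characters the MarkdownV2 parse mode treats as special
MARKDOWN_V2_SPECIAL = set('_*[]()~`>#+-=|{}.!')


def escape_markdown_v2(text: str) -> str:
    """Prefix every MarkdownV2 special character with a backslash."""
    return ''.join('\\' + char if char in MARKDOWN_V2_SPECIAL else char
                   for char in text)


print(escape_markdown_v2('update_reports failed :('))
```

It could be applied to the rendered message before it is passed to `send_message()`, for example for messages that contain DAG IDs with underscores.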
The right thing to do is to put it all together, TelegramBotSendMessage, TelegramBotHook and TelegramBot, into a plugin, place it in a public repository and give it to Open Source.
While we were studying all this, our report updates managed to fail successfully and send me an error message in the channel. I'm going to check whether something is wrong...
Something broke in our DAG! Isn't that what we were expecting? Exactly!
Are you going to pour or not?
Do you feel that I missed something? It seems he promised to transfer data from SQL Server to Vertica, and then he went and drifted off the topic, the scoundrel!
This atrocity was intentional: I simply had to decipher some terminology for you. Now you can go further.
Our plan was this:
- Make a DAG
- Generate tasks
- See how beautiful everything is
- Assign session numbers to the loads
- Get data from SQL Server
- Put data into Vertica
- Collect statistics
So, to get this all up and running, I made a small addition to our docker-compose.yml:
docker-compose.db.yml
version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py
There we raise:
- Vertica as the host dwh with the most default settings,
- three instances of SQL Server,
- and we fill the databases in the latter with some data (do not look into mssql_init.py under any circumstances!)
We launch all this goodness with a slightly more complicated command than last time:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
What our miracle randomizer generated can be viewed with the item Data Profiling/Ad Hoc Query:
The main thing is not to show it to the analysts
I won't go into detail about ETL sessions, everything is trivial there: we make a database, there is a table in it, we wrap everything in a context manager, and now we do this:
with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15
session.py
from sys import stderr


class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # any() takes a single iterable, so the three values go in a tuple
        if any((exc_type, exc_val, exc_tb)):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} '
                f'id={self.id} '
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,

                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)


class SessionError(Exception):
    pass


class SessionClosedError(SessionError):
    pass
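To try the session idea without a PostgreSQL instance, here is a reduced, self-contained sketch of the same context-manager pattern. It is not the author's class: `SketchSession` is a hypothetical stand-in that uses an in-memory SQLite database, keeps only three columns and omits timestamps and comments. It shows the key behavior: the row is opened on entry and closed on exit, and an exception inside the block marks the session as unsuccessful without being swallowed.

```python
import sqlite3


class SketchSession:
    """Reduced stand-in for the ETL session, backed by SQLite."""

    def __init__(self, connection, task_name):
        self.connection = connection
        self.task_name = task_name
        self.id = None
        self.successful = None
        self.loaded_rows = None
        self.connection.execute(
            "CREATE TABLE IF NOT EXISTS sessions ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, "
            "task_name TEXT NOT NULL, successful INTEGER, loaded_rows INTEGER)")

    def __enter__(self):
        # Opening the session creates its row and remembers the generated ID
        cursor = self.connection.execute(
            "INSERT INTO sessions (task_name) VALUES (?)", (self.task_name,))
        self.id = cursor.lastrowid
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            self.successful = False
        self.connection.execute(
            "UPDATE sessions SET successful = ?, loaded_rows = ? WHERE id = ?",
            (self.successful, self.loaded_rows, self.id))
        self.connection.commit()
        return False  # let the exception propagate to the caller


connection = sqlite3.connect(':memory:')

with SketchSession(connection, 'orders_ok') as session:
    session.loaded_rows = 15
    session.successful = True

try:
    with SketchSession(connection, 'orders_broken') as session:
        raise RuntimeError('source is unavailable')
except RuntimeError:
    pass

rows = connection.execute(
    "SELECT task_name, successful, loaded_rows FROM sessions ORDER BY id"
).fetchall()
print(rows)
```

Letting the exception propagate matters in Airflow: the task still fails and turns red, while the sessions table keeps a record of what happened.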
The time has come to collect our data from our one and a half hundred tables. Let's do this with the help of some very unpretentious lines:
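# Imports needed by this fragment (Airflow 1.10 module path)
import pandas as pd
from airflow.hooks.mssql_hook import MsSqlHook

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)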
- Ngoncedo lwehuku sifumana kwi-Airflow
pymssql
-dibanisa - Masitshintshe isithintelo ngohlobo lomhla kwisicelo - siya kuphoswa kumsebenzi yi-injini yetemplate.
- Ukondla isicelo sethu
pandas
ngubani oza kusifumanaDataFrame
- iya kuba luncedo kuthi kwixesha elizayo.
I use the {dt} substitution instead of a %s query parameter not because I'm an evil Pinocchio, but because pandas can't cope with pymssql: it hands the driver params as a list, although pymssql really wants a tuple.
Also note that the developer of pymssql decided to stop supporting it, so it's time to move to pyodbc.
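Since {dt} is pasted straight into the SQL text, it seems prudent to check that it really is a date first. Here is a minimal sketch, assuming dt arrives as a YYYY-MM-DD string (which is what the {{ ds }} template yields). build_orders_query is a hypothetical helper, not part of the original DAG:

```python
from datetime import datetime


def build_orders_query(dt: str) -> str:
    """Return the daily Orders query with a validated date literal."""
    # strptime raises ValueError for anything that is not a date,
    # so arbitrary text never reaches the SQL string.
    day = datetime.strptime(dt, '%Y-%m-%d').date()
    return f"""
        SELECT
            id, start_time, end_time, type, data
        FROM dbo.Orders
        WHERE
            CONVERT(DATE, start_time) = '{day.isoformat()}'
    """


print(build_orders_query('2020-07-07'))
```

The query text is then built only from a parsed date, so a malformed or malicious value fails fast in Python instead of reaching SQL Server.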
Let's see what Airflow put into the arguments of our function:
If there is no data, there is no point in continuing. But it would also be odd to count the load as successful. And yet it isn't an error. A-a-ah, what to do?! Here's what:
if df.empty:
    raise AirflowSkipException('No rows to load')
AirflowSkipException tells Airflow that there is no error, but we are skipping the task. In the interface the square will be neither green nor red, but pink.
Let's add a few columns to our data:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])
Namely:
- The database we took the orders from,
- The ID of our load session (it will be different for every task),
- A hash of the source and the order ID, so that the final database (where everything is poured into one table) has a unique order ID.
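To make the idea behind hash_id concrete without pandas, here is a rough standard-library sketch. It is not the algorithm hash_pandas_object uses and will produce different values. order_hash_id and the source names are hypothetical, chosen only to show that the same order ID from two sources gets two different keys:

```python
import hashlib


def order_hash_id(etl_source: str, order_id: int) -> int:
    """Derive a signed 64-bit surrogate key from (source, order id)."""
    # The separator keeps ('a1', 2) and ('a', 12) from producing the same payload
    payload = f'{etl_source}|{order_id}'.encode('utf-8')
    digest = hashlib.blake2b(payload, digest_size=8).digest()
    return int.from_bytes(digest, byteorder='big', signed=True)


# The same order ID coming from two source schemas maps to two distinct keys
print(order_hash_id('source_a', 42))
print(order_hash_id('source_b', 42))
```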
The penultimate step remains: pour everything into Vertica. And, oddly enough, one of the most spectacular and efficient ways to do this is via CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
# Join the column names explicitly: interpolating the list itself would
# render Python's brackets and quotes into the COPY statement
copy_stmt = f"""
    COPY {target_table}({', '.join(df.columns)})
    FROM STDIN
    DELIMITER '|'
    ENCLOSED '"'
    ABORT ON ERROR
    NULL 'NUL'
    """
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
- We create a special receiver, StringIO.
- pandas kindly puts our DataFrame into it as CSV lines.
- We open a connection to our beloved Vertica with a hook.
- And now, with the help of copy(), we send our data straight to Vertica!
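The buffer step can be tried without pandas or Vertica. The sketch below uses only the standard csv module to show what ends up in the buffer and how the column list of the COPY statement is assembled. The rows, the column names and the demo.orders table are made up for illustration:

```python
import csv
from io import StringIO

# Made-up sample data: one missing value and one value containing the delimiter
rows = [
    (1, 'taxi', None),
    (2, 'a|b', 3.5),
]
columns = ['id', 'type', 'data']

buffer = StringIO()
writer = csv.writer(buffer, delimiter='|', quoting=csv.QUOTE_MINIMAL,
                    lineterminator='\n')
for row in rows:
    # Replace missing values with the same marker the COPY statement declares
    writer.writerow(['NUL' if value is None else value for value in row])
buffer.seek(0)

# demo.orders is a hypothetical target table
copy_stmt = f"""
    COPY demo.orders({', '.join(columns)})
    FROM STDIN
    DELIMITER '|'
    ENCLOSED '"'
    NULL 'NUL'
"""
csv_text = buffer.getvalue()
print(csv_text)
```

With QUOTE_MINIMAL only the field that contains the delimiter gets wrapped in quotes, which is why the COPY statement declares ENCLOSED '"'.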
From the driver we take the number of rows that were loaded and tell the session manager that everything is OK:
session.loaded_rows = cursor.rowcount
session.successful = True
That's all.
In production we create the target table by hand. Here I allowed myself a bit of automation:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
        id          INT,
        start_time  TIMESTAMP,
        end_time    TIMESTAMP,
        type        INT,
        data        VARCHAR(32),
        etl_source  VARCHAR(200),
        etl_id      INT,
        hash_id     INT PRIMARY KEY
    );"""
create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)
Using VerticaOperator() I create the database schema and the table (if they don't already exist, of course). The main thing is to arrange the dependencies correctly:
for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)
    create_table >> load
Summing up
"Well," said the little mouse, "isn't it so, now
Are you convinced that I am the scariest animal in the forest?"
Julia Donaldson, The Gruffalo
I think that if my colleagues and I held a competition over who could create and launch an ETL process from scratch faster, they with their SSIS and a mouse and me with Airflow... And then we also compared ease of maintenance... Wow, I think you'll agree that I'd beat them on all fronts!
On a slightly more serious note, Apache Airflow, by describing processes as program code, has made my work much more comfortable and enjoyable.
Its unlimited extensibility, both in terms of plug-ins and in its predisposition to scaling, lets you use Airflow in almost any area: from the full cycle of collecting, preparing and processing data to launching rockets (to Mars, of course).
The final part, reference and information
Pitfalls we've stepped on for you
- start_date. Yes, this is already a local meme. Everyone trips over the DAG's main argument, start_date. In short, if you set start_date to the current date and schedule_interval to one day, the DAG will start tomorrow at the earliest.
start_date = datetime(2020, 7, 7, 0, 1, 2)
And no more problems.
There is another runtime error associated with it: Task is missing the start_date parameter, which most often means that you forgot to bind the operator to the dag.
- Everything on one machine. Yes, the databases (Airflow's own and the one for our wrapper layer), the web server, the scheduler and the workers. And it even worked. But over time the number of tasks for the service grew, and when PostgreSQL started answering an index lookup in 20 s instead of 5 ms, we picked it up and moved it out.
- LocalExecutor. Yes, we are still sitting on it, and we have already come to the edge of the abyss. LocalExecutor has been enough for us so far, but now it's time to expand by at least one worker, and we'll have to work hard to move to CeleryExecutor. And since you can run it on a single machine, nothing stops you from using Celery even on a server that "of course will never go to production, honest!"
- Not using the built-in tools:
  - Connections to store service credentials,
  - SLA Misses to respond to tasks that didn't finish on time,
  - XCom to exchange metadata (I said metadata!) between the tasks of a dag.
- Mail abuse. Well, what can I say? Alerts were set up for every retry of a failed task. Now my work Gmail holds more than 90k emails from Airflow, and the web mail client refuses to select and delete more than 100 at a time.
More pitfalls:
Apache Airflow Pitfails
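The start_date rule from the first pitfall above boils down to simple date arithmetic. The sketch below assumes the scheduling behaviour described in this article (a run is triggered only after its interval has ended). first_run_not_before is a hypothetical helper, not an Airflow function:

```python
from datetime import datetime, timedelta


def first_run_not_before(start_date: datetime,
                         schedule_interval: timedelta) -> datetime:
    """Earliest moment the scheduler can trigger the first DAG run."""
    # The first run covers [start_date, start_date + schedule_interval)
    # and is triggered only once that interval has ended.
    return start_date + schedule_interval


start = datetime(2020, 7, 7, 0, 1, 2)
print(first_run_not_before(start, timedelta(days=1)))  # 2020-07-08 00:01:02
```

That is why a start_date of "today" with a daily interval means nothing happens until tomorrow, while a fixed date in the past starts the DAG right away.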
More automation tools
So that we can work more with our heads and less with our hands, Airflow has prepared the following for us:
- REST API: it still has Experimental status, which does not stop it from working. With it you can not only get information about dags and tasks, but also stop or start a dag, and create a DAG Run or a pool.
- CLI: the command line offers many tools that are not just inconvenient to use through the WebUI but are missing from it altogether. For example:
  - backfill is needed to re-run task instances. Say the analysts come and tell you: "Comrade, your data from January 1 to 13 is nonsense! Fix it, fix it, fix it, fix it!" And you go, bam:
    airflow backfill -s '2020-01-01' -e '2020-01-13' orders
  - Database maintenance: initdb, resetdb, upgradedb, checkdb.
  - run lets you run a single task instance, and even ignore all of its dependencies. Moreover, you can run it through LocalExecutor even if you have a Celery cluster.
  - test does much the same, except that it writes nothing to the database.
  - connections allows mass creation of connections from the shell.
- Python API: a rather hardcore way of interacting, meant for plugins rather than for poking around in with your own little hands. But who's going to stop us from going to /home/airflow/dags, running ipython and starting to mess around? You can, for example, export all connections with the following code:
from airflow import settings
from airflow.models import Connection

fields = 'conn_id conn_type host port schema login password extra'.split()

session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
    d = {field: getattr(conn, field) for field in fields}
    print(conn.conn_id, '=', d)
- Connecting to the Airflow metadatabase. I don't recommend writing to it, but pulling task states for various specific metrics can be much faster and easier than through any of the APIs.
Let's say not all of our tasks are idempotent, so they can fail from time to time, and that's normal. But a few failures in a row are already suspicious, and it's worth checking.
Beware, SQL!
WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        row_number() OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
),
failed AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        CASE WHEN rn = row_number() OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC)
                 THEN TRUE END AS last_fail_seq
    FROM last_executions
    WHERE
        state IN ('failed', 'up_for_retry')
)
SELECT
    task_id,
    dag_id,
    count(last_fail_seq)                       AS unsuccessful,
    count(CASE WHEN last_fail_seq
        AND state = 'failed' THEN 1 END)       AS failed,
    count(CASE WHEN last_fail_seq
        AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
    task_id,
    dag_id
HAVING
    count(last_fail_seq) > 0
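In case the window-function trick above is hard to read: as far as I can tell, the query counts, for every task, the unbroken streak of failures starting from its most recent run. The same logic in plain Python looks roughly like this. It is a simplified sketch that leaves out the two-day window and the failed / up_for_retry breakdown, and trailing_failures and the sample rows are hypothetical:

```python
from collections import defaultdict

FAILED_STATES = {'failed', 'up_for_retry'}


def trailing_failures(task_instances):
    """Count the unbroken streak of recent failures per (dag_id, task_id).

    task_instances is an iterable of (dag_id, task_id, execution_date, state).
    """
    by_task = defaultdict(list)
    for dag_id, task_id, execution_date, state in task_instances:
        by_task[(dag_id, task_id)].append((execution_date, state))

    streaks = {}
    for key, runs in by_task.items():
        count = 0
        # Walk from the newest run backwards and stop at the first non-failure
        for _, state in sorted(runs, reverse=True):
            if state not in FAILED_STATES:
                break
            count += 1
        if count:
            streaks[key] = count
    return streaks


rows = [
    ('orders', 'load', '2020-07-05', 'success'),
    ('orders', 'load', '2020-07-06', 'failed'),
    ('orders', 'load', '2020-07-07', 'up_for_retry'),
    ('orders', 'check', '2020-07-06', 'failed'),
    ('orders', 'check', '2020-07-07', 'success'),
]
print(trailing_failures(rows))
```

A task whose latest run succeeded drops out of the result, just as the HAVING clause drops it from the query.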
References
And of course, the first ten links in the Google results are the contents of the Airflow folder in my bookmarks.
- Apache Airflow Documentation - of course, we should start with the official docs, but who reads the instructions?
- Best Practices - well, at least read the recommendations from the creators.
- The Airflow UI - the very beginning: the user interface in pictures.
- Understanding Apache Airflow's key concepts - the basic concepts are well described, in case you (suddenly!) didn't understand something from me.
- Tianlong's Blog: A Guide On How To Build An Airflow Server/Cluster - a short guide to setting up an Airflow cluster.
- Running Apache Airflow At Lyft - an almost equally interesting article, perhaps more extensive, with only a few examples.
- How Apache Airflow Distributes Jobs on Celery workers - about working in tandem with Celery.
- DAG Writing Best Practices in Apache Airflow - about task idempotency, loading by ID instead of by date, transformations, file structure and other interesting things.
- Managing Dependencies in Apache Airflow - task dependencies and the Trigger Rule, which I mentioned only in passing.
- Airflow: When Your DAG is Far Behind The Schedule - how to get around some of the scheduler's "works as designed" behaviour, load missing data and prioritize tasks.
- Useful SQL queries for Apache Airflow - useful SQL queries against the Airflow metadata.
- Get started developing workflows with Apache Airflow - there is a useful section on creating a custom sensor.
- Building the Fetchr Data Science Infra on AWS with Presto and Airflow - a short, interesting note about building infrastructure on AWS for Data Science.
- 7 Common Errors to Check when Debugging Airflow DAGs - common mistakes (for when someone still hasn't read the instructions).
- Store and access password using Apache Airflow - smile at how people cling to their own ways of storing passwords, although you could simply use Connections.
- The Zen of Python and Apache Airflow - implicit DAG passing, passing context into functions, dependencies once again, and skipping task runs.
- Airflow: Lesser Known Tips, Tricks, and Best Practises - about using default arguments and params in templates, as well as Variables and Connections.
- Profiling the Airflow Scheduler - a story about how the scheduler is being prepared for Airflow 2.0.
- Apache Airflow with 3 Celery workers in docker-compose - a somewhat outdated article about deploying our cluster in docker-compose.
- 4 Templating Tasks Using the Airflow Context - dynamic tasks using templates and context passing.
- Error Notifications in Airflow - standard and custom notifications by mail and Slack.
- Airflow Workshop: complex DAGs without crutches - branching tasks, macros and XCom.
And the links used in the article:
- Macros reference - placeholders available for use in templates.
- Common Pitfalls - Airflow - common mistakes when creating dags.
- puckel/docker-airflow: Docker Apache Airflow - docker-compose for experimenting, debugging and more.
- python-telegram-bot/python-telegram-bot: We have made you a wrapper you can't refuse - a Python wrapper for the Telegram REST API.
Source: www.habr.com