Sveiki, es esmu Dmitrijs LogviÅenko ā uzÅÄmumu grupas Vezet Analytics nodaļas datu inženieris.
PastÄstÄ«Å”u par brÄ«niŔķīgu rÄ«ku ETL procesu izstrÄdei - Apache Airflow. TaÄu Airflow ir tik daudzpusÄ«ga un daudzpusÄ«ga, ka jums vajadzÄtu to aplÅ«kot vÄrÄ«gÄk pat tad, ja neesat iesaistÄ«ts datu plÅ«smÄs, bet jums ir nepiecieÅ”ams periodiski palaist kÄdus procesus un uzraudzÄ«t to izpildi.
Un jÄ, es ne tikai pastÄstÄ«Å”u, bet arÄ« parÄdÄ«Å”u: programmÄ ir daudz koda, ekrÄnuzÅÄmumu un ieteikumu.
Ko parasti redzat, meklÄjot Google vÄrdu Airflow / Wikimedia Commons
- tikai labÄk, un tas tika izgatavots pavisam citiem mÄrÄ·iem, proti (kÄ rakstÄ«ts pirms kata):
uzdevumu izpilde un uzraudzÄ«ba neierobežotÄ skaitÄ iekÄrtu (cik daudz Selery/Kubernetes un jÅ«su sirdsapziÅa to ļaus)
ar dinamisku darbplÅ«smas Ä£enerÄÅ”anu no ļoti viegli rakstÄma un saprotama Python koda
un iespÄja savienot jebkuras datu bÄzes un API savÄ starpÄ, izmantojot gan gatavus komponentus, gan paÅ”taisÄ«tus spraudÅus (kas ir ÄrkÄrtÄ«gi vienkÄrÅ”i).
MÄs izmantojam Apache Airflow Å”Ädi:
mÄs apkopojam datus no dažÄdiem avotiem (daudziem SQL Server un PostgreSQL gadÄ«jumiem, dažÄdÄm API ar lietojumprogrammu metriku, pat 1C) DWH un ODS (mums ir Vertica un Clickhouse).
cik attÄ«stÄ«ts cron, kas uzsÄk datu konsolidÄcijas procesus ODS, kÄ arÄ« uzrauga to uzturÄÅ”anu.
VÄl nesen mÅ«su vajadzÄ«bas apmierinÄja viens neliels serveris ar 32 kodoliem un 50 GB RAM. Gaisa plÅ«smÄ tas darbojas:
vairÄk 200 dienas (faktiski darbplÅ«smas, kurÄs mÄs ievietojÄm uzdevumus),
katrÄ vidÄji 70 uzdevumi,
Å”is labums sÄkas (arÄ« vidÄji) reizi stundÄ.
Un par to, kÄ mÄs paplaÅ”inÄjÄmies, es rakstÄ«Å”u zemÄk, bet tagad definÄsim Ć¼ber-problÄmu, kuru mÄs atrisinÄsim:
Ir trÄ«s avota SQL serveri, katrs ar 50 datu bÄzÄm - attiecÄ«gi viena projekta gadÄ«jumi, tiem ir vienÄda struktÅ«ra (gandrÄ«z visur, mua-ha-ha), kas nozÄ«mÄ, ka katram ir pasÅ«tÄ«jumu tabula (par laimi, tabula ar to vÄrdu var iespiest jebkurÄ biznesÄ). MÄs Åemam datus, pievienojot servisa laukus (avota serveris, avota datu bÄze, ETL uzdevuma ID) un naivi iemetam tos, piemÄram, Vertica.
Iesim!
GalvenÄ daļa, praktiskÄ (un nedaudz teorÄtiska)
KÄpÄc mÄs (un jÅ«s)
Kad koki bija lieli un es biju vienkÄrÅ”s SQL-schik vienÄ Krievijas mazumtirdzniecÄ«bÄ mÄs izkrÄpÄm ETL procesus jeb datu plÅ«smas, izmantojot divus mums pieejamos rÄ«kus:
InformÄcijas enerÄ£ijas centrs - ÄrkÄrtÄ«gi izplatÄ«ta sistÄma, ÄrkÄrtÄ«gi produktÄ«va, ar savu aparatÅ«ru, savu versiju veidoÅ”anu. Es izmantoju 1% no tÄs iespÄjÄm. KÄpÄc? PirmkÄrt, Ŕī saskarne kaut kur no 380. gadiem radÄ«ja mums garÄ«gu spiedienu. OtrkÄrt, Ŕī ierÄ«ce ir paredzÄta ÄrkÄrtÄ«gi smalkiem procesiem, niknai komponentu atkÄrtotai izmantoÅ”anai un citiem ļoti svarÄ«giem uzÅÄmuma trikiem. Par to, ka tas maksÄ, tÄpat kÄ Airbus AXNUMX spÄrns / gadÄ, mÄs neko neteiksim.
Uzmanieties, ekrÄnuzÅÄmums var nedaudz ievainot cilvÄkus, kas jaunÄki par 30 gadiem
SQL servera integrÄcijas serveris - mÄs izmantojÄm Å”o biedru mÅ«su iekÅ”ÄjÄs projekta plÅ«smÄs. Nu, patiesÄ«bÄ: mÄs jau izmantojam SQL Server, un bÅ«tu kaut kÄ nesaprÄtÄ«gi neizmantot tÄ ETL rÄ«kus. Viss tajÄ ir labs: gan interfeiss ir skaists, gan progresa ziÅojumi... Bet ne tÄpÄc mÄs mÄ«lam programmatÅ«ras produktus, ak, ne tÄpÄc. Versija to dtsx (kas ir XML ar mezgliem, kas sajaukti saglabÄÅ”anas laikÄ) mÄs varam, bet kÄda jÄga? KÄ bÅ«tu ar uzdevumu pakotnes izveidi, kas vilks simtiem tabulu no viena servera uz otru? JÄ, kÄds simts, rÄdÄ«tÄjpirksts nokritÄ«s no divdesmit gabaliÅiem, noklikŔķinot uz peles pogas. Bet tas noteikti izskatÄs modernÄk:
MÄs noteikti meklÄjÄm izejas. Lieta pat gandrÄ«z nonÄcis pie paÅ”rakstÄ«ta SSIS pakotÅu Ä£eneratora ...
ā¦un tad mani atrada jauns darbs. Un Apache Airflow mani apsteidza tajÄ.
Kad uzzinÄju, ka ETL procesu apraksti ir vienkÄrÅ”s Python kods, es vienkÄrÅ”i nedejoju aiz prieka. TÄdÄ veidÄ datu straumes tika versÄtas un diferencÄtas, un tabulu ar vienu struktÅ«ru ielieÅ”ana no simtiem datu bÄzu vienÄ mÄrÄ·Ä« kļuva par Python koda jautÄjumu pusotra vai divos 13 collu ekrÄnos.
Klastera salikŔana
NekÄrtosim pilnÄ«gi bÄrnudÄrzu un nerunÄsim Å”eit par pilnÄ«gi paÅ”saprotamÄm lietÄm, piemÄram, Airflow instalÄÅ”anu, jÅ«su izvÄlÄto datubÄzi, Selerijas un citiem dokos aprakstÄ«tajiem gadÄ«jumiem.
Lai mÄs nekavÄjoties varÄtu sÄkt eksperimentus, es ieskicÄju docker-compose.yml kurÄ:
PatiesÄ«bÄ paaugstinÄsim Airflow: plÄnotÄjs, tÄ«mekļa serveris. Flower arÄ« tur griezÄ«sies, lai uzraudzÄ«tu Selerijas uzdevumus (jo tas jau ir iespiests apache/airflow:1.10.10-python3.7, bet mums nav nekas pretÄ«)
PostgreSQL, kurÄ Airflow ierakstÄ«s savu servisa informÄciju (plÄnotÄja datus, izpildes statistiku utt.), bet Selery atzÄ«mÄs izpildÄ«tos uzdevumus;
Redis, kas darbosies kÄ Selerijas uzdevumu brokeris;
Selerijas strÄdnieks, kas nodarbosies ar tieÅ”u uzdevumu izpildi.
Uz mapi ./dags mÄs pievienosim savus failus ar dags aprakstu. Tie tiks savÄkti lidojuma laikÄ, tÄpÄc pÄc katras ŔķaudÄ«Å”anas nav nepiecieÅ”ams žonglÄt ar visu kaudzÄ«ti.
DažÄs vietÄs kods piemÄros nav pilnÄ«bÄ parÄdÄ«ts (lai nepÄrblÄ«vÄtu tekstu), bet kaut kur tas tiek modificÄts procesÄ. Pilnus darba kodu piemÄrus var atrast repozitorijÄ https://github.com/dm-logv/airflow-tutorial.
KompozÄ«cijas montÄÅ¾Ä lielÄ mÄrÄ paļÄvos uz labi zinÄmo tÄlu puckel/docker-gaisa plÅ«sma - noteikti pÄrbaudiet to. VarbÅ«t tev dzÄ«vÄ nekas cits nav vajadzÄ«gs.
Visi gaisa plÅ«smas iestatÄ«jumi ir pieejami ne tikai caur airflow.cfg, bet arÄ« ar vides mainÄ«gajiem (paldies izstrÄdÄtÄjiem), kurus es ļaunprÄtÄ«gi izmantoju.
Protams, tas nav gatavs ražoÅ”anai: es apzinÄti neliku sirdspukstus uz konteineriem, es neuztraucos ar droŔību. Bet es izdarÄ«ju mÅ«su eksperimentÄtÄjiem piemÄroto minimumu.
Pieraksti to:
Mapei dag ir jÄbÅ«t pieejamai gan plÄnotÄjam, gan darbiniekiem.
Tas pats attiecas uz visÄm treÅ”o puÅ”u bibliotÄkÄm ā tÄm visÄm jÄbÅ«t instalÄtÄm iekÄrtÄs ar plÄnotÄju un darbiniekiem.
Nu, tagad tas ir vienkÄrÅ”i:
$ docker-compose up --scale worker=3
Kad viss ir pacÄlies, varat apskatÄ«t tÄ«mekļa saskarnes:
Ja jÅ«s neko nesapratÄt visos Å”ajos "dags", tad Å”eit ir Ä«sa vÄrdnÄ«ca:
PlÄnotÄjs - vissvarÄ«gÄkais onkulis Airflow, kas kontrolÄ, lai roboti smagi strÄdÄtu, nevis cilvÄks: uzrauga grafiku, atjaunina dienas, palaiž uzdevumus.
KopumÄ vecÄkÄs versijÄs viÅam bija problÄmas ar atmiÅu (nÄ, nevis amnÄzija, bet noplÅ«des) un mantotais parametrs pat palika konfigurÄcijÄs run_duration ā tÄ restartÄÅ”anas intervÄls. Bet tagad viss ir kÄrtÄ«bÄ.
DAG (aka "dag") - "virzÄ«ts aciklisks grafiks", taÄu Å”Äda definÄ«cija pateiks dažiem cilvÄkiem, taÄu patiesÄ«bÄ tas ir konteiners uzdevumiem, kas mijiedarbojas viens ar otru (skatÄ«t zemÄk) vai paketes SSIS un darbplÅ«smas analogs informaticÄ. .
Papildus dagiem joprojÄm var bÅ«t subdags, bet mÄs, visticamÄk, lÄ«dz tiem netiksim.
DAG SkrÄjiens - inicializÄts dag, kuram tiek pieŔķirts savs execution_date. TÄ paÅ”a dag Dagrans var strÄdÄt paralÄli (ja jÅ«s, protams, padarÄ«jÄt savus uzdevumus idempotentus).
operators ir koda daļas, kas ir atbildīgas par noteiktas darbības veikŔanu. Ir trīs veidu operatori:
rÄ«cÄ«bakÄ mÅ«su mīļÄkie PythonOperator, kas var izpildÄ«t jebkuru (derÄ«gu) Python kodu;
pÄrsÅ«tÄ«t, kas pÄrsÅ«ta datus no vienas vietas uz otru, piemÄram, MsSqlToHiveTransfer;
devÄjs no otras puses, tas ļaus jums reaÄ£Ät vai palÄninÄt turpmÄko dag izpildi, lÄ«dz notiek notikums. HttpSensor var izvilkt norÄdÄ«to beigu punktu un, kad vÄlamÄ atbilde gaida, sÄkt pÄrsÅ«tÄ«Å”anu GoogleCloudStorageToS3Operator. ZiÅkÄrÄ«gs prÄts jautÄs: āKÄpÄc? Galu galÄ jÅ«s varat veikt atkÄrtojumus tieÅ”i pie operatora! Un pÄc tam, lai neaizsprostotu uzdevumu kopumu ar apturÄtajiem operatoriem. Sensors ieslÄdzas, pÄrbauda un nomirst pirms nÄkamÄ mÄÄ£inÄjuma.
Uzdevums - deklarÄtie operatori neatkarÄ«gi no veida un pievienoti dag tiek paaugstinÄti uz uzdevuma pakÄpi.
uzdevuma gadÄ«jums - kad Ä£enerÄlplÄnotÄjs nolÄma, ka ir pienÄcis laiks sÅ«tÄ«t uzdevumus kaujÄ izpildÄ«tÄjiem-strÄdniekiem (tieÅ”i uz vietas, ja mÄs izmantojam LocalExecutor vai uz attÄlo mezglu, ja CeleryExecutor), tas pieŔķir tiem kontekstu (t.i., mainÄ«go lielumu kopu - izpildes parametrus), paplaÅ”ina komandu vai vaicÄjumu veidnes un apvieno tÄs.
MÄs Ä£enerÄjam uzdevumus
Vispirms ieskicÄtu mÅ«su douga vispÄrÄjo shÄmu, un tad arvien vairÄk iedziļinÄsimies detaļÄs, jo mÄs izmantojam dažus netriviÄlus risinÄjumus.
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
IzdomÄsim:
PirmkÄrt, mÄs importÄjam nepiecieÅ”amos libs un kaut kas cits;
sql_server_ds -Å o List[namedtuple[str, str]] ar savienojumu nosaukumiem no Airflow Connections un datu bÄzÄm, no kurÄm mÄs Åemsim savu plÄksni;
dag - mÅ«su dienas paziÅojums, kuram obligÄti jÄbÅ«t iekÅ”Ä globals(), pretÄjÄ gadÄ«jumÄ Airflow to neatradÄ«s. Dagam arÄ« jÄsaka:
kÄds ir viÅa vÄrds orders - Å”is nosaukums tiks parÄdÄ«ts tÄ«mekļa saskarnÄ,
ka viÅÅ” strÄdÄs no astotÄ jÅ«lija pusnakts,
un tam vajadzÄtu darboties aptuveni ik pÄc 6 stundÄm (nevis skarbajiem puiÅ”iem timedelta() pieļaujama cron- lÄ«nija 0 0 0/6 ? * * *, mazÄk forÅ”ajiem - izteiciens patÄ«k @daily);
workflow() darÄ«s galveno darbu, bet ne tagad. PagaidÄm mÄs vienkÄrÅ”i iekļausim savu kontekstu žurnÄlÄ.
Un tagad vienkÄrÅ”a uzdevumu izveides burvÄ«ba:
mÄs skrienam cauri saviem avotiem;
palaist PythonOperator, kas izpildÄ«s mÅ«su manekenu workflow(). Neaizmirstiet norÄdÄ«t unikÄlu (dag ietvaros) uzdevuma nosaukumu un piesaistÄ«t paÅ”u dag. Karogs provide_context savukÄrt funkcijÄ iebÄrs papildu argumentus, kurus rÅ«pÄ«gi apkoposim izmantojot **context.
PagaidÄm tas arÄ« viss. Ko mÄs saÅÄmÄm:
jauna diena tÄ«mekļa saskarnÄ,
pusotrs simts uzdevumu, kas tiks izpildÄ«ti paralÄli (ja to ļaus Airflow, Selery iestatÄ«jumi un servera jauda).
Nu, gandrīz sapratu.
KurÅ” instalÄs atkarÄ«bas?
Lai vienkÄrÅ”otu Å”o visu, es ieskrÅ«vÄju docker-compose.yml apstrÄde requirements.txt visos mezglos.
Tagad tas ir pazudis:
PelÄki kvadrÄti ir uzdevumu gadÄ«jumi, ko apstrÄdÄ plÄnotÄjs.
Nedaudz pagaidÄm, darbus Ä·er strÄdnieki:
Zaļie, protams, savu darbu ir veiksmīgi pabeiguŔi. Sarkanie nav īpaŔi veiksmīgi.
Starp citu, mÅ«su prod nav mapes ./dags, nav sinhronizÄcijas starp maŔīnÄm - visi dags atrodas iekÅ”Ä git mÅ«su Gitlab, un Gitlab CI izplata atjauninÄjumus iekÄrtÄm, kad tÄs tiek apvienotas master.
Mazliet par Ziedu
KamÄr strÄdnieki dauza mÅ«su knupÄ«Å”us, atcerÄsimies vÄl vienu rÄ«ku, kas var mums kaut ko parÄdÄ«t - Ziedu.
Pati pirmÄ lapa ar kopsavilkuma informÄciju par darbinieku mezgliem:
VisintensÄ«vÄkÄ lapa ar uzdevumiem, kas tika veikti:
GarlaicÄ«gÄkÄ lapa ar mÅ«su brokera statusu:
SpilgtÄkÄ lapa ir ar uzdevumu statusa grafikiem un to izpildes laiku:
MÄs ielÄdÄjam nepietiekami noslogotos
TÄtad, visi uzdevumi ir izpildÄ«ti, jÅ«s varat aizvest ievainotos.
Un bija daudz ievainoto - viena vai otra iemesla dÄļ. Pareizas Airflow lietoÅ”anas gadÄ«jumÄ tieÅ”i Å”ie kvadrÄti norÄda, ka dati noteikti nav saÅemti.
Jums jÄskatÄs žurnÄls un jÄrestartÄ krituÅ”Äs uzdevumu instances.
NoklikŔķinot uz jebkura kvadrÄta, mÄs redzÄsim mums pieejamÄs darbÄ«bas:
JÅ«s varat Åemt un padarÄ«t Clear krituÅ”o. Tas ir, mÄs aizmirstam, ka tur kaut kas neizdevÄs, un tas pats instances uzdevums tiks nosÅ«tÄ«ts plÄnotÄjam.
Skaidrs, ka to darÄ«t ar peli ar visiem sarkanajiem kvadrÄtiÅiem nav Ä«paÅ”i humÄni ā tas nav tas, ko mÄs sagaidÄm no Airflow. Protams, mums ir masu iznÄ«cinÄÅ”anas ieroÄi: Browse/Task Instances
AtlasÄ«sim visu uzreiz un atiestatÄ«sim uz nulli, noklikŔķiniet uz pareizÄ vienuma:
PÄc tÄ«rÄ«Å”anas mÅ«su taksometri izskatÄs Å”Ädi (viÅi jau gaida, kad plÄnotÄjs tos saplÄnos):
Savienojumi, ÄÄ·i un citi mainÄ«gie
Ir pienÄcis laiks apskatÄ«t nÄkamo DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""ŠŠ¾ŃŠæŠ¾Š“Š° Ń Š¾ŃŠ¾ŃŠøŠµ, Š¾ŃŃŠµŃŃ Š¾Š±Š½Š¾Š²Š»ŠµŠ½Ń"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
ŠŠ°ŃŠ°Ń, ŠæŃŠ¾ŃŃŠæŠ°Š¹ŃŃ, Š¼Ń {{ dag.dag_id }} ŃŃŠ¾Š½ŠøŠ»Šø
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Vai visi kÄdreiz ir veikuÅ”i pÄrskata atjauninÄÅ”anu? Å Ä« atkal ir viÅa: ir saraksts ar avotiem, no kuriem iegÅ«t datus; ir saraksts, kur likt; neaizmirstiet paburkŔķÄt, kad viss notika vai salÅ«za (nu, tas nav par mums, nÄ).
PÄrskatÄ«sim failu vÄlreiz un apskatÄ«sim jaunos neskaidros materiÄlus:
from commons.operators import TelegramBotSendMessage - nekas neliedz mums izveidot savus operatorus, ko izmantojÄm, izveidojot nelielu iesaiÅojumu ziÅojumu nosÅ«tÄ«Å”anai uz Unbloed. (Par Å”o operatoru vairÄk runÄsim tÄlÄk);
default_args={} - dag var izplatÄ«t vienÄdus argumentus visiem saviem operatoriem;
to='{{ var.value.all_the_kings_men }}' - lauks to mums nebÅ«s kodÄts, bet dinamiski Ä£enerÄts, izmantojot Jinja un mainÄ«go ar e-pasta sarakstu, ko es rÅ«pÄ«gi ievietoju Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS ā nosacÄ«jums operatora palaiÅ”anai. MÅ«su gadÄ«jumÄ vÄstule aizlidos pie priekÅ”niekiem tikai tad, ja visas atkarÄ«bas bÅ«s atrisinÄtas veiksmÄ«gi;
tg_bot_conn_id='tg_main' - argumenti conn_id pieÅemt mÅ«su izveidotos savienojuma ID Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - ziÅojumi telegrammÄ aizlidos tikai tad, ja bÅ«s nokrituÅ”i uzdevumi;
task_concurrency=1 - mÄs aizliedzam vairÄku viena uzdevuma uzdevumu vienlaicÄ«gu palaiÅ”anu. PretÄjÄ gadÄ«jumÄ mÄs saÅemsim vairÄku vienlaicÄ«gu palaiÅ”anu VerticaOperator (skatoties uz vienu galdu);
report_update >> [email, tg] - viss VerticaOperator saplÅ«st vÄstuļu un ziÅojumu sÅ«tÄ«Å”anÄ, piemÄram:
Bet, tÄ kÄ paziÅotÄju operatoriem ir dažÄdi palaiÅ”anas nosacÄ«jumi, darbosies tikai viens. Koka skatÄ viss izskatÄs mazÄk vizuÄli:
Es teikÅ”u dažus vÄrdus par makro un viÅu draugi - mainÄ«gie.
Makro ir Jinja vietturi, kas operatora argumentos var aizstÄt dažÄdu noderÄ«gu informÄciju. PiemÄram, Å”Ädi:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} tiks paplaÅ”inÄts lÄ«dz konteksta mainÄ«gÄ saturam execution_date formÄtÄ YYYY-MM-DD: 2020-07-14. LabÄkais ir tas, ka konteksta mainÄ«gie tiek pienagloti konkrÄtam uzdevuma gadÄ«jumam (kvadrÄtiÅam koka skatÄ), un, restartÄjot, vietturi tiks paplaÅ”inÄti lÄ«dz tÄdÄm paÅ”Äm vÄrtÄ«bÄm.
PieŔķirtÄs vÄrtÄ«bas var apskatÄ«t, izmantojot pogu Rendered katrÄ uzdevuma instancÄ. Å is ir uzdevums ar vÄstules nosÅ«tÄ«Å”anu:
Un tÄ uzdevumÄ ar ziÅojuma nosÅ«tÄ«Å”anu:
Pilns jaunÄkÄs pieejamÄs versijas iebÅ«vÄto makro saraksts ir pieejams Å”eit: makro atsauce
TurklÄt ar spraudÅu palÄ«dzÄ«bu mÄs varam deklarÄt savus makro, bet tas ir cits stÄsts.
Papildus iepriekÅ” definÄtajÄm lietÄm mÄs varam aizstÄt mÅ«su mainÄ«go vÄrtÄ«bas (es to jau izmantoju iepriekÅ” minÄtajÄ kodÄ). Izveidosim iekÅ”Ä Admin/Variables pÄris lietas:
vienkÄrÅ”i izmantojiet ceļu uz vajadzÄ«go atslÄgu: {{ var.json.bot_config.bot.token }}.
Es burtiski teikÅ”u vienu vÄrdu un parÄdÄ«Å”u vienu ekrÄnuzÅÄmumu par savienojumi. Å eit viss ir elementÄri: lapÄ Admin/Connections mÄs izveidojam savienojumu, pievienojam savus pieteikumvÄrdus / paroles un specifiskÄkus parametrus. KÄ Å”is:
Paroles var Å”ifrÄt (pamatÄ«gÄk nekÄ noklusÄjuma), vai arÄ« varat nepievienot savienojuma veidu (kÄ es to darÄ«ju tg_main) - fakts ir tÄds, ka Airflow modeļos tipu saraksts ir savienots un to nevar paplaÅ”inÄt, neiedziļinoties avota kodos (ja pÄkÅ”Åi kaut ko nemeklÄju googlÄ, lÅ«dzu, izlabojiet mani), taÄu nekas netraucÄs mums iegÅ«t kredÄ«tus. nosaukums.
Varat arÄ« izveidot vairÄkus savienojumus ar tÄdu paÅ”u nosaukumu: Å”ajÄ gadÄ«jumÄ metode BaseHook.get_connection(), kas iegÅ«st mums savienojumus pÄc nosaukuma, dos nejauÅ”i no vairÄkiem vÄrdabrÄliem (loÄ£iskÄk bÅ«tu uztaisÄ«t Round Robin, bet atstÄsim to uz Airflow izstrÄdÄtÄju sirdsapziÅas).
MainÄ«gie un savienojumi noteikti ir lieliski rÄ«ki, taÄu ir svarÄ«gi nezaudÄt lÄ«dzsvaru: kuras plÅ«smas daļas jÅ«s saglabÄjat paÅ”Ä kodÄ un kuras daļas nododat glabÄÅ”anai Airflow. No vienas puses, var bÅ«t Ärti Ätri mainÄ«t vÄrtÄ«bu, piemÄram, pasta kastÄ«ti, izmantojot lietotÄja saskarni. No otras puses, Ŕī joprojÄm ir atgrieÅ”anÄs pie peles klikŔķa, no kura mÄs (es) gribÄjÄm atbrÄ«voties.
Darbs ar savienojumiem ir viens no uzdevumiem ÄÄ·i. KopumÄ Airflow ÄÄ·i ir punkti, lai to savienotu ar treÅ”o puÅ”u pakalpojumiem un bibliotÄkÄm. PiemÄram, JiraHook atvÄrs klientu, lai mÄs varÄtu sadarboties ar Jira (jÅ«s varat pÄrvietot uzdevumus uz priekÅ”u un atpakaļ), un ar SambaHook varat nosÅ«tÄ«t vietÄjo failu smb- punkts.
PielÄgotÄ operatora parsÄÅ”ana
Un mÄs tuvojÄmies tam, lai apskatÄ«tu, kÄ tas ir izgatavots TelegramBotSendMessage
Kods commons/operators.py ar faktisko operatoru:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Å eit, tÄpat kÄ viss pÄrÄjais Airflow, viss ir ļoti vienkÄrÅ”s:
Mantojums no BaseOperator, kas ievieŔ diezgan daudz ar gaisa plūsmu raksturīgu lietu (skatieties uz savu atpūtu)
DeklarÄtie lauki template_fields, kurÄ Jinja meklÄs makro, ko apstrÄdÄt.
SakÄrtoja pareizos argumentus par __init__(), iestatiet noklusÄjuma iestatÄ«jumus, ja nepiecieÅ”ams.
NeaizmirsÄm arÄ« par senÄa inicializÄciju.
AtvÄra atbilstoÅ”o ÄÄ·i TelegramBotHooksaÅÄma no tÄ klienta objektu.
IgnorÄta (pÄrdefinÄta) metode BaseOperator.execute(), kuru Airfow raustÄ«s, kad pienÄks laiks palaist operatoru - tajÄ mÄs Ä«stenosim galveno darbÄ«bu, aizmirstot pieteikties. (Starp citu, mÄs piesakÄmies tieÅ”i stdout Šø stderr - Gaisa plÅ«sma visu pÄrtvers, skaisti iesaiÅos, sadalÄ«s, kur nepiecieÅ”ams.)
PaskatÄ«simies, kas mums ir commons/hooks.py. PirmÄ faila daļa ar paÅ”u ÄÄ·i:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Es pat nezinu, ko Å”eit paskaidrot, es tikai atzÄ«mÄÅ”u svarÄ«gos punktus:
MÄs mantojam, domÄjam par argumentiem - vairumÄ gadÄ«jumu tas bÅ«s viens: conn_id;
Standartmetožu ignorÄÅ”ana: es sevi ierobežoju get_conn(), kurÄ es iegÅ«stu savienojuma parametrus pÄc nosaukuma un vienkÄrÅ”i iegÅ«stu sadaļu extra (tas ir JSON lauks), kurÄ es (saskaÅÄ ar saviem norÄdÄ«jumiem!) ievietoju Telegram robota pilnvaru: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Es izveidoju mÅ«su piemÄru TelegramBot, pieŔķirot tam Ä«paÅ”u pilnvaru.
Tas ir viss. JÅ«s varat iegÅ«t klientu no ÄÄ·a, izmantojot TelegramBotHook().clent vai TelegramBotHook().get_conn().
Un faila otrÄ daļa, kurÄ es izveidoju mikroiesaiÅojumu Telegram REST API, lai nevilktu to paÅ”u python-telegram-bot vienai metodei sendMessage.
Pareizais veids ir to visu saskaitÄ«t: TelegramBotSendMessage, TelegramBotHook, TelegramBot - spraudnÄ« ievietojiet publiskÄ repozitorijÄ un nododiet to atvÄrtajam pirmkodam.
KamÄr mÄs to visu pÄtÄ«jÄm, mÅ«su pÄrskatu atjauninÄjumi veiksmÄ«gi neizdevÄs un man kanÄlÄ nosÅ«tÄ«ja kļūdas ziÅojumu. Es ieÅ”u pÄrbaudÄ«t, vai tas nav kÄrtÄ«bÄ...
MÅ«su dogÄ kaut kas salÅ«za! Vai tas nav tas, ko mÄs gaidÄ«jÄm? tieÅ”i tÄ!
Vai tu taisies liet?
Vai tev liekas, ka es kaut ko palaidu garÄm? Å Ä·iet, ka viÅÅ” solÄ«ja pÄrsÅ«tÄ«t datus no SQL servera uz Vertica un tad ÅÄma un aizgÄja no tÄmas, nelietis!
Å Ä« zvÄrÄ«ba bija tÄ«Å”a, man vienkÄrÅ”i bija jÄatÅ”ifrÄ kÄda terminoloÄ£ija jÅ«su vietÄ. Tagad jÅ«s varat doties tÄlÄk.
MÅ«su plÄns bija Å”Äds:
Do dag
Ä¢enerÄjiet uzdevumus
Redziet, cik viss ir skaisti
PieŔķiriet aizpildījumam sesijas numurus
Iegūstiet datus no SQL Server
Ievietojiet datus Vertica
SavÄkt statistiku
TÄpÄc, lai tas viss sÄktu darboties, es veicu nelielu papildinÄjumu mÅ«su docker-compose.yml:
Vertica kÄ saimniekdators dwh ar visvairÄk noklusÄjuma iestatÄ«jumiem,
trīs SQL Server gadījumi,
mÄs aizpildÄm pÄdÄjÄs esoÅ”Äs datu bÄzes ar dažiem datiem (nekÄdÄ gadÄ«jumÄ neieskatieties mssql_init.py!)
MÄs palaižam visu labo, izmantojot nedaudz sarežģītÄku komandu nekÄ pagÄjuÅ”ajÄ reizÄ:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Varat izmantot Å”o vienumu, ko Ä£enerÄja mÅ«su brÄ«numu nejauÅ”inÄtÄjs Data Profiling/Ad Hoc Query:
Galvenais to nerÄdÄ«t analÄ«tiÄ·iem
sÄ«kÄk izstrÄdÄt ETL sesijas Es nedarÄ«Å”u, tur viss ir triviÄli: mÄs izveidojam pamatni, tajÄ ir zÄ«me, mÄs visu aptinam ar konteksta pÄrvaldnieku, un tagad mÄs darÄm Å”Ädi:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Laiks ir pienÄcis apkopot mÅ«su datus no mÅ«su pusotra simta galdiÅiem. DarÄ«sim to ar ļoti nepretenciozu lÄ«niju palÄ«dzÄ«bu:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Ar ÄÄ·a palÄ«dzÄ«bu tiekam no Airflow pymssql- savienot
Tiek izpildÄ«ts mÅ«su pieprasÄ«jums pandaskurÅ” mÅ«s dabÅ«s DataFrame - tas mums noderÄs nÄkotnÄ.
Es izmantoju aizstÄÅ”anu {dt} pieprasÄ«juma parametra vietÄ %s nevis tÄpÄc, ka es bÅ«tu ļauns Pinokio, bet gan tÄpÄc pandas nevar tikt galÄ pymssql un paslÄ«d pÄdÄjo params: Listlai gan viÅÅ” ļoti vÄlas tuple.
Å emiet vÄrÄ arÄ« to, ka izstrÄdÄtÄjs pymssql nolÄma viÅu vairs neatbalstÄ«t, un ir pienÄcis laiks izvÄkties pyodbc.
ApskatÄ«sim, ar ko Airflow papildinÄja mÅ«su funkciju argumentus:
Ja nav datu, tad nav jÄgas turpinÄt. Bet ir arÄ« dÄ«vaini uzskatÄ«t pildÄ«jumu par veiksmÄ«gu. Bet tÄ nav kļūda. A-ah-ah, ko darÄ«t?! Un, lÅ«k, kas:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException pateiks Airflow, ka kļūdu nav, bet mÄs izlaižam uzdevumu. Interfeisam bÅ«s nevis zaļŔ vai sarkans kvadrÄts, bet gan rozÄ.
DatubÄze, no kuras mÄs saÅÄmÄm pasÅ«tÄ«jumus,
Mūsu plūdu sesijas ID (tas būs atŔķirīgs katram uzdevumam),
Hash no avota un pasÅ«tÄ«juma ID - lai gala datu bÄzÄ (kur viss ir saliets vienÄ tabulÄ) mums bÅ«tu unikÄls pasÅ«tÄ«juma ID.
Atliek priekÅ”pÄdÄjais solis: ielej visu VertikÄ. Un, dÄ«vainÄ kÄrtÄ, viens no iespaidÄ«gÄkajiem un efektÄ«vÄkajiem veidiem, kÄ to izdarÄ«t, ir CSV fails!
- Nu, - sacÄ«ja mazÄ pele, - vai ne, tagad
Vai esat pÄrliecinÄts, ka esmu visbriesmÄ«gÄkais dzÄ«vnieks mežÄ?
Džūlija Donaldsone, Grufalo
Es domÄju, ja man un maniem kolÄÄ£iem bÅ«tu konkurss: kurÅ” Ätri izveidos un sÄks ETL procesu no nulles: viÅi ar savu SSIS un peli un es ar Airflow ... Un tad mÄs arÄ« salÄ«dzinÄtu apkopes vieglumu ... Oho, es domÄju, ka piekritÄ«siet, ka es viÅus pÄrspÄÅ”u visÄs frontÄs!
Ja nedaudz nopietnÄk, tad Apache Airflow - aprakstot procesus programmas koda veidÄ - izdarÄ«ja manu darbu daudz ÄrtÄk un patÄ«kamÄk.
TÄ neierobežotÄ paplaÅ”inÄmÄ«ba gan spraudÅu, gan mÄrogojamÄ«bas ziÅÄ sniedz iespÄju izmantot Airflow gandrÄ«z jebkurÄ jomÄ: pat pilnÄ datu vÄkÅ”anas, sagatavoÅ”anas un apstrÄdes ciklÄ, pat palaižot raÄ·etes (uz Marsu, no kurss).
Daļas noslÄgums, atsauce un informÄcija
GrÄbeklis, ko esam jums savÄkuÅ”i
start_date. JÄ, Ŕī jau ir vietÄja mÄma. Via Doug galvenais arguments start_date viss pÄriet. ÄŖsumÄ, ja norÄdÄt start_date paÅ”reizÄjais datums un schedule_interval - kÄdu dienu, tad DAG sÄksies rÄ«t ne agrÄk.
start_date = datetime(2020, 7, 7, 0, 1, 2)
Un vairs nekÄdu problÄmu.
Ar to ir saistÄ«ta cita izpildlaika kļūda: Task is missing the start_date parameter, kas visbiežÄk norÄda, ka esat aizmirsis saistÄ«t ar dag operatoru.
Viss vienÄ maŔīnÄ. JÄ, un bÄzes (pati Airflow un mÅ«su pÄrklÄjums), un tÄ«mekļa serveris, un plÄnotÄjs, un darbinieki. Un tas pat strÄdÄja. Bet laika gaitÄ pakalpojumu uzdevumu skaits pieauga, un, kad PostgreSQL sÄka reaÄ£Ät uz indeksu 20 s, nevis 5 ms, mÄs to paÅÄmÄm un aiznesÄm.
VietÄjais izpildÄ«tÄjs. JÄ, mÄs joprojÄm uz tÄ sÄžam, un jau esam nonÄkuÅ”i bezdibeÅa malÄ. Ar LocalExecutor mums lÄ«dz Å”im ir bijis pietiekami, bet tagad ir pienÄcis laiks paplaÅ”inÄties ar vismaz vienu darbinieku, un mums bÅ«s smagi jÄstrÄdÄ, lai pÄrietu uz CeleryExecutor. Un, Åemot vÄrÄ to, ka jÅ«s varat strÄdÄt ar to vienÄ maŔīnÄ, nekas neliedz jums izmantot Selery pat serverÄ«, kas, "protams, nekad nenonÄks ražoÅ”anÄ, godÄ«gi sakot!"
NelietoÅ”ana iebÅ«vÄtie instrumenti:
savienojumi uzglabÄt pakalpojumu akreditÄcijas datus,
SLA Miss reaÄ£Ät uz uzdevumiem, kas nav izdevies laikÄ,
xcom metadatu apmaiÅai (es teicu mÄrÄ·isdati!) starp dag uzdevumiem.
Pasta ļaunprÄtÄ«ga izmantoÅ”ana. Nu ko es varu teikt? Par visiem krituÅ”o uzdevumu atkÄrtojumiem tika iestatÄ«ti brÄ«dinÄjumi. Tagad manÄ darba pakalpojumÄ Gmail ir vairÄk nekÄ 90 100 e-pasta ziÅojumu no Airflow, un tÄ«mekļa pasta uzgalis atsakÄs vienlaikus uztvert un dzÄst vairÄk nekÄ XNUMX.
Lai mÄs vÄl vairÄk strÄdÄtu ar galvu, nevis ar rokÄm, Airflow mums ir sagatavojis sekojoÅ”o:
REST API - viÅam joprojÄm ir EksperimentÄla statuss, kas viÅam netraucÄ strÄdÄt. Ar to jÅ«s varat ne tikai iegÅ«t informÄciju par dagiem un uzdevumiem, bet arÄ« apturÄt/sÄkt dag, izveidot DAG Run vai pÅ«lu.
CLI - KomandrindÄ ir pieejami daudzi rÄ«ki, kurus ir ne tikai neÄrti lietot, izmantojot WebUI, bet arÄ« parasti to nav. PiemÄram:
backfill nepiecieÅ”ams, lai restartÄtu uzdevumu gadÄ«jumus.
PiemÄram, atnÄca analÄ«tiÄ·i un teica: āUn jums, biedri, ir muļķības datos no 1. lÄ«dz 13. janvÄrim! Labojiet, labojiet, labojiet, labojiet!" Un tu esi tÄda plÄ«ts virsma:
run, kas ļauj izpildÄ«t vienu gadÄ«jumu uzdevumu un pat iegÅ«t punktus par visÄm atkarÄ«bÄm. TurklÄt jÅ«s varat to palaist, izmantojot LocalExecutor, pat ja jums ir seleriju kopa.
Dara gandrÄ«z to paÅ”u test, tikai arÄ« bÄzÄs neko neraksta.
connections ļauj masveidÄ izveidot savienojumus no Äaulas.
python api - diezgan stingrs mijiedarbÄ«bas veids, kas paredzÄts spraudÅiem, nevis spieto tajÄ ar mazÄm rociÅÄm. Bet kas mums liedz iet /home/airflow/dags, palaist ipython un sÄkt jaukties? Varat, piemÄram, eksportÄt visus savienojumus ar Å”Ädu kodu:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Savienojuma izveide ar Airflow metadatu bÄzi. Es neiesaku tai rakstÄ«t, taÄu uzdevumu stÄvokļu iegÅ«Å”ana dažÄdiem specifiskiem rÄdÄ«tÄjiem var bÅ«t daudz ÄtrÄka un vienkÄrÅ”Äka nekÄ ar kÄdu no API.
PieÅemsim, ka ne visi mÅ«su uzdevumi ir idempotenti, bet dažreiz tie var nokrist, un tas ir normÄli. Bet daži aizsprostojumi jau ir aizdomÄ«gi, un tas bÅ«tu jÄpÄrbauda.
Uzmanieties no SQL!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
atsauces
Un, protams, pirmÄs desmit saites no Google izdoÅ”anas ir mapes Airflow saturs no manÄm grÄmatzÄ«mÄm.
Python un Apache gaisa plÅ«smas dzens - netieÅ”a DAG pÄrsÅ«tÄ«Å”ana, konteksta ievadÄ«Å”ana funkcijÄs, atkal par atkarÄ«bÄm un arÄ« par uzdevumu palaiÅ”anas izlaiÅ”anu.