Tere, Habr! Selles artiklis tahan rääkida ühest suurepärasest tööriistast pakettandmete töötlemise protsesside arendamiseks, näiteks ettevõtte DWH või teie DataLake'i infrastruktuuris. Räägime Apache Airflow-st (edaspidi Airflow). See on Habré puhul ebaõiglaselt tähelepanust ilma jäetud ja põhiosas püüan teid veenda, et vähemalt Airflowle tasub oma ETL/ELT protsesside planeerijat valides vaadata.
Varem kirjutasin DWH teemal mitmeid artikleid, kui töötasin Tinkoff pangas. Nüüd olen saanud osa Mail.Ru Groupi meeskonnast ja töötan välja platvormi andmete analüüsimiseks mänguvaldkonnas. Tegelikult, kui ilmuvad uudised ja huvitavad lahendused, räägime oma meeskonnaga siin meie andmeanalüütika platvormist.
Proloog
Niisiis, alustame. Mis on õhuvool? See on raamatukogu (või
Vaatame nüüd Airflow peamisi üksusi. Mõistes nende olemust ja eesmärki, saate oma protsessi arhitektuuri optimaalselt korraldada. Võib-olla on peamine üksus suunatud atsükliline graafik (edaspidi DAG).
DAG
DAG on teie ülesannete tähendusrikas seos, mida soovite täita rangelt määratletud järjestuses vastavalt konkreetsele ajakavale. Airflow pakub mugavat veebiliidest DAG-ide ja muude üksustega töötamiseks:
DAG võib välja näha selline:
DAG-i kavandamisel määrab arendaja operaatorite komplekti, millele DAG-i ülesanded üles ehitatakse. Siin jõuame teise olulise üksuseni: õhuvoolu operaator.
Ettevõtjad
Operaator on olem, mille alusel luuakse tööeksemplarid, mis kirjeldab, mis tööeksemplari täitmisel juhtub.
- BashOperator – operaator bash-käsu täitmiseks.
- PythonOperator – Pythoni koodi helistamise operaator.
- EmailOperator – operaator meili saatmiseks.
- HTTPOperator – operaator http-päringutega töötamiseks.
- SqlOperator – operaator SQL-koodi täitmiseks.
- Sensor on operaator sündmuse ootamiseks (vajaliku aja saabumine, vajaliku faili ilmumine, rida andmebaasis, API vastus jne jne).
Konkreetsemaid operaatoreid on: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Samuti saate arendada operaatoreid oma omaduste põhjal ja kasutada neid oma projektis. Näiteks lõime MongoDBToHiveViaHdfsTransferi, operaatori dokumentide eksportimiseks MongoDB-st tarusse ja mitu operaatorit
Järgmisena tuleb kõik need ülesannete eksemplarid täita ja nüüd räägime planeerijast.
Planeerija
Airflow'i ülesannete ajakava on üles ehitatud
Igal basseinil on teenindusaegade arvu piirang. DAG-i loomisel antakse sellele bassein:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
DAG-i tasemel määratletud kogumi saab ülesande tasemel alistada.
Eraldi protsess Scheduler vastutab Airflow kõigi toimingute ajastamise eest. Tegelikult tegeleb Scheduler kõigi ülesannete täitmiseks seadistamise mehhanismidega. Enne täitmist läbib ülesanne mitu etappi:
- Eelmised ülesanded on DAG-is täidetud, uue saab järjekorda panna.
- Järjekorda sorteeritakse sõltuvalt ülesannete prioriteedist (prioriteedid saab ka juhtida) ja kui basseinis on vaba pesa, saab ülesande töösse võtta.
- Kui on vaba töötaja seller, saadetakse ülesanne talle; algab töö, mille te probleemisse programmeerisite, kasutades üht või teist operaatorit.
Piisavalt lihtne.
Planeerija töötab kõigi DAG-ide komplektis ja kõigis DAG-ides sisalduvates ülesannetes.
Selleks, et plaanija saaks DAG-iga koostööd alustada, peab DAG määrama ajakava:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Seal on valmis eelseadistuste komplekt: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Võite kasutada ka cron-avaldisi:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Täitmise kuupäev
Et mõista, kuidas Airflow töötab, on oluline mõista, mis on DAG-i täitmise kuupäev. Airflow-s on DAG-il täitmiskuupäeva dimensioon, st sõltuvalt DAG-i töögraafikust luuakse iga täitmiskuupäeva jaoks ülesande eksemplarid. Ja iga täitmiskuupäeva jaoks saab ülesandeid uuesti täita – või näiteks DAG võib töötada samaaegselt mitmel täitmiskuupäeval. See on siin selgelt näidatud:
Kahjuks (või võib-olla õnneks: oleneb olukorrast) kui ülesande täitmine DAG-s on parandatud, siis täitmine eelmisel Täitmiskuupäeval kulgeb korrigeerimisi arvesse võttes. See on hea, kui on vaja eelmiste perioodide andmeid uue algoritmi abil ümber arvutada, kuid see on halb, sest tulemuse reprodutseeritavus läheb kaotsi (loomulikult ei sega keegi Gitist lähtekoodi vajalikku versiooni tagastamast ja arvutama, mis vajate ühekordset, nii nagu vajate).
Ülesannete genereerimine
DAG-i juurutus on Pythonis kood, seega on meil väga mugav viis koodi hulka vähendada näiteks killustatud allikatega töötamisel. Oletame, et teil on allikana kolm MySQL-i killu, peate igasse ronima ja koguma andmeid. Pealegi iseseisvalt ja paralleelselt. Pythoni kood DAG-is võib välja näha selline:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
DAG näeb välja selline:
Sel juhul saate killu lisada või eemaldada lihtsalt seadeid kohandades ja DAG-i värskendades. Mugav!
Võite kasutada ka keerukamat koodi genereerimist, näiteks töötada allikatega andmebaasi kujul või kirjeldada tabeli struktuuri, tabeliga töötamise algoritmi ja DWH infrastruktuuri omadusi arvestades genereerida protsessi N tabeli laadimiseks oma salvestusruumi. Või näiteks töötades API-ga, mis ei toeta loendikujulise parameetriga töötamist, saate sellest loendist genereerida DAG-is N ülesannet, piirata API päringute paralleelsust kogumiga ja kraapida vajalikud andmed API-st. Paindlik!
hoidla
Airflow'l on oma tagavarahoidla, andmebaas (võib olla MySQL või Postgres, meil on Postgres), mis salvestab ülesannete olekud, DAG-id, ühenduse seaded, globaalsed muutujad jne jne. Siinkohal tahaksin öelda, et Airflow repositoorium on väga lihtne (umbes 20 tabelit) ja mugav, kui soovite selle peale ehitada mõnda oma protsessi. Mäletan Informatica hoidlas olevaid 100500 XNUMX tabelit, mida tuli pikka aega uurida, enne kui aru sai, kuidas päringut koostada.
Jälgimine
Arvestades hoidla lihtsust, saate luua endale sobiva ülesannete jälgimise protsessi. Kasutame Zeppelinis märkmikku, kus vaatame ülesannete olekut:
See võib olla ka Airflow enda veebiliides:
Airflow kood on avatud lähtekoodiga, seega oleme Telegramile lisanud hoiatuse. Iga töötav ülesande eksemplar saadab vea ilmnemisel rämpspostiks Telegrami rühma, kuhu kuulub kogu arendus- ja tugimeeskond.
Saame kiire vastuse Telegrami kaudu (vajadusel) ja Zeppelini kaudu saame Airflow ülesannetest üldpildi.
Kogusummas
Airflow on peamiselt avatud lähtekoodiga ja te ei tohiks sellelt imesid oodata. Olge valmis töötava lahenduse loomiseks kulutama aega ja vaeva. Eesmärk on saavutatav, uskuge mind, see on seda väärt. Arengukiirus, paindlikkus, uute protsesside lisamise lihtsus – see meeldib teile. Muidugi peate palju tähelepanu pöörama projekti korraldusele, Airflow enda stabiilsusele: imesid ei juhtu.
Nüüd töötab Airflow iga päev umbes 6,5 tuhat ülesannet. Nad on iseloomult üsna erinevad. On ülesandeid andmete laadimiseks põhi DWH-sse paljudest erinevatest ja väga spetsiifilistest allikatest, on ülesandeid peamise DWH sees olevate kaupluste väljaarvutamise ülesanded, on andmete kiiresse DWH-sse avaldamise ülesanded, on palju-palju erinevaid ülesandeid - ja Airflow närib neid kõiki päevast päeva. Kui rääkida numbrites, siis see on nii Xnumx tuhat Erineva keerukusega ELT-ülesanded DWH-s (Hadoop), ca. 2,5 sada andmebaasi allikatest, see on meeskond 4 ETL-i arendajat, mis jagunevad ETL andmetöötluseks DWH-s ja ELT andmetöötluseks DWH-s ja loomulikult muuks üks admin, kes tegeleb teenuse infrastruktuuriga.
Tuleviku plaanid
Protsesside arv paratamatult kasvab ja peamine, mida me Airflow taristu osas teeme, on skaleerimine. Soovime luua Airflow klastri, eraldada selleri töötajatele paar jalga ning teha isepaljuneva pea koos tööplaanimisprotsesside ja hoidlaga.
Epiloog
See pole muidugi veel kõik, mida tahaksin Airflow kohta rääkida, kuid püüdsin esile tuua peamised punktid. Süües tuleb isu, proovi ja maitseb :)
Allikas: www.habr.com