Airflow on tööriist pakettandmete töötlemise protsesside mugavaks ja kiireks arendamiseks ja hooldamiseks

Airflow on tööriist pakettandmete töötlemise protsesside mugavaks ja kiireks arendamiseks ja hooldamiseks

Tere, Habr! Selles artiklis tahan rääkida ühest suurepärasest tööriistast pakettandmete töötlemise protsesside arendamiseks, näiteks ettevõtte DWH või teie DataLake'i infrastruktuuris. Räägime Apache Airflow-st (edaspidi Airflow). See on Habré puhul ebaõiglaselt tähelepanust ilma jäetud ja põhiosas püüan teid veenda, et vähemalt Airflowle tasub oma ETL/ELT protsesside planeerijat valides vaadata.

Varem kirjutasin DWH teemal mitmeid artikleid, kui töötasin Tinkoff pangas. Nüüd olen saanud osa Mail.Ru Groupi meeskonnast ja töötan välja platvormi andmete analüüsimiseks mänguvaldkonnas. Tegelikult, kui ilmuvad uudised ja huvitavad lahendused, räägime oma meeskonnaga siin meie andmeanalüütika platvormist.

Proloog

Niisiis, alustame. Mis on õhuvool? See on raamatukogu (või raamatukogude komplekt) arendada, planeerida ja jälgida tööprotsesse. Airflow põhiomadus: Pythoni koodi kasutatakse protsesside kirjeldamiseks (arendamiseks). Sellel on sinu projekti ja arenduse korraldamisel palju eeliseid: sisuliselt on sinu (näiteks) ETL-i projekt vaid Pythoni projekt ja seda saad korraldada vastavalt oma soovile, võttes arvesse infrastruktuuri eripära, meeskonna suurust ja muud nõuded. Instrumentaalselt on kõik lihtne. Kasutage näiteks PyCharm + Git. See on imeline ja väga mugav!

Vaatame nüüd Airflow peamisi üksusi. Mõistes nende olemust ja eesmärki, saate oma protsessi arhitektuuri optimaalselt korraldada. Võib-olla on peamine üksus suunatud atsükliline graafik (edaspidi DAG).

DAG

DAG on teie ülesannete tähendusrikas seos, mida soovite täita rangelt määratletud järjestuses vastavalt konkreetsele ajakavale. Airflow pakub mugavat veebiliidest DAG-ide ja muude üksustega töötamiseks:

Airflow on tööriist pakettandmete töötlemise protsesside mugavaks ja kiireks arendamiseks ja hooldamiseks

DAG võib välja näha selline:

Airflow on tööriist pakettandmete töötlemise protsesside mugavaks ja kiireks arendamiseks ja hooldamiseks

DAG-i kavandamisel määrab arendaja operaatorite komplekti, millele DAG-i ülesanded üles ehitatakse. Siin jõuame teise olulise üksuseni: õhuvoolu operaator.

Ettevõtjad

Operaator on olem, mille alusel luuakse tööeksemplarid, mis kirjeldab, mis tööeksemplari täitmisel juhtub. Airflow vabastab GitHubist sisaldavad juba kasutusvalmis operaatorite komplekti. Näited:

  • BashOperator – operaator bash-käsu täitmiseks.
  • PythonOperator – Pythoni koodi helistamise operaator.
  • EmailOperator – operaator meili saatmiseks.
  • HTTPOperator – operaator http-päringutega töötamiseks.
  • SqlOperator – operaator SQL-koodi täitmiseks.
  • Sensor on operaator sündmuse ootamiseks (vajaliku aja saabumine, vajaliku faili ilmumine, rida andmebaasis, API vastus jne jne).

Konkreetsemaid operaatoreid on: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Samuti saate arendada operaatoreid oma omaduste põhjal ja kasutada neid oma projektis. Näiteks lõime MongoDBToHiveViaHdfsTransferi, operaatori dokumentide eksportimiseks MongoDB-st tarusse ja mitu operaatorit Klõpsake nuppu Maja: CHLoadFromHiveOperator ja CHTableLoaderOperator. Põhimõtteliselt, niipea, kui projektis on sageli kasutatud põhilausetele üles ehitatud koodi, võite mõelda selle uueks lauseks ehitamisele. See lihtsustab edasist arengut ja laiendate oma projekti operaatorite raamatukogu.

Järgmisena tuleb kõik need ülesannete eksemplarid täita ja nüüd räägime planeerijast.

Planeerija

Airflow'i ülesannete ajakava on üles ehitatud Seller. Seller on Pythoni teek, mis võimaldab teil korraldada järjekorda ning ülesannete asünkroonset ja hajutatud täitmist. Õhuvoolu poolel on kõik ülesanded jagatud basseinideks. Basseinid luuakse käsitsi. Tavaliselt on nende eesmärk piirata allikaga töötamise töökoormust või tüpiseerida ülesandeid DWH-s. Pooli saab hallata veebiliidese kaudu:

Airflow on tööriist pakettandmete töötlemise protsesside mugavaks ja kiireks arendamiseks ja hooldamiseks

Igal basseinil on teenindusaegade arvu piirang. DAG-i loomisel antakse sellele bassein:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

DAG-i tasemel määratletud kogumi saab ülesande tasemel alistada.
Eraldi protsess Scheduler vastutab Airflow kõigi toimingute ajastamise eest. Tegelikult tegeleb Scheduler kõigi ülesannete täitmiseks seadistamise mehhanismidega. Enne täitmist läbib ülesanne mitu etappi:

  1. Eelmised ülesanded on DAG-is täidetud, uue saab järjekorda panna.
  2. Järjekorda sorteeritakse sõltuvalt ülesannete prioriteedist (prioriteedid saab ka juhtida) ja kui basseinis on vaba pesa, saab ülesande töösse võtta.
  3. Kui on vaba töötaja seller, saadetakse ülesanne talle; algab töö, mille te probleemisse programmeerisite, kasutades üht või teist operaatorit.

Piisavalt lihtne.

Planeerija töötab kõigi DAG-ide komplektis ja kõigis DAG-ides sisalduvates ülesannetes.

Selleks, et plaanija saaks DAG-iga koostööd alustada, peab DAG määrama ajakava:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Seal on valmis eelseadistuste komplekt: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Võite kasutada ka cron-avaldisi:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Täitmise kuupäev

Et mõista, kuidas Airflow töötab, on oluline mõista, mis on DAG-i täitmise kuupäev. Airflow-s on DAG-il täitmiskuupäeva dimensioon, st sõltuvalt DAG-i töögraafikust luuakse iga täitmiskuupäeva jaoks ülesande eksemplarid. Ja iga täitmiskuupäeva jaoks saab ülesandeid uuesti täita – või näiteks DAG võib töötada samaaegselt mitmel täitmiskuupäeval. See on siin selgelt näidatud:

Airflow on tööriist pakettandmete töötlemise protsesside mugavaks ja kiireks arendamiseks ja hooldamiseks

Kahjuks (või võib-olla õnneks: oleneb olukorrast) kui ülesande täitmine DAG-s on parandatud, siis täitmine eelmisel Täitmiskuupäeval kulgeb korrigeerimisi arvesse võttes. See on hea, kui on vaja eelmiste perioodide andmeid uue algoritmi abil ümber arvutada, kuid see on halb, sest tulemuse reprodutseeritavus läheb kaotsi (loomulikult ei sega keegi Gitist lähtekoodi vajalikku versiooni tagastamast ja arvutama, mis vajate ühekordset, nii nagu vajate).

Ülesannete genereerimine

DAG-i juurutus on Pythonis kood, seega on meil väga mugav viis koodi hulka vähendada näiteks killustatud allikatega töötamisel. Oletame, et teil on allikana kolm MySQL-i killu, peate igasse ronima ja koguma andmeid. Pealegi iseseisvalt ja paralleelselt. Pythoni kood DAG-is võib välja näha selline:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG näeb välja selline:

Airflow on tööriist pakettandmete töötlemise protsesside mugavaks ja kiireks arendamiseks ja hooldamiseks

Sel juhul saate killu lisada või eemaldada lihtsalt seadeid kohandades ja DAG-i värskendades. Mugav!

Võite kasutada ka keerukamat koodi genereerimist, näiteks töötada allikatega andmebaasi kujul või kirjeldada tabeli struktuuri, tabeliga töötamise algoritmi ja DWH infrastruktuuri omadusi arvestades genereerida protsessi N tabeli laadimiseks oma salvestusruumi. Või näiteks töötades API-ga, mis ei toeta loendikujulise parameetriga töötamist, saate sellest loendist genereerida DAG-is N ülesannet, piirata API päringute paralleelsust kogumiga ja kraapida vajalikud andmed API-st. Paindlik!

hoidla

Airflow'l on oma tagavarahoidla, andmebaas (võib olla MySQL või Postgres, meil on Postgres), mis salvestab ülesannete olekud, DAG-id, ühenduse seaded, globaalsed muutujad jne jne. Siinkohal tahaksin öelda, et Airflow repositoorium on väga lihtne (umbes 20 tabelit) ja mugav, kui soovite selle peale ehitada mõnda oma protsessi. Mäletan Informatica hoidlas olevaid 100500 XNUMX tabelit, mida tuli pikka aega uurida, enne kui aru sai, kuidas päringut koostada.

Jälgimine

Arvestades hoidla lihtsust, saate luua endale sobiva ülesannete jälgimise protsessi. Kasutame Zeppelinis märkmikku, kus vaatame ülesannete olekut:

Airflow on tööriist pakettandmete töötlemise protsesside mugavaks ja kiireks arendamiseks ja hooldamiseks

See võib olla ka Airflow enda veebiliides:

Airflow on tööriist pakettandmete töötlemise protsesside mugavaks ja kiireks arendamiseks ja hooldamiseks

Airflow kood on avatud lähtekoodiga, seega oleme Telegramile lisanud hoiatuse. Iga töötav ülesande eksemplar saadab vea ilmnemisel rämpspostiks Telegrami rühma, kuhu kuulub kogu arendus- ja tugimeeskond.

Saame kiire vastuse Telegrami kaudu (vajadusel) ja Zeppelini kaudu saame Airflow ülesannetest üldpildi.

Kogusummas

Airflow on peamiselt avatud lähtekoodiga ja te ei tohiks sellelt imesid oodata. Olge valmis töötava lahenduse loomiseks kulutama aega ja vaeva. Eesmärk on saavutatav, uskuge mind, see on seda väärt. Arengukiirus, paindlikkus, uute protsesside lisamise lihtsus – see meeldib teile. Muidugi peate palju tähelepanu pöörama projekti korraldusele, Airflow enda stabiilsusele: imesid ei juhtu.

Nüüd töötab Airflow iga päev umbes 6,5 tuhat ülesannet. Nad on iseloomult üsna erinevad. On ülesandeid andmete laadimiseks põhi DWH-sse paljudest erinevatest ja väga spetsiifilistest allikatest, on ülesandeid peamise DWH sees olevate kaupluste väljaarvutamise ülesanded, on andmete kiiresse DWH-sse avaldamise ülesanded, on palju-palju erinevaid ülesandeid - ja Airflow närib neid kõiki päevast päeva. Kui rääkida numbrites, siis see on nii Xnumx tuhat Erineva keerukusega ELT-ülesanded DWH-s (Hadoop), ca. 2,5 sada andmebaasi allikatest, see on meeskond 4 ETL-i arendajat, mis jagunevad ETL andmetöötluseks DWH-s ja ELT andmetöötluseks DWH-s ja loomulikult muuks üks admin, kes tegeleb teenuse infrastruktuuriga.

Tuleviku plaanid

Protsesside arv paratamatult kasvab ja peamine, mida me Airflow taristu osas teeme, on skaleerimine. Soovime luua Airflow klastri, eraldada selleri töötajatele paar jalga ning teha isepaljuneva pea koos tööplaanimisprotsesside ja hoidlaga.

Epiloog

See pole muidugi veel kõik, mida tahaksin Airflow kohta rääkida, kuid püüdsin esile tuua peamised punktid. Süües tuleb isu, proovi ja maitseb :)

Allikas: www.habr.com

Lisa kommentaar