Pozdravljeni, Habr! V tem članku želim govoriti o enem odličnem orodju za razvoj procesov paketne obdelave podatkov, na primer v infrastrukturi DWH podjetja ali vašega DataLake. Govorili bomo o Apache Airflow (v nadaljevanju Airflow). Na Habréju je neupravičeno prikrajšan za pozornost in v osrednjem delu vas bom skušal prepričati, da se vsaj Airflow splača pogledati, ko izbirate planer za vaše ETL/ELT procese.
Prej sem napisal vrsto člankov na temo DWH, ko sem delal v banki Tinkoff. Zdaj sem postal del skupine Mail.Ru Group in razvijam platformo za analizo podatkov na področju iger. Pravzaprav, ko se bodo pojavile novice in zanimive rešitve, bomo tukaj z mojo ekipo govorili o naši platformi za analizo podatkov.
Prolog
Torej, začnimo. Kaj je Airflow? To je knjižnica (oz
Zdaj pa si poglejmo glavne entitete Airflow. Z razumevanjem njihovega bistva in namena lahko optimalno organizirate svojo procesno arhitekturo. Morda je glavna entiteta usmerjeni aciklični graf (v nadaljevanju DAG).
DAG
DAG je neka smiselna povezava vaših nalog, ki jih želite dokončati v strogo določenem zaporedju v skladu z določenim urnikom. Airflow ponuja priročen spletni vmesnik za delo z skupinami DAG in drugimi entitetami:
DAG bi lahko izgledal takole:
Razvijalec pri oblikovanju DAG-a določi niz operaterjev, na podlagi katerih bodo zgrajene naloge znotraj DAG-a. Tu pridemo do še enega pomembnega subjekta: operaterja pretoka zraka.
Operaterji
Operater je entiteta, na podlagi katere se kreirajo instance opravila, ki opisuje, kaj se bo dogajalo med izvajanjem primerka opravila.
- BashOperator - operater za izvajanje ukaza bash.
- PythonOperator - operater za klic kode Python.
- EmailOperator — operater za pošiljanje elektronske pošte.
- HTTPOperator - operater za delo s http zahtevami.
- SqlOperator - operater za izvajanje kode SQL.
- Senzor je operater za čakanje na dogodek (prihod zahtevanega časa, pojav zahtevane datoteke, vrstica v bazi podatkov, odziv API-ja itd. itd.).
Obstajajo bolj specifični operaterji: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Operaterje lahko razvijete tudi na podlagi lastnih značilnosti in jih uporabite v svojem projektu. Ustvarili smo na primer MongoDBToHiveViaHdfsTransfer, operater za izvoz dokumentov iz MongoDB v Hive, in več operaterjev za delo z
Nato je treba izvesti vse te primerke nalog, zdaj pa bomo govorili o razporejevalniku.
Razporejevalnik
Razporejevalnik opravil Airflow temelji na
Vsak bazen ima omejitev števila slotov. Ko ustvarite DAG, dobi bazen:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
Področje, definirano na ravni DAG, je mogoče preglasiti na ravni opravila.
Ločen proces, Scheduler, je odgovoren za načrtovanje vseh nalog v Airflow. Pravzaprav se Scheduler ukvarja z vsemi mehaniki postavljanja nalog za izvedbo. Naloga gre skozi več stopenj, preden se izvede:
- Prejšnje naloge so bile dokončane v DAG; novo lahko postavite v čakalno vrsto.
- Čakalna vrsta je razvrščena glede na prioriteto opravil (prioritete je mogoče tudi nadzirati) in če je v bazenu prosto mesto, se opravilo lahko sproži.
- Če obstaja prosta delovna zelena, se ji naloga pošlje; se začne delo, ki ste ga programirali v problemu, z uporabo enega ali drugega operatorja.
Dovolj preprosto.
Razporejevalnik deluje na naboru vseh DAG-ov in vseh opravil znotraj DAG-ov.
Da lahko Scheduler začne delovati z DAG, mora DAG nastaviti urnik:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Obstaja nabor že pripravljenih prednastavitev: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Uporabite lahko tudi cron izraze:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Datum izvršitve
Da bi razumeli, kako deluje Airflow, je pomembno razumeti, kaj je datum izvedbe za DAG. V Airflow ima DAG dimenzijo datuma izvedbe, kar pomeni, da se primerki opravil ustvarijo za vsak datum izvedbe, odvisno od urnika dela DAG-a. In za vsak datum izvedbe je mogoče naloge znova izvesti - ali pa lahko na primer DAG deluje hkrati v več datumih izvedbe. To je jasno prikazano tukaj:
Na žalost (ali morda na srečo: odvisno od situacije), če je izvajanje naloge v DAG popravljeno, se bo izvedba v prejšnjem datumu izvedbe nadaljevala ob upoštevanju prilagoditev. To je dobro, če morate podatke v preteklih obdobjih preračunati z novim algoritmom, vendar je slabo, ker se izgubi ponovljivost rezultata (seveda vas nihče ne moti, da vrnete zahtevano različico izvorne kode iz Gita in izračunate, kaj potrebujete enkrat, tako kot potrebujete).
Ustvarjanje nalog
Implementacija DAG je koda v Pythonu, tako da imamo zelo priročen način za zmanjšanje količine kode pri delu, na primer, z razdeljenimi viri. Recimo, da imate kot vir tri delce MySQL, splezati morate v vsakega in pobrati nekaj podatkov. Še več, samostojno in vzporedno. Koda Python v DAG bi lahko izgledala takole:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
DAG izgleda takole:
V tem primeru lahko dodate ali odstranite delček tako, da preprosto prilagodite nastavitve in posodobite DAG. Udobno!
Uporabite lahko tudi bolj zapleteno generiranje kode, na primer delo z viri v obliki baze podatkov ali opisovanje strukture tabele, algoritem za delo s tabelo in ob upoštevanju značilnosti infrastrukture DWH ustvarite proces za nalaganje N tabel v vašo shrambo. Ali pa na primer pri delu z API-jem, ki ne podpira dela s parametrom v obliki seznama, lahko iz tega seznama ustvarite N nalog v DAG-ju, omejite vzporednost zahtev v API-ju na bazen in strgate potrebne podatke iz API-ja. Prilagodljiv!
repozitorij
Airflow ima lastno zaledno skladišče, bazo podatkov (lahko MySQL ali Postgres, mi imamo Postgres), ki shranjuje stanja opravil, DAG, nastavitve povezav, globalne spremenljivke itd., itd. Tukaj bi rad lahko rekel, da Repozitorij v Airflowu je zelo preprost (približno 20 tabel) in priročen, če želite na njem zgraditi katerega od lastnih procesov. Spomnim se 100500 tabel v repozitoriju Informatica, ki jih je bilo treba dolgo preučevati, preden sem razumel, kako sestaviti poizvedbo.
Spremljanje
Glede na preprostost repozitorija lahko zgradite postopek spremljanja opravil, ki vam ustreza. V Zeppelinu uporabljamo beležko, kjer pogledamo stanje nalog:
To bi lahko bil tudi spletni vmesnik samega Airflowa:
Koda Airflow je odprtokodna, zato smo v Telegram dodali opozorila. Vsaka zagnana instanca naloge, če pride do napake, spama skupino v Telegramu, kjer je sestavljena celotna razvojna in podporna ekipa.
Preko Telegrama prejmemo hiter odgovor (po potrebi), preko Zeppelina pa dobimo celotno sliko nalog v Airflowu.
Skupno
Airflow je predvsem odprtokoden in od njega ne smete pričakovati čudežev. Bodite pripravljeni vložiti čas in trud, da ustvarite rešitev, ki deluje. Cilj je dosegljiv, verjemite, vredno je. Hitrost razvoja, prilagodljivost, enostavnost dodajanja novih procesov - všeč vam bo. Seveda morate veliko pozornosti posvetiti organizaciji projekta, stabilnosti samega Airflowa: čudeži se ne zgodijo.
Zdaj Airflow deluje vsak dan približno 6,5 tisoč nalog. Karakterno sta si precej različna. Obstajajo naloge nalaganja podatkov v glavni DWH iz številnih različnih in zelo specifičnih virov, obstajajo naloge izračuna izložb znotraj glavnega DWH, obstajajo naloge objave podatkov v hitrem DWH, obstaja veliko, veliko različnih opravil - in Airflow jih vse žveči dan za dnem. Če govorimo v številkah, je to 2,3 tisoč ELT naloge različnih zahtevnosti znotraj DWH (Hadoop), cca. 2,5 sto baz podatkov virov, to je ekipa iz 4 ETL razvijalci, ki se delijo na ETL obdelavo podatkov v DWH in ELT obdelavo podatkov znotraj DWH in seveda še kaj en admin, ki se ukvarja z infrastrukturo storitve.
Načrti za prihodnost
Število procesov neizogibno narašča in glavna stvar, ki jo bomo počeli v zvezi z infrastrukturo Airflow, je skaliranje. Želimo zgraditi gručo Airflow, dodeliti par nog za delavce Celery in narediti samopodvajajočo glavo s procesi razporejanja opravil in skladiščem.
Epilog
To seveda ni vse, kar bi rad povedal o Airflowu, vendar sem poskušal izpostaviti glavne točke. Apetit pride z jedjo, poskusite in všeč vam bo :)
Vir: www.habr.com