Airflow je orodje za udoben in hiter razvoj in vzdrževanje procesov paketne obdelave podatkov

Airflow je orodje za udoben in hiter razvoj in vzdrževanje procesov paketne obdelave podatkov

Pozdravljeni, Habr! V tem članku želim govoriti o enem odličnem orodju za razvoj procesov paketne obdelave podatkov, na primer v infrastrukturi DWH podjetja ali vašega DataLake. Govorili bomo o Apache Airflow (v nadaljevanju Airflow). Na Habréju je neupravičeno prikrajšan za pozornost in v osrednjem delu vas bom skušal prepričati, da se vsaj Airflow splača pogledati, ko izbirate planer za vaše ETL/ELT procese.

Prej sem napisal vrsto člankov na temo DWH, ko sem delal v banki Tinkoff. Zdaj sem postal del skupine Mail.Ru Group in razvijam platformo za analizo podatkov na področju iger. Pravzaprav, ko se bodo pojavile novice in zanimive rešitve, bomo tukaj z mojo ekipo govorili o naši platformi za analizo podatkov.

Prolog

Torej, začnimo. Kaj je Airflow? To je knjižnica (oz nabor knjižnic) razvijati, načrtovati in spremljati delovne procese. Glavna značilnost Airflow: koda Python se uporablja za opis (razvoj) procesov. To ima veliko prednosti za organizacijo vašega projekta in razvoja: v bistvu je vaš (na primer) ETL projekt samo projekt Python in ga lahko organizirate po želji, pri čemer upoštevate posebnosti infrastrukture, velikosti ekipe in druge zahteve. Instrumentalno je vse preprosto. Uporabite na primer PyCharm + Git. Čudovito je in zelo priročno!

Zdaj pa si poglejmo glavne entitete Airflow. Z razumevanjem njihovega bistva in namena lahko optimalno organizirate svojo procesno arhitekturo. Morda je glavna entiteta usmerjeni aciklični graf (v nadaljevanju DAG).

DAG

DAG je neka smiselna povezava vaših nalog, ki jih želite dokončati v strogo določenem zaporedju v skladu z določenim urnikom. Airflow ponuja priročen spletni vmesnik za delo z skupinami DAG in drugimi entitetami:

Airflow je orodje za udoben in hiter razvoj in vzdrževanje procesov paketne obdelave podatkov

DAG bi lahko izgledal takole:

Airflow je orodje za udoben in hiter razvoj in vzdrževanje procesov paketne obdelave podatkov

Razvijalec pri oblikovanju DAG-a določi niz operaterjev, na podlagi katerih bodo zgrajene naloge znotraj DAG-a. Tu pridemo do še enega pomembnega subjekta: operaterja pretoka zraka.

Operaterji

Operater je entiteta, na podlagi katere se kreirajo instance opravila, ki opisuje, kaj se bo dogajalo med izvajanjem primerka opravila. Izdaje Airflow iz GitHub že vsebujejo nabor operatorjev, pripravljenih za uporabo. Primeri:

  • BashOperator - operater za izvajanje ukaza bash.
  • PythonOperator - operater za klic kode Python.
  • EmailOperator — operater za pošiljanje elektronske pošte.
  • HTTPOperator - operater za delo s http zahtevami.
  • SqlOperator - operater za izvajanje kode SQL.
  • Senzor je operater za čakanje na dogodek (prihod zahtevanega časa, pojav zahtevane datoteke, vrstica v bazi podatkov, odziv API-ja itd. itd.).

Obstajajo bolj specifični operaterji: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Operaterje lahko razvijete tudi na podlagi lastnih značilnosti in jih uporabite v svojem projektu. Ustvarili smo na primer MongoDBToHiveViaHdfsTransfer, operater za izvoz dokumentov iz MongoDB v Hive, in več operaterjev za delo z KlikniteHouse: CHLoadFromHiveOperator in CHTableLoaderOperator. V bistvu, takoj ko je projekt pogosto uporabljal kodo, zgrajeno na osnovnih stavkih, lahko razmislite o tem, da bi jo zgradili v nov stavek. To bo poenostavilo nadaljnji razvoj, vi pa boste razširili svojo knjižnico operaterjev v projektu.

Nato je treba izvesti vse te primerke nalog, zdaj pa bomo govorili o razporejevalniku.

Razporejevalnik

Razporejevalnik opravil Airflow temelji na Zelena. Celery je knjižnica Python, ki vam omogoča organiziranje čakalne vrste ter asinhrono in porazdeljeno izvajanje nalog. Na strani Airflow so vse naloge razdeljene v skupine. Bazeni so ustvarjeni ročno. Običajno je njihov namen omejiti delovno obremenitev dela z virom ali tipizirati naloge znotraj DWH. Bazene lahko upravljate preko spletnega vmesnika:

Airflow je orodje za udoben in hiter razvoj in vzdrževanje procesov paketne obdelave podatkov

Vsak bazen ima omejitev števila slotov. Ko ustvarite DAG, dobi bazen:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Področje, definirano na ravni DAG, je mogoče preglasiti na ravni opravila.
Ločen proces, Scheduler, je odgovoren za načrtovanje vseh nalog v Airflow. Pravzaprav se Scheduler ukvarja z vsemi mehaniki postavljanja nalog za izvedbo. Naloga gre skozi več stopenj, preden se izvede:

  1. Prejšnje naloge so bile dokončane v DAG; novo lahko postavite v čakalno vrsto.
  2. Čakalna vrsta je razvrščena glede na prioriteto opravil (prioritete je mogoče tudi nadzirati) in če je v bazenu prosto mesto, se opravilo lahko sproži.
  3. Če obstaja prosta delovna zelena, se ji naloga pošlje; se začne delo, ki ste ga programirali v problemu, z uporabo enega ali drugega operatorja.

Dovolj preprosto.

Razporejevalnik deluje na naboru vseh DAG-ov in vseh opravil znotraj DAG-ov.

Da lahko Scheduler začne delovati z DAG, mora DAG nastaviti urnik:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Obstaja nabor že pripravljenih prednastavitev: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Uporabite lahko tudi cron izraze:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Datum izvršitve

Da bi razumeli, kako deluje Airflow, je pomembno razumeti, kaj je datum izvedbe za DAG. V Airflow ima DAG dimenzijo datuma izvedbe, kar pomeni, da se primerki opravil ustvarijo za vsak datum izvedbe, odvisno od urnika dela DAG-a. In za vsak datum izvedbe je mogoče naloge znova izvesti - ali pa lahko na primer DAG deluje hkrati v več datumih izvedbe. To je jasno prikazano tukaj:

Airflow je orodje za udoben in hiter razvoj in vzdrževanje procesov paketne obdelave podatkov

Na žalost (ali morda na srečo: odvisno od situacije), če je izvajanje naloge v DAG popravljeno, se bo izvedba v prejšnjem datumu izvedbe nadaljevala ob upoštevanju prilagoditev. To je dobro, če morate podatke v preteklih obdobjih preračunati z novim algoritmom, vendar je slabo, ker se izgubi ponovljivost rezultata (seveda vas nihče ne moti, da vrnete zahtevano različico izvorne kode iz Gita in izračunate, kaj potrebujete enkrat, tako kot potrebujete).

Ustvarjanje nalog

Implementacija DAG je koda v Pythonu, tako da imamo zelo priročen način za zmanjšanje količine kode pri delu, na primer, z razdeljenimi viri. Recimo, da imate kot vir tri delce MySQL, splezati morate v vsakega in pobrati nekaj podatkov. Še več, samostojno in vzporedno. Koda Python v DAG bi lahko izgledala takole:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG izgleda takole:

Airflow je orodje za udoben in hiter razvoj in vzdrževanje procesov paketne obdelave podatkov

V tem primeru lahko dodate ali odstranite delček tako, da preprosto prilagodite nastavitve in posodobite DAG. Udobno!

Uporabite lahko tudi bolj zapleteno generiranje kode, na primer delo z viri v obliki baze podatkov ali opisovanje strukture tabele, algoritem za delo s tabelo in ob upoštevanju značilnosti infrastrukture DWH ustvarite proces za nalaganje N tabel v vašo shrambo. Ali pa na primer pri delu z API-jem, ki ne podpira dela s parametrom v obliki seznama, lahko iz tega seznama ustvarite N nalog v DAG-ju, omejite vzporednost zahtev v API-ju na bazen in strgate potrebne podatke iz API-ja. Prilagodljiv!

repozitorij

Airflow ima lastno zaledno skladišče, bazo podatkov (lahko MySQL ali Postgres, mi imamo Postgres), ki shranjuje stanja opravil, DAG, nastavitve povezav, globalne spremenljivke itd., itd. Tukaj bi rad lahko rekel, da Repozitorij v Airflowu je zelo preprost (približno 20 tabel) in priročen, če želite na njem zgraditi katerega od lastnih procesov. Spomnim se 100500 tabel v repozitoriju Informatica, ki jih je bilo treba dolgo preučevati, preden sem razumel, kako sestaviti poizvedbo.

Spremljanje

Glede na preprostost repozitorija lahko zgradite postopek spremljanja opravil, ki vam ustreza. V Zeppelinu uporabljamo beležko, kjer pogledamo stanje nalog:

Airflow je orodje za udoben in hiter razvoj in vzdrževanje procesov paketne obdelave podatkov

To bi lahko bil tudi spletni vmesnik samega Airflowa:

Airflow je orodje za udoben in hiter razvoj in vzdrževanje procesov paketne obdelave podatkov

Koda Airflow je odprtokodna, zato smo v Telegram dodali opozorila. Vsaka zagnana instanca naloge, če pride do napake, spama skupino v Telegramu, kjer je sestavljena celotna razvojna in podporna ekipa.

Preko Telegrama prejmemo hiter odgovor (po potrebi), preko Zeppelina pa dobimo celotno sliko nalog v Airflowu.

Skupno

Airflow je predvsem odprtokoden in od njega ne smete pričakovati čudežev. Bodite pripravljeni vložiti čas in trud, da ustvarite rešitev, ki deluje. Cilj je dosegljiv, verjemite, vredno je. Hitrost razvoja, prilagodljivost, enostavnost dodajanja novih procesov - všeč vam bo. Seveda morate veliko pozornosti posvetiti organizaciji projekta, stabilnosti samega Airflowa: čudeži se ne zgodijo.

Zdaj Airflow deluje vsak dan približno 6,5 tisoč nalog. Karakterno sta si precej različna. Obstajajo naloge nalaganja podatkov v glavni DWH iz številnih različnih in zelo specifičnih virov, obstajajo naloge izračuna izložb znotraj glavnega DWH, obstajajo naloge objave podatkov v hitrem DWH, obstaja veliko, veliko različnih opravil - in Airflow jih vse žveči dan za dnem. Če govorimo v številkah, je to 2,3 tisoč ELT naloge različnih zahtevnosti znotraj DWH (Hadoop), cca. 2,5 sto baz podatkov virov, to je ekipa iz 4 ETL razvijalci, ki se delijo na ETL obdelavo podatkov v DWH in ELT obdelavo podatkov znotraj DWH in seveda še kaj en admin, ki se ukvarja z infrastrukturo storitve.

Načrti za prihodnost

Število procesov neizogibno narašča in glavna stvar, ki jo bomo počeli v zvezi z infrastrukturo Airflow, je skaliranje. Želimo zgraditi gručo Airflow, dodeliti par nog za delavce Celery in narediti samopodvajajočo glavo s procesi razporejanja opravil in skladiščem.

Epilog

To seveda ni vse, kar bi rad povedal o Airflowu, vendar sem poskušal izpostaviti glavne točke. Apetit pride z jedjo, poskusite in všeč vam bo :)

Vir: www.habr.com

Dodaj komentar