Airflow – tai įrankis, leidžiantis patogiai ir greitai kurti ir palaikyti paketinio duomenų apdorojimo procesus

Airflow – tai įrankis, leidžiantis patogiai ir greitai kurti ir palaikyti paketinio duomenų apdorojimo procesus

Sveiki, Habr! Šiame straipsnyje noriu pakalbėti apie vieną puikų įrankį paketinių duomenų apdorojimo procesų kūrimui, pavyzdžiui, įmonės DWH arba jūsų DataLake infrastruktūroje. Kalbėsime apie Apache Airflow (toliau – Airflow). Habré nesąžiningai atimamas dėmesys, o pagrindinėje dalyje pabandysiu jus įtikinti, kad bent jau Airflow verta pasidomėti renkantis planuotoją savo ETL/ELT procesams.

Anksčiau, kai dirbau Tinkoff banke, rašiau straipsnių ciklą DWH tema. Dabar tapau Mail.Ru Group komandos dalimi ir kuriu platformą duomenų analizei žaidimų srityje. Tiesą sakant, pasirodžius naujienoms ir įdomiems sprendimams, mes su komanda čia kalbėsime apie mūsų duomenų analizės platformą.

Prologas

Taigi, pradėkime. Kas yra oro srautas? Tai biblioteka (arba bibliotekų rinkinys) plėtoti, planuoti ir stebėti darbo procesus. Pagrindinė Airflow savybė: Python kodas naudojamas procesams apibūdinti (plėtoti). Tai turi daug privalumų organizuojant jūsų projektą ir plėtrą: iš esmės jūsų (pavyzdžiui) ETL projektas yra tik Python projektas, kurį galite organizuoti taip, kaip norite, atsižvelgdami į infrastruktūros specifiką, komandos dydį ir kiti reikalavimai. Instrumentiniu požiūriu viskas paprasta. Naudokite, pavyzdžiui, PyCharm + Git. Tai nuostabu ir labai patogu!

Dabar pažvelkime į pagrindinius „Airflow“ objektus. Suprasdami jų esmę ir paskirtį, galite optimaliai organizuoti savo proceso architektūrą. Galbūt pagrindinis subjektas yra nukreiptas aciklinis grafikas (toliau – DAG).

DAG

DAG yra tam tikras prasmingas jūsų užduočių, kurias norite atlikti griežtai apibrėžta seka pagal konkretų tvarkaraštį, susiejimas. „Airflow“ suteikia patogią žiniatinklio sąsają darbui su DAG ir kitais objektais:

Airflow – tai įrankis, leidžiantis patogiai ir greitai kurti ir palaikyti paketinio duomenų apdorojimo procesus

DAG gali atrodyti taip:

Airflow – tai įrankis, leidžiantis patogiai ir greitai kurti ir palaikyti paketinio duomenų apdorojimo procesus

Kūrėjas, kurdamas DAG, nustato operatorių rinkinį, pagal kurį bus kuriamos DAG užduotys. Čia mes ateiname į kitą svarbų subjektą: oro srauto operatorių.

Operatoriai

Operatorius – tai subjektas, kurio pagrindu kuriami darbo egzemplioriai, aprašantis, kas atsitiks vykdant užduoties egzempliorių. Oro srautas išleidžiamas iš „GitHub“. jau yra paruoštų naudoti operatorių rinkinys. Pavyzdžiai:

  • BashOperator – operatorius, skirtas vykdyti bash komandą.
  • PythonOperator – operatorius, skirtas iškviesti Python kodą.
  • EmailOperator – el. laiškų siuntimo operatorius.
  • HTTPOperator – operatorius, skirtas darbui su http užklausomis.
  • SqlOperator – SQL kodo vykdymo operatorius.
  • Jutiklis yra įvykio laukimo operatorius (reikiamo laiko atvykimas, reikiamo failo atsiradimas, eilutė duomenų bazėje, atsakymas iš API ir pan. ir t.t.).

Yra ir konkretesnių operatorių: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Taip pat galite kurti operatorius pagal savo savybes ir naudoti juos savo projekte. Pavyzdžiui, sukūrėme MongoDBToHiveViaHdfsTransfer – operatorių, skirtą dokumentų eksportavimui iš MongoDB į Hive, ir keletą operatorių, skirtų darbui su „ClickHouse“: CHLoadFromHiveOperator ir CHTableLoaderOperator. Iš esmės, kai tik projekte dažnai naudojamas kodas, pagrįstas pagrindiniais teiginiais, galite galvoti apie tai, kaip jį sudėti į naują teiginį. Tai supaprastins tolesnę plėtrą, o jūs praplėste savo operatorių biblioteką projekte.

Toliau visi šie užduočių atvejai turi būti įvykdyti, o dabar kalbėsime apie planuoklį.

Tvarkaraštis

„Airflow“ užduočių planavimo priemonė yra sukurta Salierai. „Celery“ yra „Python“ biblioteka, leidžianti organizuoti eilę ir asinchroninį bei paskirstytą užduočių vykdymą. Oro srauto pusėje visos užduotys suskirstytos į baseinus. Baseinai kuriami rankiniu būdu. Paprastai jų tikslas yra apriboti darbo su šaltiniu krūvį arba tipizuoti užduotis DWH. Baseinas gali būti valdomas per žiniatinklio sąsają:

Airflow – tai įrankis, leidžiantis patogiai ir greitai kurti ir palaikyti paketinio duomenų apdorojimo procesus

Kiekviename baseine yra ribotas laiko tarpsnių skaičius. Kuriant DAG, jam suteikiamas baseinas:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

DAG lygiu apibrėžtas telkinys gali būti nepaisomas užduoties lygiu.
Atskiras procesas, planuoklis, yra atsakingas už visų Airflow užduočių planavimą. Tiesą sakant, planuoklis nagrinėja visą vykdymo užduočių nustatymo mechaniką. Prieš įvykdant užduotis praeina kelis etapus:

  1. Ankstesnės užduotys DAG buvo atliktos, galima kelti naują.
  2. Eilė rūšiuojama priklausomai nuo užduočių prioriteto (prioritetai taip pat gali būti valdomi), o jei telkinyje yra laisva vieta, užduotį galima pradėti eksploatuoti.
  3. Jei yra laisvas darbininkas salieras, užduotis siunčiama jam; prasideda darbas, kurį užprogramavote užduotyje, naudojant vieną ar kitą operatorių.

Pakankamai paprasta.

Planuoklis veikia visų DAG rinkinyje ir visomis DAG užduotimis.

Kad planuoklis pradėtų dirbti su DAG, DAG turi nustatyti grafiką:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Yra paruoštų išankstinių nustatymų rinkinys: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Taip pat galite naudoti cron išraiškas:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Vykdymo data

Norint suprasti, kaip veikia „Airflow“, svarbu suprasti, kokia yra DAG vykdymo data. „Airflow“ sistemoje DAG turi vykdymo datos dimensiją, t. y., atsižvelgiant į DAG darbo grafiką, kiekvienai vykdymo datai sukuriami užduočių egzemplioriai. Ir kiekvienai vykdymo datai užduotys gali būti vykdomos iš naujo – arba, pavyzdžiui, DAG gali dirbti vienu metu keliomis vykdymo datomis. Tai aiškiai parodyta čia:

Airflow – tai įrankis, leidžiantis patogiai ir greitai kurti ir palaikyti paketinio duomenų apdorojimo procesus

Deja (o gal laimei: tai priklauso nuo situacijos), jei užduoties įgyvendinimas DAG bus pataisytas, vykdymas ankstesnėje vykdymo datoje bus tęsiamas atsižvelgiant į patikslinimus. Tai gerai, jei reikia perskaičiuoti praėjusių laikotarpių duomenis naudojant naują algoritmą, bet blogai, nes prarandamas rezultato atkuriamumas (žinoma, niekas netrukdo grąžinti reikiamą šaltinio kodo versiją iš Git ir paskaičiuoti, kas jums reikia vienkartinio, taip, kaip jums reikia).

Užduočių generavimas

DAG įdiegimas yra kodas Python, todėl turime labai patogų būdą sumažinti kodo kiekį dirbant, pavyzdžiui, su susmulkintais šaltiniais. Tarkime, kad turite tris „MySQL“ skeveldras kaip šaltinį, turite įeiti į kiekvieną ir pasiimti tam tikrų duomenų. Be to, savarankiškai ir lygiagrečiai. Python kodas DAG gali atrodyti taip:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG atrodo taip:

Airflow – tai įrankis, leidžiantis patogiai ir greitai kurti ir palaikyti paketinio duomenų apdorojimo procesus

Tokiu atveju galite pridėti arba pašalinti skeveldrą tiesiog pakoreguodami nustatymus ir atnaujindami DAG. Patogus!

Taip pat galite naudoti sudėtingesnį kodo generavimą, pavyzdžiui, dirbti su šaltiniais duomenų bazės pavidalu arba aprašyti lentelės struktūrą, darbo su lentele algoritmą ir, atsižvelgdami į DWH infrastruktūros ypatybes, generuoti procesą. N lentelių įkėlimui į savo saugyklą. Arba, pavyzdžiui, dirbdami su API, kuri nepalaiko darbo su parametru sąrašo pavidalu, iš šio sąrašo galite sugeneruoti N užduočių DAG, apriboti užklausų lygiagretumą API iki telkinio ir nubraukti reikiamus duomenis iš API. Lankstus!

saugykla

„Airflow“ turi savo backend saugyklą, duomenų bazę (gali būti „MySQL“ arba „Postgres“, mes turime „Postgres“), kurioje saugomos užduočių būsenos, DAG, ryšio parametrai, globalūs kintamieji ir tt ir tt Čia norėčiau pasakyti, kad „Airflow“ saugykla yra labai paprasta (apie 20 lentelių) ir patogi, jei norite sukurti bet kurį iš savo procesų. Prisimenu 100500 XNUMX lentelių Informatica saugykloje, kurias reikėjo ilgai tyrinėti, kol supratau, kaip sukurti užklausą.

Stebėjimas

Atsižvelgiant į saugyklos paprastumą, galite sukurti jums patogų užduočių stebėjimo procesą. Zeppelin sistemoje naudojame užrašų knygelę, kurioje žiūrime į užduočių būseną:

Airflow – tai įrankis, leidžiantis patogiai ir greitai kurti ir palaikyti paketinio duomenų apdorojimo procesus

Tai taip pat gali būti pačios „Airflow“ žiniatinklio sąsaja:

Airflow – tai įrankis, leidžiantis patogiai ir greitai kurti ir palaikyti paketinio duomenų apdorojimo procesus

„Airflow“ kodas yra atvirojo kodo, todėl „Telegram“ pridėjome įspėjimą. Kiekvienas vykdomas užduoties egzempliorius, jei įvyksta klaida, išsiunčia „Telegram“ grupę, kurioje yra visa kūrimo ir palaikymo komanda.

Greitai atsakome per Telegram (jei reikia), o per Zeppelin gauname bendrą Airflow užduočių vaizdą.

Iš viso

Oro srautas visų pirma yra atvirojo kodo, ir iš jo nereikėtų tikėtis stebuklų. Būkite pasirengę skirti laiko ir pastangų, kad sukurtumėte tinkamą sprendimą. Tikslas pasiekiamas, patikėkite, verta. Kūrimo greitis, lankstumas, naujų procesų įtraukimo paprastumas – jums patiks. Žinoma, daug dėmesio reikia skirti projekto organizavimui, paties „Airflow“ stabilumui: stebuklų nebūna.

Dabar oro srautas dirba kasdien užduočių apie 6,5 tūkst. Jie yra gana skirtingi savo charakteriu. Yra užduotys įkelti duomenis į pagrindinį DWH iš daugybės skirtingų ir labai specifinių šaltinių, yra užduotys skaičiuoti parduotuvių vitrinas pagrindinėje DWH, yra duomenų paskelbimo į greitą DWH užduotys, yra daug, daug skirtingų užduočių - ir Airflow. kramto juos visus diena iš dienos. Kalbant skaičiais, tai yra Xnumx tūkst Įvairaus sudėtingumo ELT užduotys DWH (Hadoop), apytiksliai. 2,5 šimto duomenų bazių šaltinių, tai komanda iš 4 ETL kūrėjai, kurie skirstomi į ETL duomenų apdorojimą DWH ir ELT duomenų apdorojimą DWH viduje ir, žinoma, dar daugiau vienas adminas, kuris užsiima paslaugos infrastruktūra.

Ateities planai

Procesų skaičius neišvengiamai auga, o pagrindinis dalykas, kurį darysime oro srauto infrastruktūros srityje, yra mastelio keitimas. Norime sukurti „Airflow“ klasterį, paskirti porą kojų „Selery“ darbuotojams ir sukurti savaime besidubliuojančią galvutę su darbų planavimo procesais ir saugykla.

Epilogas

Tai, žinoma, dar ne viskas, ką norėčiau papasakoti apie „Airflow“, bet pabandžiau pabrėžti pagrindinius dalykus. Apetitas atsiranda valgant, išbandykite ir patiks :)

Šaltinis: www.habr.com

Добавить комментарий