Airflow hija għodda għall-iżvilupp u ż-żamma ta 'proċessi ta' pproċessar ta 'dejta tal-lott b'mod konvenjenti u malajr

Airflow hija għodda għall-iżvilupp u ż-żamma ta 'proċessi ta' pproċessar ta 'dejta tal-lott b'mod konvenjenti u malajr

Hello, Habr! F'dan l-artikolu nixtieq nitkellem dwar għodda waħda kbira għall-iżvilupp ta 'proċessi ta' pproċessar ta 'dejta tal-lott, pereżempju, fl-infrastruttura ta' DWH korporattiv jew DataLake tiegħek. Se nitkellmu dwar Apache Airflow (minn hawn 'il quddiem imsejħa Airflow). Huwa mċaħħad b'mod inġust mill-attenzjoni fuq Habré, u fil-parti prinċipali ser nipprova nikkonvinċik li għall-inqas Airflow ta 'min iħares lejh meta tagħżel Scheduler għall-proċessi ETL/ELT tiegħek.

Preċedentement, ktibt sensiela ta’ artikli dwar is-suġġett tad-DWH meta kont naħdem f’Tinkoff Bank. Issa sirt parti mit-tim tal-Grupp Mail.Ru u qed niżviluppa pjattaforma għall-analiżi tad-dejta fil-qasam tal-logħob. Fil-fatt, hekk kif jidhru aħbarijiet u soluzzjonijiet interessanti, jien u t-tim tiegħi se nitkellmu hawn dwar il-pjattaforma tagħna għall-analiżi tad-dejta.

Prologu

Allura, ejja nibdew. X'inhu Airflow? Din hija librerija (jew sett ta’ libreriji) biex tiżviluppa, tippjana u timmonitorja l-proċessi tax-xogħol. Il-karatteristika ewlenija tal-Airflow: Il-kodiċi Python jintuża biex jiddeskrivi (jiżviluppa) proċessi. Dan għandu ħafna vantaġġi għall-organizzazzjoni tal-proġett u l-iżvilupp tiegħek: essenzjalment, il-proġett ETL tiegħek (pereżempju) huwa biss proġett Python, u tista 'torganizzah kif tixtieq, filwaqt li tqis l-ispeċifiċitajiet tal-infrastruttura, id-daqs tat-tim u rekwiżiti oħra. Strumentalment kollox huwa sempliċi. Uża pereżempju PyCharm + Git. Huwa isbaħ u konvenjenti ħafna!

Issa ejja nħarsu lejn l-entitajiet ewlenin ta 'Airflow. Billi tifhem l-essenza u l-iskop tagħhom, tista 'torganizza bl-aħjar mod l-arkitettura tal-proċess tiegħek. Forsi l-entità ewlenija hija l-Grafika Aċiklika Diretta (minn hawn 'il quddiem imsejħa DAG).

DAG

DAG hija xi assoċjazzjoni sinifikanti tal-kompiti tiegħek li trid tlesti f'sekwenza strettament definita skont skeda speċifika. Airflow jipprovdi interface tal-web konvenjenti biex taħdem ma' DAGs u entitajiet oħra:

Airflow hija għodda għall-iżvilupp u ż-żamma ta 'proċessi ta' pproċessar ta 'dejta tal-lott b'mod konvenjenti u malajr

Id-DAG jista' jidher bħal dan:

Airflow hija għodda għall-iżvilupp u ż-żamma ta 'proċessi ta' pproċessar ta 'dejta tal-lott b'mod konvenjenti u malajr

L-iżviluppatur, meta jfassal DAG, jistabbilixxi sett ta' operaturi li fuqhom se jinbnew il-kompiti fi ħdan id-DAG. Hawnhekk naslu għal entità importanti oħra: Airflow Operator.

Operaturi

Operatur huwa entità li fuq il-bażi tagħha jinħolqu istanzi tax-xogħol, li tiddeskrivi x'se jiġri waqt l-eżekuzzjoni ta' istanza tax-xogħol. Il-fluss tal-arja joħroġ minn GitHub diġà fihom sett ta' operaturi lesti għall-użu. Eżempji:

  • BashOperator - operatur għall-eżekuzzjoni ta 'kmand bash.
  • PythonOperator - operatur għas-sejħa tal-kodiċi Python.
  • EmailOperator — operatur biex jibgħat email.
  • HTTPOperator - operatur biex jaħdem b'talbiet http.
  • SqlOperator - operatur għall-eżekuzzjoni tal-kodiċi SQL.
  • Sensor huwa operatur għall-istennija għal avveniment (il-wasla tal-ħin meħtieġ, id-dehra tal-fajl meħtieġ, linja fid-database, rispons mill-API, eċċ., eċċ.).

Hemm operaturi aktar speċifiċi: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Tista 'wkoll tiżviluppa operaturi bbażati fuq il-karatteristiċi tiegħek stess u tużahom fil-proġett tiegħek. Pereżempju, ħloqna MongoDBToHiveViaHdfsTransfer, operatur għall-esportazzjoni ta' dokumenti minn MongoDB għal Hive, u diversi operaturi biex jaħdmu ma' IkklikkjaHouse: CHLoadFromHiveOperator u CHTableLoaderOperator. Essenzjalment, hekk kif proġett ikun uża spiss kodiċi mibni fuq dikjarazzjonijiet bażiċi, tista 'taħseb biex tibnih f'dikjarazzjoni ġdida. Dan se jissimplifika aktar żvilupp, u inti se tespandi l-librerija tiegħek ta 'operaturi fil-proġett.

Sussegwentement, dawn l-istanzi kollha ta 'kompiti jeħtieġ li jiġu eżegwiti, u issa se nitkellmu dwar l-iskedar.

Scheduler

L-iskedar tal-kompitu tal-fluss tal-arja huwa mibni fuq Karfus. Il-karfus huwa librerija Python li tippermettilek torganizza kju flimkien ma 'eżekuzzjoni asinkronika u mqassma tal-kompiti. Fuq in-naħa Airflow, il-kompiti kollha huma maqsuma f'pools. Pools huma maħluqa manwalment. Tipikament, l-iskop tagħhom huwa li jillimitaw l-ammont ta 'xogħol ta' ħidma mas-sors jew li jiktipifikaw il-kompiti fi ħdan id-DWH. Il-pools jistgħu jiġu ġestiti permezz tal-interface tal-web:

Airflow hija għodda għall-iżvilupp u ż-żamma ta 'proċessi ta' pproċessar ta 'dejta tal-lott b'mod konvenjenti u malajr

Kull pool għandu limitu fuq in-numru ta 'slots. Meta toħloq DAG, jingħata ġabra:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Pool definit fil-livell tad-DAG jista' jiġi sostitwit fil-livell tal-kompitu.
Proċess separat, Scheduler, huwa responsabbli għall-iskedar tal-kompiti kollha fl-Airflow. Fil-fatt, Scheduler jittratta l-mekkaniżmi kollha tal-iffissar tal-kompiti għall-eżekuzzjoni. Il-kompitu jgħaddi minn diversi stadji qabel ma jiġi eżegwit:

  1. Il-kompiti preċedenti tlestew fid-DAG; wieħed ġdid jista' jitqiegħed fil-kju.
  2. Il-kju jiġi magħżul skont il-prijorità tal-kompiti (il-prijoritajiet jistgħu wkoll jiġu kkontrollati), u jekk ikun hemm slot ħieles fil-pool, il-kompitu jista 'jitħaddem.
  3. Jekk ikun hemm karfus ħaddiem b'xejn, il-kompitu jintbagħat lilha; jibda x-xogħol li pprogrammajt fil-problema, billi tuża operatur wieħed jew ieħor.

Sempliċi biżżejjed.

Scheduler jaħdem fuq is-sett tad-DAGs kollha u l-kompiti kollha fi ħdan id-DAGs.

Biex Scheduler jibda jaħdem mad-DAG, id-DAG jeħtieġ li jistabbilixxi skeda:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Hemm sett ta 'presets lesti: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Tista 'wkoll tuża espressjonijiet cron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Data ta' Esekuzzjoni

Biex tifhem kif taħdem Airflow, huwa importanti li tifhem x'inhi d-Data ta' Eżekuzzjoni għal DAG. Fl-Airflow, DAG għandu dimensjoni tad-Data ta’ Eżekuzzjoni, jiġifieri, skont l-iskeda tax-xogħol tad-DAG, jinħolqu istanzi ta’ kompitu għal kull Data ta’ Esekuzzjoni. U għal kull Data ta 'Eżekuzzjoni, il-kompiti jistgħu jerġgħu jiġu eżegwiti - jew, pereżempju, DAG jista' jaħdem simultanjament f'diversi Dati ta 'Eżekuzzjoni. Dan jidher b'mod ċar hawnhekk:

Airflow hija għodda għall-iżvilupp u ż-żamma ta 'proċessi ta' pproċessar ta 'dejta tal-lott b'mod konvenjenti u malajr

Sfortunatament (jew forsi fortunatament: jiddependi mis-sitwazzjoni), jekk l-implimentazzjoni tal-kompitu fid-DAG tiġi kkoreġuta, allura l-eżekuzzjoni fid-Data ta 'Eżekuzzjoni preċedenti tipproċedi b'kont meħud tal-aġġustamenti. Dan huwa tajjeb jekk ikollok bżonn tikkalkula mill-ġdid id-dejta f'perjodi tal-passat bl-użu ta 'algoritmu ġdid, iżda huwa ħażin minħabba li r-riproduċibbiltà tar-riżultat tintilef (naturalment, ħadd ma jiddejjaqk biex tirritorna l-verżjoni meħtieġa tal-kodiċi sors minn Git u tikkalkula dak għandek bżonn darba waħda, il-mod kif għandek bżonnha).

Ħidmiet ta 'ġenerazzjoni

L-implimentazzjoni tad-DAG hija kodiċi f'Python, għalhekk għandna mod konvenjenti ħafna biex innaqqsu l-ammont ta 'kodiċi meta naħdmu, pereżempju, b'sorsi sharded. Ejja ngħidu li għandek tliet shards MySQL bħala sors, trid titla 'f'kull waħda u tieħu xi dejta. Barra minn hekk, b'mod indipendenti u b'mod parallel. Il-kodiċi Python fid-DAG jista' jidher bħal dan:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

Id-DAG jidher bħal dan:

Airflow hija għodda għall-iżvilupp u ż-żamma ta 'proċessi ta' pproċessar ta 'dejta tal-lott b'mod konvenjenti u malajr

F'dan il-każ, tista 'żżid jew tneħħi shard billi sempliċement taġġusta s-settings u taġġorna d-DAG. Komdu!

Tista 'wkoll tuża ġenerazzjoni ta' kodiċi aktar kumplessa, pereżempju, taħdem ma 'sorsi fil-forma ta' database jew tiddeskrivi struttura ta 'tabella, algoritmu biex taħdem ma' tabella, u, b'kont meħud tal-karatteristiċi tal-infrastruttura DWH, tiġġenera proċess. għat-tagħbija N tabelli fil-ħażna tiegħek. Jew, pereżempju, taħdem ma 'API li ma tappoġġjax ix-xogħol b'parametru fil-forma ta' lista, tista 'tiġġenera N kompiti f'DAG minn din il-lista, tillimita l-paralleliżmu ta' talbiet fl-API għal ġabra, u tinbarax id-dejta meħtieġa mill-API. Flessibbli!

repożitorju

Airflow għandha repożitorju backend tagħha stess, database (jista 'jkun MySQL jew Postgres, għandna Postgres), li jaħżen l-istati tal-kompiti, DAGs, settings ta' konnessjoni, varjabbli globali, eċċ, eċċ. Hawnhekk nixtieq nista 'ngħid li l- repożitorju fl-Airflow huwa sempliċi ħafna (madwar 20 tabella) u konvenjenti jekk trid tibni xi proċess tiegħek fuqu. Niftakar il-100500 tabella fir-repożitorju tal-Informatica, li kellhom jiġu studjati għal żmien twil qabel ma jifhmu kif tinbena mistoqsija.

Monitoraġġ

Minħabba s-sempliċità tar-repożitorju, tista 'tibni proċess ta' monitoraġġ tal-kompiti li huwa konvenjenti għalik. Aħna nużaw notepad f'Zeppelin, fejn inħarsu lejn l-istatus tal-kompiti:

Airflow hija għodda għall-iżvilupp u ż-żamma ta 'proċessi ta' pproċessar ta 'dejta tal-lott b'mod konvenjenti u malajr

Dan jista 'jkun ukoll l-interface tal-web ta' Airflow innifsu:

Airflow hija għodda għall-iżvilupp u ż-żamma ta 'proċessi ta' pproċessar ta 'dejta tal-lott b'mod konvenjenti u malajr

Il-kodiċi Airflow huwa sors miftuħ, għalhekk żidna twissija ma 'Telegram. Kull istanza ta 'ħidma ta' ħidma, jekk iseħħ żball, spam lill-grupp f'Telegram, fejn jikkonsisti t-tim kollu ta 'żvilupp u appoġġ.

Nirċievu tweġiba fil-pront permezz ta’ Telegram (jekk meħtieġ), u permezz ta’ Zeppelin nirċievu stampa ġenerali tal-kompiti fl-Airflow.

B'kollox

Il-fluss tal-arja huwa primarjament sors miftuħ, u m'għandekx tistenna mirakli minnha. Kun lest li tagħmel il-ħin u l-isforz biex tibni soluzzjoni li taħdem. L-għan jista 'jinkiseb, emminni, worth it. Veloċità ta 'żvilupp, flessibilità, faċilità ta' żieda ta 'proċessi ġodda - int se togħġobha. Naturalment, għandek bżonn tagħti ħafna attenzjoni lill-organizzazzjoni tal-proġett, l-istabbiltà tal-Airflow innifsu: il-mirakli ma jseħħux.

Issa għandna Airflow jaħdem kuljum madwar 6,5 elf kompiti. Huma pjuttost differenti fil-karattru. Hemm kompiti ta 'tagħbija ta' data fid-DWH prinċipali minn ħafna sorsi differenti u speċifiċi ħafna, hemm kompiti ta 'kalkolu ta' storefronts ġewwa d-DWH prinċipali, hemm kompiti ta 'pubblikazzjoni ta' data f'DWH veloċi, hemm ħafna, ħafna kompiti differenti - u Airflow tomgħodhom kollha jum wara jum. Taħdit fin-numri, dan hu 2,3 elf Ħidmiet ELT ta' kumplessità varja fi ħdan DWH (Hadoop), madwar. 2,5 mitt database sorsi, dan huwa tim minn 4 żviluppaturi ETL, li huma maqsuma fi proċessar tad-dejta ETL fid-DWH u ipproċessar tad-dejta ELT ġewwa DWH u naturalment aktar wieħed admin, li jittratta l-infrastruttura tas-servizz.

Pjanijiet għall-futur

In-numru ta 'proċessi qed jikber inevitabbilment, u l-ħaġa prinċipali li se nkunu qed nagħmlu f'termini tal-infrastruttura tal-Airflow hija l-iskala. Irridu nibnu cluster Airflow, nallokaw par saqajn għall-ħaddiema tal-Kfus, u nagħmlu ras li tiduplika lilha nnifisha bi proċessi ta 'skedar tax-xogħol u repożitorju.

Epilogue

Dan, ovvjament, mhuwiex dak kollu li nixtieq ngħid dwar Airflow, imma ppruvajt nenfasizza l-punti ewlenin. L-aptit jiġi mal-ikel, ipprova u togħġobkom :)

Sors: www.habr.com

Żid kumment