Hello, Habr! F'dan l-artikolu nixtieq nitkellem dwar għodda waħda kbira għall-iżvilupp ta 'proċessi ta' pproċessar ta 'dejta tal-lott, pereżempju, fl-infrastruttura ta' DWH korporattiv jew DataLake tiegħek. Se nitkellmu dwar Apache Airflow (minn hawn 'il quddiem imsejħa Airflow). Huwa mċaħħad b'mod inġust mill-attenzjoni fuq Habré, u fil-parti prinċipali ser nipprova nikkonvinċik li għall-inqas Airflow ta 'min iħares lejh meta tagħżel Scheduler għall-proċessi ETL/ELT tiegħek.
Preċedentement, ktibt sensiela ta’ artikli dwar is-suġġett tad-DWH meta kont naħdem f’Tinkoff Bank. Issa sirt parti mit-tim tal-Grupp Mail.Ru u qed niżviluppa pjattaforma għall-analiżi tad-dejta fil-qasam tal-logħob. Fil-fatt, hekk kif jidhru aħbarijiet u soluzzjonijiet interessanti, jien u t-tim tiegħi se nitkellmu hawn dwar il-pjattaforma tagħna għall-analiżi tad-dejta.
Prologu
Allura, ejja nibdew. X'inhu Airflow? Din hija librerija (jew
Issa ejja nħarsu lejn l-entitajiet ewlenin ta 'Airflow. Billi tifhem l-essenza u l-iskop tagħhom, tista 'torganizza bl-aħjar mod l-arkitettura tal-proċess tiegħek. Forsi l-entità ewlenija hija l-Grafika Aċiklika Diretta (minn hawn 'il quddiem imsejħa DAG).
DAG
DAG hija xi assoċjazzjoni sinifikanti tal-kompiti tiegħek li trid tlesti f'sekwenza strettament definita skont skeda speċifika. Airflow jipprovdi interface tal-web konvenjenti biex taħdem ma' DAGs u entitajiet oħra:
Id-DAG jista' jidher bħal dan:
L-iżviluppatur, meta jfassal DAG, jistabbilixxi sett ta' operaturi li fuqhom se jinbnew il-kompiti fi ħdan id-DAG. Hawnhekk naslu għal entità importanti oħra: Airflow Operator.
Operaturi
Operatur huwa entità li fuq il-bażi tagħha jinħolqu istanzi tax-xogħol, li tiddeskrivi x'se jiġri waqt l-eżekuzzjoni ta' istanza tax-xogħol.
- BashOperator - operatur għall-eżekuzzjoni ta 'kmand bash.
- PythonOperator - operatur għas-sejħa tal-kodiċi Python.
- EmailOperator — operatur biex jibgħat email.
- HTTPOperator - operatur biex jaħdem b'talbiet http.
- SqlOperator - operatur għall-eżekuzzjoni tal-kodiċi SQL.
- Sensor huwa operatur għall-istennija għal avveniment (il-wasla tal-ħin meħtieġ, id-dehra tal-fajl meħtieġ, linja fid-database, rispons mill-API, eċċ., eċċ.).
Hemm operaturi aktar speċifiċi: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Tista 'wkoll tiżviluppa operaturi bbażati fuq il-karatteristiċi tiegħek stess u tużahom fil-proġett tiegħek. Pereżempju, ħloqna MongoDBToHiveViaHdfsTransfer, operatur għall-esportazzjoni ta' dokumenti minn MongoDB għal Hive, u diversi operaturi biex jaħdmu ma'
Sussegwentement, dawn l-istanzi kollha ta 'kompiti jeħtieġ li jiġu eżegwiti, u issa se nitkellmu dwar l-iskedar.
Scheduler
L-iskedar tal-kompitu tal-fluss tal-arja huwa mibni fuq
Kull pool għandu limitu fuq in-numru ta 'slots. Meta toħloq DAG, jingħata ġabra:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
Pool definit fil-livell tad-DAG jista' jiġi sostitwit fil-livell tal-kompitu.
Proċess separat, Scheduler, huwa responsabbli għall-iskedar tal-kompiti kollha fl-Airflow. Fil-fatt, Scheduler jittratta l-mekkaniżmi kollha tal-iffissar tal-kompiti għall-eżekuzzjoni. Il-kompitu jgħaddi minn diversi stadji qabel ma jiġi eżegwit:
- Il-kompiti preċedenti tlestew fid-DAG; wieħed ġdid jista' jitqiegħed fil-kju.
- Il-kju jiġi magħżul skont il-prijorità tal-kompiti (il-prijoritajiet jistgħu wkoll jiġu kkontrollati), u jekk ikun hemm slot ħieles fil-pool, il-kompitu jista 'jitħaddem.
- Jekk ikun hemm karfus ħaddiem b'xejn, il-kompitu jintbagħat lilha; jibda x-xogħol li pprogrammajt fil-problema, billi tuża operatur wieħed jew ieħor.
Sempliċi biżżejjed.
Scheduler jaħdem fuq is-sett tad-DAGs kollha u l-kompiti kollha fi ħdan id-DAGs.
Biex Scheduler jibda jaħdem mad-DAG, id-DAG jeħtieġ li jistabbilixxi skeda:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Hemm sett ta 'presets lesti: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Tista 'wkoll tuża espressjonijiet cron:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Data ta' Esekuzzjoni
Biex tifhem kif taħdem Airflow, huwa importanti li tifhem x'inhi d-Data ta' Eżekuzzjoni għal DAG. Fl-Airflow, DAG għandu dimensjoni tad-Data ta’ Eżekuzzjoni, jiġifieri, skont l-iskeda tax-xogħol tad-DAG, jinħolqu istanzi ta’ kompitu għal kull Data ta’ Esekuzzjoni. U għal kull Data ta 'Eżekuzzjoni, il-kompiti jistgħu jerġgħu jiġu eżegwiti - jew, pereżempju, DAG jista' jaħdem simultanjament f'diversi Dati ta 'Eżekuzzjoni. Dan jidher b'mod ċar hawnhekk:
Sfortunatament (jew forsi fortunatament: jiddependi mis-sitwazzjoni), jekk l-implimentazzjoni tal-kompitu fid-DAG tiġi kkoreġuta, allura l-eżekuzzjoni fid-Data ta 'Eżekuzzjoni preċedenti tipproċedi b'kont meħud tal-aġġustamenti. Dan huwa tajjeb jekk ikollok bżonn tikkalkula mill-ġdid id-dejta f'perjodi tal-passat bl-użu ta 'algoritmu ġdid, iżda huwa ħażin minħabba li r-riproduċibbiltà tar-riżultat tintilef (naturalment, ħadd ma jiddejjaqk biex tirritorna l-verżjoni meħtieġa tal-kodiċi sors minn Git u tikkalkula dak għandek bżonn darba waħda, il-mod kif għandek bżonnha).
Ħidmiet ta 'ġenerazzjoni
L-implimentazzjoni tad-DAG hija kodiċi f'Python, għalhekk għandna mod konvenjenti ħafna biex innaqqsu l-ammont ta 'kodiċi meta naħdmu, pereżempju, b'sorsi sharded. Ejja ngħidu li għandek tliet shards MySQL bħala sors, trid titla 'f'kull waħda u tieħu xi dejta. Barra minn hekk, b'mod indipendenti u b'mod parallel. Il-kodiċi Python fid-DAG jista' jidher bħal dan:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
Id-DAG jidher bħal dan:
F'dan il-każ, tista 'żżid jew tneħħi shard billi sempliċement taġġusta s-settings u taġġorna d-DAG. Komdu!
Tista 'wkoll tuża ġenerazzjoni ta' kodiċi aktar kumplessa, pereżempju, taħdem ma 'sorsi fil-forma ta' database jew tiddeskrivi struttura ta 'tabella, algoritmu biex taħdem ma' tabella, u, b'kont meħud tal-karatteristiċi tal-infrastruttura DWH, tiġġenera proċess. għat-tagħbija N tabelli fil-ħażna tiegħek. Jew, pereżempju, taħdem ma 'API li ma tappoġġjax ix-xogħol b'parametru fil-forma ta' lista, tista 'tiġġenera N kompiti f'DAG minn din il-lista, tillimita l-paralleliżmu ta' talbiet fl-API għal ġabra, u tinbarax id-dejta meħtieġa mill-API. Flessibbli!
repożitorju
Airflow għandha repożitorju backend tagħha stess, database (jista 'jkun MySQL jew Postgres, għandna Postgres), li jaħżen l-istati tal-kompiti, DAGs, settings ta' konnessjoni, varjabbli globali, eċċ, eċċ. Hawnhekk nixtieq nista 'ngħid li l- repożitorju fl-Airflow huwa sempliċi ħafna (madwar 20 tabella) u konvenjenti jekk trid tibni xi proċess tiegħek fuqu. Niftakar il-100500 tabella fir-repożitorju tal-Informatica, li kellhom jiġu studjati għal żmien twil qabel ma jifhmu kif tinbena mistoqsija.
Monitoraġġ
Minħabba s-sempliċità tar-repożitorju, tista 'tibni proċess ta' monitoraġġ tal-kompiti li huwa konvenjenti għalik. Aħna nużaw notepad f'Zeppelin, fejn inħarsu lejn l-istatus tal-kompiti:
Dan jista 'jkun ukoll l-interface tal-web ta' Airflow innifsu:
Il-kodiċi Airflow huwa sors miftuħ, għalhekk żidna twissija ma 'Telegram. Kull istanza ta 'ħidma ta' ħidma, jekk iseħħ żball, spam lill-grupp f'Telegram, fejn jikkonsisti t-tim kollu ta 'żvilupp u appoġġ.
Nirċievu tweġiba fil-pront permezz ta’ Telegram (jekk meħtieġ), u permezz ta’ Zeppelin nirċievu stampa ġenerali tal-kompiti fl-Airflow.
B'kollox
Il-fluss tal-arja huwa primarjament sors miftuħ, u m'għandekx tistenna mirakli minnha. Kun lest li tagħmel il-ħin u l-isforz biex tibni soluzzjoni li taħdem. L-għan jista 'jinkiseb, emminni, worth it. Veloċità ta 'żvilupp, flessibilità, faċilità ta' żieda ta 'proċessi ġodda - int se togħġobha. Naturalment, għandek bżonn tagħti ħafna attenzjoni lill-organizzazzjoni tal-proġett, l-istabbiltà tal-Airflow innifsu: il-mirakli ma jseħħux.
Issa għandna Airflow jaħdem kuljum madwar 6,5 elf kompiti. Huma pjuttost differenti fil-karattru. Hemm kompiti ta 'tagħbija ta' data fid-DWH prinċipali minn ħafna sorsi differenti u speċifiċi ħafna, hemm kompiti ta 'kalkolu ta' storefronts ġewwa d-DWH prinċipali, hemm kompiti ta 'pubblikazzjoni ta' data f'DWH veloċi, hemm ħafna, ħafna kompiti differenti - u Airflow tomgħodhom kollha jum wara jum. Taħdit fin-numri, dan hu 2,3 elf Ħidmiet ELT ta' kumplessità varja fi ħdan DWH (Hadoop), madwar. 2,5 mitt database sorsi, dan huwa tim minn 4 żviluppaturi ETL, li huma maqsuma fi proċessar tad-dejta ETL fid-DWH u ipproċessar tad-dejta ELT ġewwa DWH u naturalment aktar wieħed admin, li jittratta l-infrastruttura tas-servizz.
Pjanijiet għall-futur
In-numru ta 'proċessi qed jikber inevitabbilment, u l-ħaġa prinċipali li se nkunu qed nagħmlu f'termini tal-infrastruttura tal-Airflow hija l-iskala. Irridu nibnu cluster Airflow, nallokaw par saqajn għall-ħaddiema tal-Kfus, u nagħmlu ras li tiduplika lilha nnifisha bi proċessi ta 'skedar tax-xogħol u repożitorju.
Epilogue
Dan, ovvjament, mhuwiex dak kollu li nixtieq ngħid dwar Airflow, imma ppruvajt nenfasizza l-punti ewlenin. L-aptit jiġi mal-ikel, ipprova u togħġobkom :)
Sors: www.habr.com