Airflow se yon zouti pou devlope ak kenbe pwosesis done pakèt fasil epi byen vit

Airflow se yon zouti pou devlope ak kenbe pwosesis done pakèt fasil epi byen vit

Bonjou, Habr! Nan atik sa a mwen vle pale sou yon gwo zouti pou devlope pwosesis done pakèt, pou egzanp, nan enfrastrikti yon DWH antrepriz oswa DataLake ou. Nou pral pale sou Apache Airflow (ki refere yo kòm Airflow). Li se enjisteman prive de atansyon sou Habré, ak nan pati prensipal la mwen pral eseye konvenk ou ke omwen Airflow vo gade lè w ap chwazi yon orè pou pwosesis ETL / ELT ou yo.

Anvan sa, mwen te ekri yon seri de atik sou sijè DWH lè mwen te travay nan Tinkoff Bank. Koulye a, mwen te vin yon pati nan ekip Mail.Ru Group la epi mwen devlope yon platfòm pou analiz done nan zòn nan jwèt. Aktyèlman, kòm nouvèl ak solisyon enteresan parèt, ekip mwen an ak mwen pral pale isit la sou platfòm nou an pou analiz done.

Prologue

Se konsa, ann kòmanse. Ki sa ki Airflow? Sa a se yon bibliyotèk (oswa seri bibliyotèk) pou devlope, planifye ak kontwole pwosesis travay yo. Karakteristik prensipal Airflow: Kòd Python yo itilize pou dekri (devlope) pwosesis yo. Sa a gen anpil avantaj pou òganize pwojè ou ak devlopman: nan sans, (pa egzanp) pwojè ETL ou a se jis yon pwojè Python, epi ou ka òganize li jan ou vle, pran an kont spesifik yo nan enfrastrikti a, gwosè ekip la ak lòt kondisyon. Instrumentally tout bagay se senp. Sèvi ak pou egzanp PyCharm + Git. Li se bèl bagay ak trè pratik!

Koulye a, kite a gade nan antite prensipal yo nan Airflow. Lè w konprann sans yo ak objektif yo, ou ka byen òganize achitekti pwosesis ou a. Petèt antite prensipal la se graf Acyclic Dirije (ki refere yo kòm DAG).

Dag

Yon DAG se kèk asosyasyon siyifikatif nan travay ou ke ou vle ranpli nan yon sekans entèdi defini dapre yon orè espesifik. Airflow bay yon koòdone entènèt pratik pou travay ak DAG ak lòt antite:

Airflow se yon zouti pou devlope ak kenbe pwosesis done pakèt fasil epi byen vit

DAG a ta ka sanble sa a:

Airflow se yon zouti pou devlope ak kenbe pwosesis done pakèt fasil epi byen vit

Devlopè a, lè li ap desine yon DAG, mete yon seri operatè sou ki travay ki nan DAG la pral bati. Isit la nou rive nan yon lòt antite enpòtan: Operatè Airflow.

Operatè

Yon operatè se yon antite ki baze sou ki enstans travay yo kreye, ki dekri sa ki pral rive pandan ekzekisyon an nan yon egzanp travay. Airflow lage soti nan GitHub deja genyen yon seri operatè ki pare pou itilize. Egzanp:

  • BashOperator - operatè pou egzekite yon lòd bash.
  • PythonOperator - operatè pou rele kòd Python.
  • EmailOperator - operatè pou voye imèl.
  • HTTPOperator - operatè pou travay ak demann http.
  • SqlOperator - operatè pou egzekite kòd SQL.
  • Sensor se yon operatè pou tann yon evènman (rive nan tan ki nesesè yo, aparans nan fichye obligatwa a, yon liy nan baz done a, yon repons soti nan API a, elatriye, elatriye).

Gen plis operatè espesifik: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Ou kapab tou devlope operatè ki baze sou karakteristik pwòp ou yo epi sèvi ak yo nan pwojè ou a. Pou egzanp, nou te kreye MongoDBToHiveViaHdfsTransfer, yon operatè pou ekspòte dokiman soti nan MongoDB nan Hive, ak plizyè operatè pou travay avèk Klike sou Kay: CHLoadFromHiveOperator ak CHTableLoaderOperator. Esansyèlman, le pli vit ke yon pwojè te itilize souvan kòd bati sou deklarasyon debaz, ou ka panse sou bati li nan yon nouvo deklarasyon. Sa a pral senplifye plis devlopman, epi ou pral elaji bibliyotèk ou nan operatè nan pwojè a.

Apre sa, tout ka sa yo nan travay bezwen egzekite, epi kounye a nou pral pale sou orè a.

Orè

Se planifikasyon travay Airflow a bati sou Seleri. Seleri se yon bibliyotèk Python ki pèmèt ou òganize yon keu plis asynchrone ak distribye ekzekisyon travay yo. Sou bò Airflow, tout travay yo divize an pisin. Pisin yo kreye manyèlman. Tipikman, objektif yo se limite kantite travay la nan travay ak sous la oswa tipifye travay nan DWH la. Pisin yo ka jere atravè koòdone entènèt la:

Airflow se yon zouti pou devlope ak kenbe pwosesis done pakèt fasil epi byen vit

Chak pisin gen yon limit sou kantite fant. Lè w ap kreye yon DAG, yo bay li yon pisin:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Yon pisin ki defini nan nivo DAG yo ka anile nan nivo travay la.
Yon pwosesis separe, Scheduler, responsab pou pwograme tout travay nan Airflow. Aktyèlman, Scheduler kontra ak tout mekanik yo nan mete travay pou ekzekisyon. Travay la ale nan plizyè etap anvan yo egzekite:

  1. Travay anvan yo te konplete nan DAG la; yon nouvo ka mete nan keu.
  2. Nat la klase depann sou priyorite nan travay (priyorite yo kapab tou kontwole), epi si gen yon plas gratis nan pisin lan, travay la ka pran an operasyon.
  3. Si gen yon seleri travayè gratis, travay la voye ba li; travay la ke ou pwograme nan pwoblèm nan kòmanse, lè l sèvi avèk youn oswa yon lòt operatè.

Senp ase.

Planifikatè kouri sou seri tout DAG yo ak tout travay ki nan DAG yo.

Pou Scheduler kòmanse travay ak DAG, DAG la bezwen fikse yon orè:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Gen yon seri prereglaj pare yo: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Ou ka itilize ekspresyon cron tou:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Dat Egzekisyon

Pou konprann kijan Airflow fonksyone, li enpòtan pou w konprann ki Dat Egzekisyon yon DAG. Nan Airflow, DAG gen yon dimansyon Dat Egzekisyon, sa vle di, depann sou orè travay DAG la, yo kreye egzanp travay pou chak Dat Egzekisyon. Ak pou chak Dat Egzekisyon, travay yo ka re-egzekisyon - oswa, pou egzanp, yon DAG ka travay ansanm nan plizyè Dat Egzekisyon. Sa a montre klèman isit la:

Airflow se yon zouti pou devlope ak kenbe pwosesis done pakèt fasil epi byen vit

Malerezman (oswa petèt erezman: sa depann de sitiyasyon an), si aplikasyon an nan travay la nan DAG la korije, Lè sa a, ekzekisyon nan dat egzekisyon anvan an ap kontinye pran an kont ajisteman yo. Sa a se yon bon bagay si ou bezwen rekalkile done nan peryòd ki sot pase yo lè l sèvi avèk yon nouvo algorithm, men li nan move paske repwodibilite nan rezilta a pèdi (nan kou, pèsonn pa deranje ou retounen vèsyon ki nesesè nan kòd sous la soti nan Git epi kalkile sa ki). ou bezwen yon sèl fwa, jan ou bezwen li).

Jenerasyon travay

Aplikasyon DAG la se kòd nan Python, kidonk nou gen yon fason trè pratik pou redwi kantite kòd lè w ap travay, pou egzanp, ak sous sharded. Ann di ou gen twa shards MySQL kòm yon sous, ou bezwen monte nan chak youn epi ranmase kèk done. Anplis, poukont yo ak nan paralèl. Kòd Python nan DAG la ta ka sanble sa a:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG la sanble sa a:

Airflow se yon zouti pou devlope ak kenbe pwosesis done pakèt fasil epi byen vit

Nan ka sa a, ou ka ajoute oswa retire yon shard pa senpleman ajiste anviwònman yo ak mete ajou DAG la. Konfòtab!

Ou kapab tou itilize jenerasyon kòd ki pi konplèks, pou egzanp, travay ak sous nan fòm yon baz done oswa dekri yon estrikti tab, yon algorithm pou travay ak yon tab, epi, pran an kont karakteristik yo nan enfrastrikti DWH la, jenere yon pwosesis. pou chaje N tab nan depo ou. Oswa, pou egzanp, travay ak yon API ki pa sipòte travay ak yon paramèt nan fòm yon lis, ou ka jenere N travay nan yon DAG nan lis sa a, limite paralèl la nan demann nan API a nan yon pisin, ak grate. done ki nesesè nan API a. Fleksib!

depo

Airflow gen pwòp repozitwa backend li yo, yon baz done (kapab MySQL oswa Postgres, nou gen Postgres), ki estoke eta yo nan travay, DAGs, anviwònman koneksyon, varyab mondyal, elatriye, elatriye. Isit la mwen ta renmen mwen ka di ke la depo nan Airflow se trè senp (apeprè 20 tab) ak pratik si ou vle bati nenpòt nan pwosesis pwòp ou yo sou tèt li. Mwen sonje 100500 tab yo nan depo Informatica a, ki te dwe etidye pou yon tan long anvan konprann ki jan yo bati yon rechèch.

Siveyans

Bay senplisite depo a, ou ka bati yon pwosesis siveyans travay ki bon pou ou. Nou itilize yon notepad nan Zeppelin, kote nou gade estati travay yo:

Airflow se yon zouti pou devlope ak kenbe pwosesis done pakèt fasil epi byen vit

Sa a ta ka tou koòdone entènèt la nan Airflow tèt li:

Airflow se yon zouti pou devlope ak kenbe pwosesis done pakèt fasil epi byen vit

Kòd Airflow la se sous louvri, kidonk nou te ajoute alèt nan Telegram. Chak egzanp k ap kouri nan yon travay, si yon erè rive, spam gwoup la nan Telegram, kote tout ekip devlopman ak sipò konsiste.

Nou resevwa yon repons rapid atravè Telegram (si sa nesesè), ak atravè Zeppelin nou resevwa yon foto jeneral nan travay nan Airflow.

Nan total

Airflow se prensipalman sous louvri, epi ou pa ta dwe atann mirak nan men li. Prepare w pou w mete tan ak efò pou w konstwi yon solisyon ki mache. Objektif la se possible, kwè m ', li vo li. Vitès devlopman, fleksibilite, fasilite pou ajoute nouvo pwosesis - ou pral renmen li. Natirèlman, ou bezwen peye anpil atansyon sou òganizasyon an nan pwojè a, estabilite nan Airflow nan tèt li: mirak pa rive.

Koulye a, nou gen Airflow k ap travay chak jou apeprè 6,5 mil travay. Yo trè diferan nan karaktè. Gen travay nan chaje done nan DWH prensipal la ki soti nan anpil sous diferan ak trè espesifik, gen travay nan kalkile devan magazen andedan DWH prensipal la, gen travay nan pibliye done nan yon DWH rapid, gen anpil, anpil travay diferan - ak Airflow. moulen yo tout jou apre jou. Pale an nimewo, sa a se Xnumx mil Travay ELT divès kalite konpleksite nan DWH (Hadoop), apeprè. 2,5 san baz done sous, sa a se yon ekip ki soti nan 4 devlopè ETL, ki divize an pwosesis done ETL nan pwosesis done DWH ak ELT andedan DWH ak nan kou plis yon admin, ki fè fas ak enfrastrikti nan sèvis la.

Plan pou lavni

Nimewo a nan pwosesis ap grandi inevitableman, ak bagay prensipal la nou pral fè an tèm de enfrastrikti Airflow la se dekale. Nou vle bati yon gwoup Airflow, asiyen yon pè nan pye pou travayè seleri, epi fè yon tèt oto-duplike ak pwosesis orè travay ak yon depo.

Épilogue

Sa a, nan kou, se pa tout sa mwen ta renmen di sou Airflow, men mwen te eseye mete aksan sou pwen prensipal yo. Apeti vini ak manje, eseye li epi ou pral renmen li :)

Sous: www.habr.com

Add nouvo kòmantè