Airflow: a tool for quickly and easily developing and maintaining batch data processing pipelines

Hello, Habr! In this article I want to talk about one great tool for developing batch data processing pipelines, for example in the infrastructure of a corporate DWH or your DataLake. We will talk about Apache Airflow (hereafter Airflow). It gets unfairly little attention on Habr, and in the main part I will try to convince you that Airflow is at least worth a look when you choose a scheduler for your ETL/ELT processes.

Previously, I wrote an article about DWH when I worked at Tinkoff Bank. Now I have joined the Mail.Ru Group team and am building a data analytics platform in the games area. As news and interesting solutions come up, my team and I will write here about our data analytics platform.

Prologue

So, let's begin. What is Airflow? It is a library (or a set of libraries) for developing, scheduling, and monitoring workflows. The main feature of Airflow: Python code is used to describe (develop) the processes. This has many advantages for organizing your project and your development: in essence, your (for example) ETL project is just a Python project, and you can organize it however you like, taking into account the specifics of your infrastructure, the size of your team, and other requirements. The tooling is simple. Use, for example, PyCharm + Git. It's great and very convenient!

Now let's look at the main entities of Airflow. Once you understand their essence and purpose, you can design the architecture of your processes well. Perhaps the main entity is the Directed Acyclic Graph (hereafter DAG).

DAG

A DAG is a meaningful grouping of your tasks that you want to run in a strictly defined order on a specific schedule. Airflow provides a convenient web interface for working with DAGs and other entities.

In the web interface, a DAG is drawn as a graph of tasks connected by their dependencies.
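To make the "strictly defined order" concrete, here is a minimal sketch that uses only the Python standard library, not Airflow itself. The task names are hypothetical, and this is not how Airflow stores or resolves a DAG internally; it only shows what a dependency graph implies about which tasks may run together.

```python
from graphlib import TopologicalSorter

# Hypothetical task names; each key lists the tasks it depends on.
dependencies = {
    "truncate_stg": set(),
    "export_profiles": {"truncate_stg"},
    "export_orders": {"truncate_stg"},
    "load_dwh": {"export_profiles", "export_orders"},
}


def run_order(deps):
    """Return batches of tasks in dependency order; tasks in one batch may run in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph is not acyclic
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches
```

Calling `run_order(dependencies)` groups the two export tasks into one batch between the truncate step and the load step, and a graph with a cycle is rejected outright, which is the "acyclic" part of the name.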

When designing a DAG, the developer lays down a set of operators from which the tasks inside the DAG will be built. This brings us to another important entity: the Airflow Operator.

Operators

An operator is the entity from which task instances are created; it describes what will happen while a task instance runs. The Airflow releases on GitHub already contain a set of ready-to-use operators. Examples:

  • BashOperator - an operator for executing a bash command.
  • PythonOperator - an operator for calling Python code.
  • EmailOperator - an operator for sending email.
  • HTTPOperator - an operator for working with HTTP requests.
  • SqlOperator - an operator for executing SQL code.
  • Sensor - an operator that waits for an event (the arrival of a required time, the appearance of a required file, a row in a database, a response from an API, and so on).

There are also more specific operators: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

You can also develop operators tailored to your own needs and use them in your project. For example, we created MongoDBToHiveViaHdfsTransfer, an operator for exporting documents from MongoDB to Hive, and several operators for working with ClickHouse: CHLoadFromHiveOperator and CHTableLoaderOperator. In essence, as soon as a project has frequently used code built from basic statements, you can think about turning it into a new operator. This simplifies further development, and you extend your library of operators in the project.
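As a rough illustration of turning repeated code into an operator, here is a minimal sketch. It assumes the usual Airflow pattern of subclassing `BaseOperator` and overriding `execute(context)`, but to stay runnable without Airflow it uses a tiny stand-in base class. `ExportThenLoadOperator`, `export_fn`, and `load_fn` are hypothetical names, not operators from our project.

```python
class BaseOperator:
    """Stand-in for airflow.models.BaseOperator so the sketch runs without Airflow."""

    def __init__(self, task_id):
        self.task_id = task_id

    def execute(self, context):
        raise NotImplementedError


class ExportThenLoadOperator(BaseOperator):
    """Hypothetical operator that bundles two steps a project repeats often."""

    def __init__(self, task_id, export_fn, load_fn):
        super().__init__(task_id)
        self.export_fn = export_fn  # callable: execution date -> rows
        self.load_fn = load_fn      # callable: rows -> result

    def execute(self, context):
        # Export the rows for this run's execution date, then hand them to the loader.
        rows = self.export_fn(context["execution_date"])
        return self.load_fn(rows)
```

In a real project the two callables would be replaced by the actual export and load logic, and the class would inherit from Airflow's own base class instead of the stand-in.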

Next, all these task instances have to be executed, so now we will talk about the scheduler.

Scheduler

Task scheduling in Airflow (when the CeleryExecutor is used, as in our setup) is built on Celery. Celery is a Python library that lets you organize a queue plus asynchronous and distributed execution of tasks. On the Airflow side, all tasks are divided into pools. Pools are created manually. Typically, their purpose is to limit the load on a source or to separate classes of tasks inside the DWH. Pools can be managed through the web interface.

Each pool has a limit on the number of slots. When a DAG is created, it is assigned a pool:

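from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable

# Settings shared by every task in this DAG.
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'  # the pool must already exist (pools are created manually)
PRIORITY_WEIGHT = 10

# Start from midnight of the previous day.
# Note: Airflow's documentation recommends a fixed start_date over a moving one.
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

# default_args are passed to every operator created in this DAG.
default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__  # show the module docstring on the DAG page in the web UI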

A pool set at the DAG level can be overridden at the task level.
A separate process, the Scheduler, is responsible for scheduling all tasks in Airflow. In fact, the Scheduler handles all the mechanics of queuing tasks for execution. A task goes through several stages before it runs:

  1. The previous tasks in the DAG have completed; a new one can be queued.
  2. The queue is sorted by task priority (priorities can also be controlled), and if the pool has a free slot, the task can be taken into work.
  3. If there is a free Celery worker, the task is sent to it; the work you programmed into the task begins, using one operator or another.

Simple enough.
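The three stages above can be sketched in plain Python. This is only an illustration under simplifying assumptions, not Airflow's actual scheduler code: the function name `pick_runnable` and the dictionary layout of a task are made up for the example, while `pool` and `priority_weight` mirror the settings shown earlier.

```python
import heapq


def pick_runnable(tasks, pool_slots, free_workers):
    """Return the ids of tasks to start now, highest priority first.

    tasks: list of dicts with keys task_id, pool, priority_weight, upstream_done.
    pool_slots: dict mapping pool name to its number of free slots.
    free_workers: number of idle workers.
    """
    slots = dict(pool_slots)  # copy so the caller's dict is not mutated
    # Stage 1: only tasks whose upstream tasks have finished may be queued.
    queue = [
        (-t["priority_weight"], i, t)  # index i breaks ties between equal priorities
        for i, t in enumerate(tasks)
        if t["upstream_done"]
    ]
    heapq.heapify(queue)
    started = []
    # Stages 2 and 3: walk the queue by priority while idle workers remain.
    while queue and free_workers > 0:
        _, _, task = heapq.heappop(queue)
        if slots.get(task["pool"], 0) > 0:  # a free slot in the task's pool is required
            slots[task["pool"]] -= 1
            free_workers -= 1
            started.append(task["task_id"])
    return started
```

A task that loses out because its pool is full simply stays unscheduled in this sketch; a real scheduler would look at it again on its next pass.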

The Scheduler works over the set of all DAGs and all tasks inside those DAGs.

For the Scheduler to start working with a DAG, the DAG needs a schedule:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

There is a set of ready-made presets: @once, @hourly, @daily, @weekly, @monthly, @yearly.

You can also use cron expressions:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
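If cron syntax is unfamiliar, the following minimal sketch expands a single cron field into the values it matches. It is a simplified illustration written for this article, not the parser Airflow uses, and the helper name `expand_cron_field` is hypothetical. It handles only `*`, steps, ranges, lists, and plain numbers.

```python
def expand_cron_field(field, lo, hi):
    """Expand one cron field ('*', '*/n', 'a-b', 'a,b' or a number) into sorted values."""
    values = set()
    for part in field.split(","):
        rng, _, step = part.partition("/")
        step = int(step) if step else 1
        if rng == "*":
            start, end = lo, hi
        elif "-" in rng:
            a, b = rng.split("-")
            start, end = int(a), int(b)
        else:
            start = int(rng)
            # 'n/step' means "from n to the end of the range"; a bare 'n' is just n.
            end = hi if "/" in part else start
        values.update(range(start, end + 1, step))
    return sorted(values)
```

For the minute field of `*/10 * * * *`, `expand_cron_field("*/10", 0, 59)` gives the minutes 0, 10, 20, 30, 40, and 50, that is, a run every ten minutes.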

Execution Date

To understand how Airflow works, it is important to understand what the Execution Date is for a DAG. In Airflow, a DAG has an Execution Date dimension: depending on the DAG's schedule, task instances are created for every Execution Date. For each Execution Date, tasks can be re-run; and, for example, a DAG can work in several Execution Dates at the same time. The web interface shows this clearly.
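A small sketch may help with the idea of one run per Execution Date. It assumes the classic Airflow behaviour in which the run for a given Execution Date starts only after its schedule interval has fully elapsed. The function `execution_dates` is a hypothetical helper for illustration, not an Airflow API.

```python
from datetime import datetime, timedelta


def execution_dates(start_date, interval, now):
    """List the execution dates whose interval has fully elapsed by `now`."""
    dates = []
    current = start_date
    while current + interval <= now:
        dates.append(current)
        current += interval
    return dates
```

With a daily schedule starting on 1 January and the clock at 06:00 on 4 January, the sketch yields three Execution Dates (1, 2, and 3 January); the run for 4 January does not exist yet because its day is not over.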

Unfortunately (or maybe fortunately: it depends on the situation), if the implementation of a task in a DAG is changed, then a run for a past Execution Date will use the changed code. This is good if you need to recalculate data for past periods with a new algorithm, but it is bad because the reproducibility of the result is lost (of course, nobody stops you from checking out the required version of the source code from Git and computing what you need once, the way you need it).

Generating tasks

A DAG's implementation is Python code, so we have a very convenient way to reduce the amount of code when working, for example, with sharded sources. Say you have three MySQL shards as a source, and you need to go into each one and pick up some data. Moreover, independently and in parallel. The Python code in the DAG might look like this:

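# `lv` is not defined in this snippet; presumably it is a project-level settings
# object that returns the list of connection ids, one per MySQL shard.
connection_list = lv.get('connection_list')

# {{params.shard_id}} is a Jinja template that is filled in per task from `params`.
export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

# One export task per shard. SqlToHiveViaHdfsTransfer is not a built-in Airflow
# operator (presumably one of the project's custom operators); exec_truncate_stg
# and load_profiles are tasks defined elsewhere in the DAG.
for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:]},  # last character of the connection id
        compress=None,
        dag=dag
    )
    # Every export runs after the staging truncate and before the final load.
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)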

The resulting DAG contains one export task per shard; they run in parallel, after exec_truncate_stg and before load_profiles.

With this approach, you can add or remove a shard simply by adjusting the setting and updating the DAG. Convenient!

You can also use more complex code generation: for example, work with sources in the form of a database, or describe a table's structure and the algorithm for working with it and then, taking into account the specifics of the DWH infrastructure, generate the process of loading N tables into your storage. Or, for example, when working with an API that does not accept a parameter as a list, you can generate N tasks in a DAG from that list, limit the parallelism of requests to the API with a pool, and pull the data you need out of the API. Flexible!
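The API case can be imitated outside Airflow to show the idea of "one call per list item, with parallelism capped by a pool". In this minimal sketch a thread pool plays the role of the Airflow pool; `fetch_all`, `fetch_one`, and `pool_slots` are hypothetical names, and in a real DAG each call would be a separate generated task.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_all(item_ids, fetch_one, pool_slots):
    """Call fetch_one once per item, with at most `pool_slots` calls in flight."""
    with ThreadPoolExecutor(max_workers=pool_slots) as executor:
        # executor.map preserves input order, so results line up with item_ids.
        results = executor.map(fetch_one, item_ids)
        return dict(zip(item_ids, results))
```

Raising or lowering `pool_slots` changes the load on the API without touching the per-item logic, which is the same separation the pool gives you in Airflow.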

Repository

Airflow has its own backend repository, a database (MySQL or Postgres; we use Postgres), which stores the states of tasks, DAGs, connection settings, global variables, and so on. I would say that the repository in Airflow is very simple (about 20 tables) and convenient if you want to build any process of your own on top of it. I remember the countless tables in the Informatica repository, which you had to study for a long time before you understood how to build a query.

Monitoring

Given the simplicity of the repository, you can build a task-monitoring process that is convenient for you. We use a notebook in Zeppelin, where we look at the state of tasks.

It could also be the web interface of Airflow itself.
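To give a feel for the kind of query such a monitoring notebook might run, here is a minimal sketch against an in-memory SQLite table. The table is a heavily simplified stand-in for Airflow's `task_instance` table (the real one has more columns and lives in MySQL or Postgres), and the rows and the `failed_tasks_by_dag` helper are invented for the example.

```python
import sqlite3


def failed_tasks_by_dag(conn):
    """Count failed task instances per DAG, worst first."""
    return conn.execute(
        "SELECT dag_id, COUNT(*) AS failed "
        "FROM task_instance "
        "WHERE state = 'failed' "
        "GROUP BY dag_id "
        "ORDER BY failed DESC, dag_id"
    ).fetchall()


# Simplified stand-in for the metadata table, filled with made-up rows.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, execution_date TEXT, state TEXT)"
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("dma_load", "export_profiles_from_db1", "2017-01-01", "failed"),
        ("dma_load", "export_profiles_from_db2", "2017-01-01", "failed"),
        ("dma_load", "load_profiles", "2017-01-01", "success"),
        ("stg_load", "truncate_stg", "2017-01-01", "failed"),
    ],
)
```

The same SELECT, pointed at the real repository, is the sort of thing that fits into a notebook paragraph or a dashboard panel.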

Airflow's code is open source, so we added alerting to Telegram. Every running task instance, if an error occurs, spams the Telegram group where the whole development and support team sits.

We get a prompt reaction through Telegram (when it is needed), and through Zeppelin we get the overall picture of tasks in Airflow.
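We do not show our own Telegram patch here, so the following is only a sketch of one possible approach, not our implementation. It assumes you hook a function into Airflow's `on_failure_callback`, which receives a context dictionary, and it builds (but does not send) a request to the Telegram Bot API `sendMessage` method. The function name, the token, and the chat id are placeholders.

```python
import json
from urllib import request


def build_telegram_alert(context, bot_token, chat_id):
    """Build a Telegram sendMessage request describing a failed task instance."""
    ti = context["task_instance"]
    text = (
        f"Airflow task failed: {ti.dag_id}.{ti.task_id} "
        f"(execution date {context['execution_date']})"
    )
    payload = json.dumps({"chat_id": chat_id, "text": text}).encode("utf-8")
    # Passing data makes urllib issue a POST request.
    return request.Request(
        f"https://api.telegram.org/bot{bot_token}/sendMessage",
        data=payload,
        headers={"Content-Type": "application/json"},
    )
```

A callback would pass the returned object to `urllib.request.urlopen`, ideally inside a try/except so that a Telegram outage cannot break the task's own failure handling.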

Summary

Airflow is first and foremost open source, and you should not expect miracles from it. Be prepared to invest time and effort in building a solution that works. The goal is achievable, and believe me, it is worth it. Speed of development, flexibility, ease of adding new processes: you will like it. Of course, you need to pay a lot of attention to the organization of the project and to the stability of Airflow itself: miracles do not happen.

Right now Airflow runs about 6.5 thousand tasks for us every day. They are quite different in nature. There are tasks that load data into the main DWH from many different and very specific sources, tasks that compute data marts inside the main DWH, tasks that publish data to a fast DWH, and many, many other tasks, and Airflow chews through them all day after day. In numbers, this is 2.3 thousand ELT tasks of varying complexity inside the DWH (Hadoop), about 250 source databases, and a team of 4 ETL developers, split between ETL data processing into the DWH and ELT data processing inside the DWH, plus, of course, one admin who looks after the service's infrastructure.

Plans for the future

The number of processes inevitably grows, and the main thing we will be doing in terms of Airflow infrastructure is scaling. We want to build an Airflow cluster, set aside a couple of nodes for Celery workers, and build a redundant head node that hosts the job scheduling processes and the repository.

Epilogue

This, of course, is far from everything I would like to tell you about Airflow, but I tried to highlight the main points. Appetite comes with eating: try it and you will like it :)

Source: www.hab.com
