Hello, Habr! In this article I want to talk about a great tool for developing batch data processing workflows, for example in the infrastructure of a corporate DWH or your DataLake. We will be talking about Apache Airflow (hereafter Airflow). It gets unfairly little attention on Habr, and in the main part I will try to convince you that Airflow is at least worth a look when you choose a scheduler for your ETL/ELT processes.
Previously, I wrote an article about DWH when I worked at Tinkoff Bank. Now I have joined the Mail.Ru team and am building a platform for data analysis in the gaming area. As news and interesting solutions appear, my team and I will write here about our data analytics platform.
Prologue
So, let's begin. What is Airflow? It is a library (or
Now let's look at the main entities of Airflow. Once you understand what they are and what they are for, you can design your process architecture well. Perhaps the most important entity is the Directed Acyclic Graph (hereafter DAG).
DAG
A DAG is a meaningful grouping of the tasks that you want to run in a strictly defined order on a given schedule. Airflow provides a convenient web interface for working with DAGs and other entities:
A DAG might look like this:
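To make the "strictly defined order" concrete without Airflow itself, here is a minimal pure-Python sketch. It is not Airflow's API: the task names are hypothetical, and the standard-library `graphlib` module (Python 3.9+) stands in for the ordering logic. Each task lists the tasks it depends on, and a cycle is rejected, which is exactly what "acyclic" demands.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical task names; each key maps to the set of tasks it depends on.
dependencies = {
    "truncate_stg": set(),
    "export_profiles": {"truncate_stg"},
    "export_payments": {"truncate_stg"},
    "load_dwh": {"export_profiles", "export_payments"},
}


def execution_order(deps):
    """Return one valid run order; raises CycleError if deps contain a cycle."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)
```

The two export tasks have no dependency on each other, so either may come first; that freedom is what lets a scheduler run them in parallel.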
When designing a DAG, the developer defines the set of operators from which the tasks inside the DAG will be built. This brings us to another important entity: the Airflow Operator.
Operators
An operator is the entity from which task instances are created; it describes what will happen when a task instance runs. For example:
- BashOperator - an operator for running a bash command.
- PythonOperator - an operator for calling Python code.
- EmailOperator - an operator for sending email.
- HTTPOperator - an operator for working with HTTP requests.
- SqlOperator - an operator for running SQL code.
- Sensor - an operator that waits for an event (the required time arriving, a required file appearing, a row in a database, a response from an API, and so on).
There are also more specialized operators: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
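The relationship between an operator and its task instances can be sketched in plain Python. The classes below are toy stand-ins with hypothetical names, not Airflow's own classes; in Airflow itself, operators derive from `BaseOperator` and put their logic in `execute(self, context)`. The point of the sketch is only that one operator definition yields a separate task instance for every run.

```python
from dataclasses import dataclass
from datetime import datetime


class ToyOperator:
    """Stand-in for an operator: describes WHAT a task does, not when."""

    def __init__(self, task_id):
        self.task_id = task_id

    def execute(self, context):
        raise NotImplementedError


class EchoOperator(ToyOperator):
    """Hypothetical operator that formats a message for the given run."""

    def __init__(self, task_id, message):
        super().__init__(task_id)
        self.message = message

    def execute(self, context):
        return f"{self.task_id} @ {context['execution_date']:%Y-%m-%d}: {self.message}"


@dataclass
class ToyTaskInstance:
    """One run of an operator for one execution date."""

    operator: ToyOperator
    execution_date: datetime

    def run(self):
        return self.operator.execute({"execution_date": self.execution_date})


say_hello = EchoOperator(task_id="say_hello", message="hello")
runs = [ToyTaskInstance(say_hello, datetime(2018, 1, day)).run() for day in (1, 2)]
```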
You can develop operators tailored to your own needs and use them in your project. For example, we created MongoDBToHiveViaHdfsTransfer, an operator for exporting documents from MongoDB to Hive, and several operators for working with
Next, all these task instances need to be executed, so now we will talk about the scheduler.
Scheduler
Airflow's task scheduler is built on
Each pool has a limit on the number of slots. When a DAG is created, it is assigned a pool:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable

# Alert recipients and the retry count are stored as Airflow Variables.
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

# Start from midnight of the previous day.
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

# Defaults passed to every task in the DAG, including the pool.
default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}

dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
A pool set at the DAG level can be overridden at the task level.
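The override works because DAG-level defaults are only defaults: an argument given explicitly to a task wins. A minimal pure-Python sketch of that merge rule follows; the helper `effective_args`, the `api_pool` name, and the literal `retries` value are illustrative assumptions, not part of Airflow or of our configuration.

```python
def effective_args(default_args, **task_args):
    """Task-level arguments win over DAG-level defaults."""
    merged = dict(default_args)
    merged.update(task_args)
    return merged


# Hypothetical values modelled on the DAG definition above.
dag_defaults = {"pool": "dma_pool", "priority_weight": 10, "retries": 3}

regular_task = effective_args(dag_defaults)
api_task = effective_args(dag_defaults, pool="api_pool", priority_weight=50)
```

Here `regular_task` keeps `dma_pool`, while `api_task` runs in its own pool with a higher priority and still inherits `retries`.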
A separate process, the Scheduler, is responsible for scheduling all tasks in Airflow. In fact, the Scheduler handles all the mechanics of putting tasks up for execution. A task goes through several stages before it runs:
- The previous tasks in the DAG have completed; a new one can be queued.
- The queue is sorted by task priority (priorities can also be controlled), and if the pool has a free slot, the task can be taken into work.
- If there is a free Celery worker, the task is sent to it; the work you programmed in the task starts, using one operator or another.
Simple enough.
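The three stages can be sketched as a single scheduler pass in plain Python. This is a toy model with hypothetical names (`ToyTask`, `schedule_tick`), not the real Scheduler, which also handles retries, timeouts, and much more; it only shows how upstream state, priority, pool slots, and free workers combine.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTask:
    task_id: str
    pool: str
    priority_weight: int = 1
    upstream: set = field(default_factory=set)


def schedule_tick(tasks, done, running, pool_slots, free_workers):
    """Pick the task ids to dispatch in one scheduler pass."""
    # Stage 1: only tasks whose upstream tasks have all finished are queued.
    queued = [
        t for t in tasks
        if t.task_id not in done
        and t.task_id not in running
        and t.upstream <= done
    ]
    # Stage 2: highest priority first.
    queued.sort(key=lambda t: -t.priority_weight)
    # Count the slots already taken by running tasks in each pool.
    used = {}
    for t in tasks:
        if t.task_id in running:
            used[t.pool] = used.get(t.pool, 0) + 1
    dispatched = []
    for t in queued:
        if free_workers == 0:
            break  # Stage 3: no free worker is left.
        if used.get(t.pool, 0) >= pool_slots.get(t.pool, 0):
            continue  # Stage 2: the pool has no free slot.
        used[t.pool] = used.get(t.pool, 0) + 1
        free_workers -= 1
        dispatched.append(t.task_id)
    return dispatched


tasks = [
    ToyTask("truncate_stg", "dma_pool", priority_weight=10),
    ToyTask("export_1", "dma_pool", 5, {"truncate_stg"}),
    ToyTask("export_2", "dma_pool", 7, {"truncate_stg"}),
    ToyTask("export_3", "dma_pool", 1, {"truncate_stg"}),
]
first = schedule_tick(tasks, set(), set(), {"dma_pool": 2}, free_workers=4)
second = schedule_tick(tasks, {"truncate_stg"}, set(), {"dma_pool": 2}, free_workers=4)
```

With two slots in the pool, the second pass dispatches only the two highest-priority exports even though four workers are free; the third export waits for a slot.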
The Scheduler runs over the set of all DAGs and all tasks inside the DAGs.
For the Scheduler to start working with a DAG, the DAG needs a schedule:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
There is a set of ready-made presets: @once, @hourly, @daily, @weekly, @monthly, @yearly.
You can also use cron expressions:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
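The presets are shorthand for cron expressions. The sketch below lists the equivalents as given in the Airflow documentation and adds a deliberately tiny helper that checks only the minute field of an expression. The function names are hypothetical, and this is not a full cron parser; it just shows what `*/10` means.

```python
# Cron equivalents of the presets; "@once" has no cron form and is left out.
PRESET_TO_CRON = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}


def normalize_schedule(schedule_interval):
    """Return the cron expression behind a preset, or the input unchanged."""
    return PRESET_TO_CRON.get(schedule_interval, schedule_interval)


def minute_matches(cron_expression, minute):
    """Check only the minute field, supporting '*', '*/N' and a plain number."""
    minute_field = cron_expression.split()[0]
    if minute_field == "*":
        return True
    if minute_field.startswith("*/"):
        return minute % int(minute_field[2:]) == 0
    return minute == int(minute_field)


every_ten = [m for m in range(60) if minute_matches("*/10 * * * *", m)]
```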
Execution Date
To understand how Airflow works, it is important to understand what the Execution Date means for a DAG. In Airflow, a DAG has an Execution Date dimension: depending on the DAG's schedule, task instances are created for each Execution Date. For each Execution Date the tasks can be re-run, and a DAG can, for example, run for several Execution Dates at the same time. This is clearly shown here:
Unfortunately (or perhaps fortunately, depending on the situation), if the implementation of a task in a DAG is changed, then runs for past Execution Dates will use the updated code. This is good if you need to recalculate data for past periods with a new algorithm, but bad because the reproducibility of the result is lost (of course, nobody stops you from checking out the required version of the source from Git and computing what you need once, the way you need it).
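A small sketch of how Execution Dates line up with a schedule. It relies on one assumption that is not spelled out above: with `schedule_interval`-style scheduling, the run for Execution Date D covers the period that starts at D and is launched only once that period has ended. The function name and the dates are illustrative.

```python
from datetime import datetime, timedelta


def execution_dates(start_date, interval, now):
    """List execution dates whose period [date, date + interval) has ended by `now`."""
    dates = []
    current = start_date
    while current + interval <= now:
        dates.append(current)
        current += interval
    return dates


runs = execution_dates(
    start_date=datetime(2018, 1, 1),
    interval=timedelta(days=1),
    now=datetime(2018, 1, 4, 12, 0),
)
```

At noon on 4 January the daily DAG has runs for 1, 2 and 3 January; the run for 4 January does not exist yet because its period is still open.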
Generating tasks
The implementation of a DAG is Python code, so we have a very convenient way to reduce the amount of code when working, for example, with sharded sources. Suppose you have three MySQL shards as a source, and you need to go to each one and pick up some data, independently and in parallel. The Python code in the DAG might look like this:
# lv, exec_truncate_stg and load_profiles are defined elsewhere;
# they are not shown in this excerpt.
connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

# One export task per shard connection.
for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    # Every export waits for the staging truncate and feeds the final load.
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)
The DAG looks like this:
In this case, you can add or remove a shard simply by adjusting the setting and updating the DAG. Convenient!
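One detail worth watching when generating tasks this way: `conn_id[-1:]` takes only the last character, so it works for shards 1 to 9 but would yield `2` for a connection that ends in `12`. A hedged sketch of a more tolerant variant, assuming (this is not stated above) that connection ids end in `_<shard number>`; the helper names and connection ids are hypothetical:

```python
def shard_id_from_conn(conn_id):
    """Take everything after the last underscore, e.g. 'mysql_shard_12' -> '12'."""
    return conn_id.rsplit("_", 1)[-1]


def task_specs(connection_list):
    """Build one (task_id, params) pair per shard connection."""
    return [
        ("export_profiles_from_" + conn_id, {"shard_id": shard_id_from_conn(conn_id)})
        for conn_id in connection_list
    ]


# Hypothetical connection ids; the real list comes from configuration.
specs = task_specs(["mysql_shard_1", "mysql_shard_2", "mysql_shard_12"])
```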
You can also use more complex code generation. For example, you can work with sources in the form of a database, or describe the table structure and the algorithm for working with a table and, taking into account the specifics of your DWH infrastructure, generate the process that loads N tables into your storage. Or, when working with an API that does not accept a list as a parameter, you can generate N tasks in a DAG from that list, limit the parallelism of requests to the API with a pool, and pull the data you need from the API. Convenient!
Repository
Airflow has its own backend repository, a database (it can be MySQL or Postgres; we use Postgres), which stores the states of tasks, DAGs, connection settings, global variables, and so on. I would like to point out that the repository in Airflow is very simple (about 20 tables) and convenient if you want to build any of your own processes on top of it. I remember the seemingly endless number of tables in the Informatica repository, which had to be studied for a long time before you understood how to build a query.
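To give a feel for how simple such queries are, here is a self-contained sketch that uses an in-memory SQLite database in place of the real Postgres repository. The `task_instance` table is reduced to four columns for illustration (the real table has more), and the rows are made up.

```python
import sqlite3

# Stand-in for the Airflow metadata database; the schema is heavily simplified.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance ("
    "task_id TEXT, dag_id TEXT, execution_date TEXT, state TEXT)"
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("export_profiles_from_shard_1", "dma_load", "2018-01-01", "success"),
        ("export_profiles_from_shard_2", "dma_load", "2018-01-01", "failed"),
        ("load_profiles", "dma_load", "2018-01-01", "upstream_failed"),
        ("export_profiles_from_shard_1", "dma_load", "2018-01-02", "success"),
    ],
)


def state_counts(connection, dag_id):
    """Count task instances per state for one DAG."""
    rows = connection.execute(
        "SELECT state, COUNT(*) FROM task_instance "
        "WHERE dag_id = ? GROUP BY state ORDER BY state",
        (dag_id,),
    )
    return dict(rows.fetchall())


counts = state_counts(conn, "dma_load")
```

A query of this shape is the kind of building block a status dashboard can be assembled from.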
Monitoring
Given the simplicity of the repository, you can build a task monitoring process that suits you. We use a notebook in Zeppelin, where we look at the status of tasks:
It can also be the web interface of Airflow itself:
The Airflow code is open source, so we added alerting to Telegram. Every running task instance, if an error occurs, spams a Telegram group that includes the whole development and support team.
We get a prompt response through Telegram (if one is required), and through Zeppelin we get the overall picture of the tasks in Airflow.
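Our alerting lives inside our modified copy of Airflow and is not shown here. As a rough, hypothetical illustration of the idea, the sketch below formats a failure message and prepares a request to the Telegram Bot API `sendMessage` method without actually sending it. The function names, token, and chat id are placeholders; in stock Airflow, one place to call code like this would be a task's `on_failure_callback`, which receives a context dictionary.

```python
import json
import urllib.request


def build_alert_text(dag_id, task_id, execution_date, error):
    """Format a short failure message for a chat."""
    return (
        "Airflow task failed\n"
        f"DAG: {dag_id}\n"
        f"Task: {task_id}\n"
        f"Execution date: {execution_date}\n"
        f"Error: {error}"
    )


def build_telegram_request(bot_token, chat_id, text):
    """Prepare (but do not send) a Bot API sendMessage request."""
    url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    payload = json.dumps({"chat_id": chat_id, "text": text}).encode("utf-8")
    return urllib.request.Request(
        url, data=payload, headers={"Content-Type": "application/json"}
    )


# Placeholder credentials; sending would be urllib.request.urlopen(request).
request = build_telegram_request(
    "TEST_TOKEN",
    "TEST_CHAT",
    build_alert_text("dma_load", "load_profiles", "2018-01-01", "connection timeout"),
)
```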
Summary
Airflow is, first of all, open source, and you should not expect miracles from it. Be ready to invest time and effort to build a working solution. The goal is achievable and, believe me, it is worth it. Speed of development, flexibility, ease of adding new processes: you will like it. Of course, you need to pay a lot of attention to the organization of the project and to the stability of Airflow itself: miracles do not happen.
Right now our Airflow runs about 6.5 thousand tasks every day. They are quite different in nature. There are tasks that load data into the main DWH from many different and very specific sources, tasks that compute data marts inside the main DWH, tasks that publish data to the fast DWH, and many, many other tasks, and Airflow chews through them day after day. In numbers, this is 2.3 thousand ELT tasks of varying complexity inside the DWH (Hadoop), about 250 source databases, and a team of 4 ETL developers, split between ETL data processing into the DWH and ELT data processing inside the DWH, plus of course one admin who looks after the infrastructure of the service.
Plans for the future
The number of processes inevitably grows, and the main thing we will be doing in terms of Airflow infrastructure is scaling. We want to build an Airflow cluster, allocate a pair of nodes for Celery workers, and set up a redundant head node with the job scheduling processes and the repository.
Epilogue
This, of course, is far from everything I would like to say about Airflow, but I tried to highlight the main points. Appetite comes with eating: try it and you will like it :)
Source: www.hab.com