Airflow: a tool for developing and maintaining batch data processing workflows quickly and conveniently


Hello, Habr! In this article I want to talk about a great tool for developing batch data processing workflows, for example in the infrastructure of a corporate DWH or your data lake. We will talk about Apache Airflow (hereafter Airflow). It has been unfairly neglected on Habr, and in the main part I will try to convince you that Airflow is at least worth a look when you choose a scheduler for your ETL/ELT processes.

Previously I wrote a series of articles on DWH topics while working at Tinkoff Bank. Now I am part of the Mail.Ru Group team and am building a data analytics platform for the gaming division. As news and interesting solutions appear, my team and I will write here about our data analytics platform.

Prologue

So, let's begin. What is Airflow? It is a library (or a set of libraries) for developing, scheduling and monitoring workflows. Airflow's main feature is that workflows are described (developed) in Python code. This brings many benefits for organizing your project and your development: in essence, your ETL project (for example) is just a Python project, and you can organize it however you like, taking into account the specifics of your infrastructure, the size of your team and other requirements. The tooling is simple. Use, for example, PyCharm + Git. Wonderful and very convenient!

Now let's look at the main entities of Airflow. Once you understand their nature and purpose, you can organize your process architecture in the best way. Perhaps the main entity is the Directed Acyclic Graph (hereafter DAG).

DAG

A DAG is a meaningful grouping of the tasks that you want to complete in a strictly defined order and on a specific schedule. Airflow provides a convenient web interface for working with DAGs and other entities:

[Image: Airflow web interface]

A DAG might look like this:

[Image: example DAG graph]
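To make the "strictly defined order" concrete, here is a minimal sketch in plain Python (no Airflow involved). The task names are hypothetical, and the standard-library `graphlib` module stands in for whatever Airflow does internally; it only shows that a set of dependencies without cycles always has a valid execution order.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical task names; each task maps to the set of tasks it depends on.
dependencies = {
    "truncate_stg": set(),
    "export_profiles": {"truncate_stg"},
    "export_payments": {"truncate_stg"},
    "load_dwh": {"export_profiles", "export_payments"},
}


def execution_order(deps):
    """Return one valid execution order; raises CycleError if deps contain a cycle."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)
```

The two export tasks do not depend on each other, so either may come first; that independence is what allows a scheduler to run them in parallel.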

When designing a DAG, the developer lays down the set of operators on which the tasks inside the DAG will be built. Here we come to another important entity: the Airflow Operator.

Operators

An operator is the entity from which task instances are created; it describes what will happen while a task instance is executed. Airflow releases from GitHub already include a set of ready-to-use operators. Examples:

  • BashOperator - an operator for executing a bash command.
  • PythonOperator - an operator for calling Python code.
  • EmailOperator - an operator for sending email.
  • HTTPOperator - an operator for working with HTTP requests.
  • SqlOperator - an operator for executing SQL code.
  • Sensor - an operator that waits for an event (the required time arriving, a required file appearing, a row in a database, a response from an API, and so on).

There are also more specific operators: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

You can also develop operators to suit your own needs and use them in your project. For example, we created MongoDBToHiveViaHdfsTransfer, an operator for exporting documents from MongoDB to Hive, and several operators for working with ClickHouse: CLoadFromHiveOperator and CHTableLoaderOperator. In essence, as soon as a project has frequently used code built on the basic operators, you can think about turning it into a new operator. This simplifies further development, and it grows the library of operators in your project.
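The general shape of a custom operator can be sketched without Airflow at all. In Airflow itself a custom operator subclasses `BaseOperator` and implements `execute(self, context)`; the `MiniOperator` base class, the `CopyRowsOperator` and the fake source below are hypothetical stand-ins written only for illustration, not the operators mentioned above.

```python
class MiniOperator:
    """Toy stand-in for an operator base class (NOT Airflow's BaseOperator)."""

    def __init__(self, task_id):
        self.task_id = task_id

    def execute(self, context):
        raise NotImplementedError


class CopyRowsOperator(MiniOperator):
    """Hypothetical operator: copies rows from a source callable into a sink list."""

    def __init__(self, task_id, read_rows, sink):
        super().__init__(task_id)
        self.read_rows = read_rows
        self.sink = sink

    def execute(self, context):
        # The reusable logic lives here; the DAG only supplies the parameters.
        rows = list(self.read_rows(context["execution_date"]))
        self.sink.extend(rows)
        return len(rows)


def fake_source(execution_date):
    # Pretend source: two rows stamped with the execution date.
    return [(1, execution_date), (2, execution_date)]


sink = []
task = CopyRowsOperator("copy_profiles", fake_source, sink)
copied = task.execute({"execution_date": "2018-01-01"})
print(copied, sink)
```

The point of the pattern is that code repeated across DAGs moves into `execute`, and each DAG then instantiates the operator with different arguments.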

Next, all these task instances need to be executed, so now we will talk about the scheduler.

Scheduler

Airflow's task scheduler is built on Celery. Celery is a Python library that lets you organize a queue together with asynchronous and distributed execution of tasks. On the Airflow side, all tasks are divided into pools. Pools are created manually. Typically, their purpose is to limit the load on a source or to group tasks by type inside the DWH. Pools can be managed through the web interface:

[Image: pool management in the Airflow web interface]

Each pool has a limit on the number of slots. When a DAG is created, it is assigned a pool:

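from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable

ALERT_MAILS = Variable.get("gv_mail_admin_dwh")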
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

# Start date: midnight of the previous day.
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

A pool defined at the DAG level can be overridden at the task level.
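The override rule can be pictured as a simple dictionary merge. This is a sketch of the idea only, not Airflow's code: the `effective_args` helper, the `api_pool` name and the `retries` value are hypothetical.

```python
def effective_args(default_args, **task_overrides):
    """Task-level keyword arguments win over DAG-level defaults."""
    merged = dict(default_args)  # copy, so the DAG defaults stay untouched
    merged.update(task_overrides)
    return merged


default_args = {"pool": "dma_pool", "priority_weight": 10, "retries": 3}

# One task is moved to another pool with a higher priority; the other keeps defaults.
heavy_task_args = effective_args(default_args, pool="api_pool", priority_weight=50)
plain_task_args = effective_args(default_args)
print(heavy_task_args["pool"], plain_task_args["pool"])
```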
A separate process, the Scheduler, is responsible for scheduling all tasks in Airflow. In fact, the Scheduler handles all the mechanics of setting tasks up for execution. A task goes through several stages before it is executed:

  1. The previous tasks in the DAG have completed; the new task can be put in the queue.
  2. The queue is sorted by task priority (priorities can also be controlled), and if there is a free slot in the pool, the task can be taken into work.
  3. If there is a free Celery worker, the task is sent to it; the work you programmed in the task begins, using one operator or another.

Simple enough.
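The three stages can be modeled in a few lines of plain Python. This is a toy model under stated assumptions, not the Scheduler's real algorithm: the task dictionaries, the `schedule` function and the `api_pool` name are hypothetical, and a task that finds no free slot simply stays waiting.

```python
import heapq


def schedule(tasks, pool_slots, free_workers):
    """Toy model: dispatch ready tasks by priority within pool and worker limits.

    tasks: list of dicts with task_id, pool, priority_weight, upstream_done.
    pool_slots: dict of pool name -> free slots. free_workers: int.
    Returns the task_ids sent to workers, in dispatch order.
    """
    slots = dict(pool_slots)
    # Stage 1: only tasks whose upstream tasks have completed are queued.
    queue = [
        (-t["priority_weight"], index, t)
        for index, t in enumerate(tasks)
        if t["upstream_done"]
    ]
    heapq.heapify(queue)
    dispatched = []
    while queue and free_workers > 0:
        _, _, task = heapq.heappop(queue)  # Stage 2: highest priority first.
        if slots.get(task["pool"], 0) <= 0:
            continue  # no free slot in this pool; the task keeps waiting
        slots[task["pool"]] -= 1
        free_workers -= 1  # Stage 3: hand the task to a free worker.
        dispatched.append(task["task_id"])
    return dispatched


tasks = [
    {"task_id": "a", "pool": "dma_pool", "priority_weight": 10, "upstream_done": True},
    {"task_id": "b", "pool": "dma_pool", "priority_weight": 50, "upstream_done": True},
    {"task_id": "c", "pool": "api_pool", "priority_weight": 99, "upstream_done": False},
    {"task_id": "d", "pool": "api_pool", "priority_weight": 5, "upstream_done": True},
]
result = schedule(tasks, {"dma_pool": 1, "api_pool": 1}, free_workers=3)
print(result)
```

Here task `c` is never queued because its upstream is unfinished, and task `a` waits because `b` took the only slot in `dma_pool`, even though a worker is still free.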

The Scheduler runs over the set of all DAGs and all tasks inside those DAGs.

For the Scheduler to start working with a DAG, the DAG needs a schedule:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

There is a set of ready-made presets: @once, @hourly, @daily, @weekly, @monthly, @yearly.

You can also use cron expressions:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
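To show how the presets relate to cron expressions, here is a small sketch of a cron matcher in plain Python. It is deliberately simplified and is not how Airflow evaluates schedules: it supports only `*`, `*/n` and plain integers, and the helper names are hypothetical. The cron strings in `PRESETS` are the standard cron equivalents of the presets.

```python
from datetime import datetime

# Cron equivalents of the presets; "@once" has no cron form (it runs a single time).
PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}


def field_matches(field, value, low):
    """Support only '*', '*/n' and a plain integer (a deliberate simplification)."""
    if field == "*":
        return True
    if field.startswith("*/"):
        # Steps are counted from the lowest legal value of the field.
        return (value - low) % int(field[2:]) == 0
    return value == int(field)


def cron_matches(expression, moment):
    """True if `moment` falls on a tick of the (simplified) cron expression."""
    expression = PRESETS.get(expression, expression)
    fields = expression.split()
    cron_weekday = (moment.weekday() + 1) % 7  # cron counts Sunday as 0
    values = (moment.minute, moment.hour, moment.day, moment.month, cron_weekday)
    lows = (0, 0, 1, 1, 0)
    return all(field_matches(f, v, low) for f, v, low in zip(fields, values, lows))


print(cron_matches("*/10 * * * *", datetime(2018, 1, 1, 12, 20)))
```

With `*/10 * * * *` the expression matches every tenth minute of every hour, so 12:20 is a tick and 12:25 is not.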

Execution Date

To understand how Airflow works, it is important to understand what Execution Date means for a DAG. In Airflow a DAG has an Execution Date dimension: depending on the DAG's schedule, task instances are created for each Execution Date. For each Execution Date the tasks can be re-executed, and a DAG can, for example, run simultaneously for several Execution Dates. This is clearly shown here:

[Image: task instances of a DAG by Execution Date]

Unfortunately (or perhaps fortunately: it depends on the situation), if the implementation of a task in a DAG is changed, then execution for earlier Execution Dates will use the changed code. This is good if you need to recompute data for past periods with a new algorithm, but bad because reproducibility of the result is lost (of course, nobody stops you from checking out the required version of the source code from Git and computing what you need once, the way you need it).
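The Execution Date dimension can be sketched as a list of dates derived from the start date and the schedule interval. The function below is a hypothetical illustration, and it models one assumption explicitly: the run stamped with Execution Date D covers the period from D to D plus one interval and is created only once that period has ended.

```python
from datetime import datetime, timedelta


def execution_dates(start_date, now, interval):
    """List the execution dates whose interval has fully elapsed by `now`."""
    dates = []
    current = start_date
    while current + interval <= now:
        dates.append(current)
        current += interval
    return dates


runs = execution_dates(
    start_date=datetime(2018, 1, 1),
    now=datetime(2018, 1, 4, 6, 0),
    interval=timedelta(days=1),
)

# One task instance per (task, execution date) pair; the task names are hypothetical.
task_instances = [(task_id, date) for date in runs for task_id in ("export", "load")]
print(len(runs), len(task_instances))
```

Each pair in `task_instances` can be rerun independently, which is what makes recomputing a single past day possible.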

Generating tasks

A DAG is implemented as Python code, so we have a very convenient way to reduce the amount of code when working, for example, with sharded sources. Suppose you have three MySQL shards as a source, and you need to go into each one and pick up some data, independently and in parallel. The Python code in the DAG might look like this:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

The resulting DAG looks like this:

[Image: the DAG generated for the shards]

With this approach you can add or remove a shard simply by adjusting the configuration and updating the DAG. Convenient!

You can also use more complex code generation. For example, you can describe your sources as a database, or describe a table's structure and the algorithm for working with that table, and then, taking the features of your DWH infrastructure into account, generate the process for loading N tables into your storage. Or, when working with an API that does not accept a list as a parameter, you can generate N tasks in the DAG from that list, use a pool to limit the parallelism of requests to the API, and pull the required data from the API. Flexible!
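A minimal sketch of metadata-driven generation, in plain Python so that it runs without Airflow: the `TABLES` metadata and the `build_load_tasks` helper are hypothetical, and the dictionaries it returns stand in for the operator instances a real DAG file would create in the same loop.

```python
# Hypothetical metadata: table name -> columns to load.
TABLES = {
    "profiles": ["id", "user_id", "nickname"],
    "payments": ["id", "user_id", "amount"],
}


def build_load_tasks(tables, pool):
    """Return one task description per table, all bound to the same pool."""
    tasks = []
    for table, columns in sorted(tables.items()):
        tasks.append({
            "task_id": "load_" + table,
            "sql": "SELECT {} FROM {}".format(", ".join(columns), table),
            "pool": pool,  # the shared pool caps how many of these run at once
        })
    return tasks


load_tasks = build_load_tasks(TABLES, pool="dma_pool")
for task in load_tasks:
    print(task["task_id"], "->", task["sql"])
```

Adding a table to the metadata adds a task on the next DAG refresh, with no change to the generating code.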

Repository

Airflow has its own backend repository, a database (it can be MySQL or Postgres; we use Postgres), which stores the states of tasks, DAGs, connection settings, global variables, and so on. Here I would like to say that the repository in Airflow is very simple (about 20 tables) and convenient if you want to build your own processes on top of it. I remember the countless tables in the Informatica repository, which had to be studied for a long time before you understood how to build a query.
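As an illustration of building something on top of the repository, the sketch below counts task instances per state. It rests on several assumptions: an in-memory SQLite database stands in for Postgres, the `task_instance` table is reduced to four columns, and the rows are made-up sample data, so check the real schema of your Airflow version before adapting the query.

```python
import sqlite3

# In-memory SQLite stands in for the Postgres/MySQL metadata database.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance "
    "(task_id TEXT, dag_id TEXT, execution_date TEXT, state TEXT)"
)
# Made-up sample rows, for illustration only.
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("export_profiles", "dma_load", "2018-01-01", "success"),
        ("load_profiles", "dma_load", "2018-01-01", "failed"),
        ("export_profiles", "dma_load", "2018-01-02", "running"),
    ],
)


def state_counts(connection, dag_id):
    """Count task instances per state for one DAG."""
    rows = connection.execute(
        "SELECT state, COUNT(*) FROM task_instance "
        "WHERE dag_id = ? GROUP BY state ORDER BY state",
        (dag_id,),
    )
    return dict(rows.fetchall())


counts = state_counts(conn, "dma_load")
print(counts)
```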

Monitoring

Given the simplicity of the repository, you can build a task monitoring process that suits you. We use a notebook in Zeppelin, where we look at the state of tasks:

[Image: Zeppelin notebook showing task states]

It can also be the web interface of Airflow itself:

[Image: Airflow web interface]

The Airflow code is open source, so we added alerting to Telegram. Every running task instance, if an error occurs, spams a group in Telegram that includes the whole development and support team.

We get a prompt reaction through Telegram (when it is required), and through Zeppelin we get the overall picture of tasks in Airflow.
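The article does not say how the Telegram alerting was wired in. One plausible approach, sketched here as an assumption, is a failure callback (Airflow tasks accept an `on_failure_callback`) that builds a message and posts it to the Telegram Bot API `sendMessage` method. The sketch only builds the payload and sends nothing; the flat `context` dictionary and the chat id are hypothetical and do not mirror Airflow's real callback context.

```python
def build_alert(context, chat_id):
    """Build the payload for a Telegram Bot API sendMessage call (not sent here)."""
    text = "Task failed: {dag}.{task} for {date}\n{error}".format(
        dag=context["dag_id"],
        task=context["task_id"],
        date=context["execution_date"],
        error=context["exception"],
    )
    return {"chat_id": chat_id, "text": text}


payload = build_alert(
    {
        "dag_id": "dma_load",
        "task_id": "load_profiles",
        "execution_date": "2018-01-01",
        "exception": "connection refused",
    },
    chat_id="-1000000000000",  # hypothetical group id
)
print(payload["text"])
```

Keeping message construction separate from sending makes the formatting easy to test without network access.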

Summary

Airflow is first and foremost open source, and you should not expect miracles from it. Be ready to invest time and effort to build a working solution. The goal is achievable, and believe me, it is worth it. Speed of development, flexibility, ease of adding new processes: you will like it. Of course, you need to pay close attention to the organization of the project and to the stability of Airflow itself: miracles do not happen.

Our Airflow now runs about 6,500 tasks every day. They are quite different in nature. There are tasks that load data into the main DWH from many different and very specific sources, tasks that compute data marts inside the main DWH, tasks that publish data to a fast DWH, and many, many other tasks, and Airflow chews through all of them day after day. In numbers, that is 2,300 ELT tasks of varying complexity inside the DWH (Hadoop), about 250 source databases, and a team of 4 ETL developers, split between ETL data processing into the DWH and ELT data processing inside the DWH, plus of course one admin who looks after the infrastructure of the service.

Plans for the future

The number of processes is inevitably growing, and the main thing we will be doing with the Airflow infrastructure is scaling. We want to build an Airflow cluster, allocate a pair of nodes for Celery workers, and set up a redundant head node that runs the task scheduling processes and the repository.

Epilogue

This is, of course, far from everything I would like to say about Airflow, but I have tried to highlight the main points. Appetite comes with eating: try it and you will like it :)

Source: www.habr.com
