Airflow amûrek e ku bi hêsanî û zû pêşdebirin û domandina pêvajoyên hilberandina daneya hevîrê ye

Airflow amûrek e ku bi hêsanî û zû pêşdebirin û domandina pêvajoyên hilberandina daneya hevîrê ye

Silav Habr! Di vê gotarê de ez dixwazim li ser yek amûrek girîng a ji bo pêşvebirina pêvajoyên hilberandina daneya berhevokê biaxivim, mînakî, di binesaziya pargîdaniyek DWH an DataLake-ya we de. Em ê li ser Apache Airflow biaxivin (li vir wekî Airflow tê binav kirin). Ew bi neheqî ji balê li ser Habré bêpar e, û di beşa bingehîn de ez ê hewl bidim ku we razî bikim ku bi kêmanî Airflow hêja ye ku dema ku nexşerêyek ji bo pêvajoyên ETL / ELT hilbijêrin lê binihêrin.

Berê, dema ku ez li Tinkoff Bank xebitîm, min rêzek gotar li ser mijara DWH nivîsand. Naha ez bûm beşek ji tîmê Koma Mail.Ru û ez platformek ji bo analîzkirina daneyê di qada lîstikê de pêş dixe. Bi rastî, her ku nûçe û çareseriyên balkêş xuya dibin, tîmê min û ez ê li vir li ser platforma xwe ya ji bo analîtîka daneyan biaxivim.

Prologue

Ji ber vê yekê, em dest pê bikin. Airflow çi ye? Ev pirtûkxaneyek e (an set of pirtûkxaneyan) pêşveçûn, plankirin û şopandina pêvajoyên xebatê. Taybetmendiya sereke ya Airflow: Koda Python ji bo danasîna (pêşvebirina) pêvajoyan tê bikar anîn. Vê yekê ji bo organîzekirina proje û pêşkeftina we gelek avantajên xwe hene: di eslê xwe de, projeya weya ETL (mînak) tenê projeyek Python e, û hûn dikarin wê wekî ku hûn dixwazin birêxistin bikin, li gorî taybetmendiyên binesaziyê, mezinahiya tîmê û pêdiviyên din. Instrumental her tişt hêsan e. Mînakî PyCharm + Git bikar bînin. Ew ecêb û pir hêsan e!

Naha em li saziyên sereke yên Airflow binêrin. Bi têgihîştina cewher û armanca wan, hûn dikarin mîmariya pêvajoya xwe bi rengek çêtirîn organîze bikin. Dibe ku sazûmana sereke Grafika Acyclic Directed (li vir wekî DAG tê binav kirin) be.

DAG

DAG hin komeleyek watedar a peywirên we ye ku hûn dixwazin li gorî nexşeyek taybetî bi rêzek hişk diyarkirî temam bikin. Airflow ji bo xebata bi DAG û saziyên din re navgînek webê ya hêsan peyda dike:

Airflow amûrek e ku bi hêsanî û zû pêşdebirin û domandina pêvajoyên hilberandina daneya hevîrê ye

DAG dibe ku bi vî rengî xuya bike:

Airflow amûrek e ku bi hêsanî û zû pêşdebirin û domandina pêvajoyên hilberandina daneya hevîrê ye

Pêşdebir, dema ku DAG-ê dîzayn dike, komek operator destnîşan dike ku dê karên di nav DAG-ê de li ser bêne çêkirin. Li vir em werin ser saziyek girîng a din: Operatorê hewayê.

Operator

Operator saziyek e ku li ser bingeha wê mînakên kar têne afirandin, ku diyar dike ka dê di dema pêkanîna mînakek kar de çi bibe. Herikîna hewayê ji GitHub derdikeve jixwe komek operatorên ku ji bo karanîna amade ne hene. Nimûne:

  • BashOperator - operator ji bo pêkanîna fermanek bash.
  • PythonOperator - operator ji bo banga koda Python.
  • EmailOperator - operator ji bo şandina e-nameyê.
  • HTTPOperator - operator ji bo xebata bi daxwazên http.
  • SqlOperator - operator ji bo pêkanîna koda SQL.
  • Sensor operatorek e ku li benda bûyerek e (hatina dema pêwîst, xuyangkirina pelê hewce, rêzek di databasê de, bersivek ji API, hwd., hwd.).

Operatorên taybetî hene: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Her weha hûn dikarin operatoran li ser bingeha taybetmendiyên xwe pêşve bibin û wan di projeya xwe de bikar bînin. Mînakî, me MongoDBToHiveViaHdfsTransfer, operatorek ji bo hinardekirina belgeyan ji MongoDB bo Hive, û çend operatorên ji bo xebatê bi clickhouse: CHLoadFromHiveOperator û CHTableLoaderOperator. Di bingeh de, gava ku projeyek pir caran kodek ku li ser daxuyaniyên bingehîn hatî çêkirin bikar anî, hûn dikarin li ser avakirina wê di daxuyaniyek nû de bifikirin. Ev ê pêşkeftina bêtir hêsan bike, û hûn ê pirtûkxaneya xwe ya operatorên di projeyê de berfireh bikin.

Dûv re, hemî van nimûneyên peywiran hewce ne ku bêne darve kirin, û naha em ê li ser plansazker biaxivin.

Scheduler

Bernameya peywirê ya Airflow li ser hatî çêkirin Kerfes. Celery pirtûkxaneyek Python e ku destûrê dide te ku hûn rêzek û pêkanîna peywiran asynchronous û belavkirî organîze bikin. Li aliyê Airflow, hemî peywir li hewzan têne dabeş kirin. Hewz bi destan têne çêkirin. Bi gelemperî, mebesta wan ew e ku barê xebata xebata bi çavkaniyê re sînordar bikin an jî peywirên di nav DWH de binivîsînin. Hewz dikarin bi navgîniya webê ve werin rêvebirin:

Airflow amûrek e ku bi hêsanî û zû pêşdebirin û domandina pêvajoyên hilberandina daneya hevîrê ye

Her hewzek heye sînorekî li ser hejmara slots . Dema afirandina DAG, hewzek jê re tê dayîn:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Hewzeyek ku di asta DAG-ê de hatî destnîşan kirin dikare di asta peywirê de were rakirin.
Pêvajoyek cûda, Scheduler, berpirsiyar e ku hemî peywiran di Airflow de plansaz bike. Bi rastî, Scheduler bi hemî mekanîka danîna peywiran ji bo darvekirinê re mijûl dibe. Kar berî ku were darve kirin di çend qonaxan re derbas dibe:

  1. Karên berê di DAG-ê de hatine qedandin; yekî nû dikare were rêz kirin.
  2. Rêz li gorî pêşaniya karan tê rêz kirin (pêşanî jî dikarin werin kontrol kirin), û heke di hewzê de cîhek belaş hebe, peywir dikare were xebitandin.
  3. Ger kerfeseke xebatkarê belaş hebe, wezîfe jê re tê şandin; karê ku we di pirsgirêkê de bername kiriye, bi karanîna yek an operatorek din dest pê dike.

Hêsan bes.

Scheduler li ser komek hemî DAG-an û hemî peywirên di nav DAG-an de dimeşîne.

Ji bo ku Scheduler bi DAG re dest bi xebatê bike, DAG pêdivî ye ku bernameyek saz bike:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Komek pêşdibistanên amade hene: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Her weha hûn dikarin bêjeyên cron bikar bînin:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Dîroka Îdamê

Ji bo ku fêm bikin ka Airflow çawa dixebite, girîng e ku hûn fêm bikin ka Dîroka Darvekirinê ji bo DAG-ê çi ye. Di Airflow de, DAG xwedan pîvanek Dîroka Bidarvekirinê ye, ango, li gorî nexşeya xebata DAG-ê ve girêdayî, ji bo her Dîroka Bidarvekirinê mînakên peywirê têne afirandin. Û ji bo her Dîroka Bidarvekirinê, peywir dikarin ji nû ve bêne darve kirin - an, mînakî, DAG dikare di çend Dîrokên Bidarvekirinê de bi hevdemî bixebite. Ev li vir bi zelalî tê xuyang kirin:

Airflow amûrek e ku bi hêsanî û zû pêşdebirin û domandina pêvajoyên hilberandina daneya hevîrê ye

Mixabin (an belkî bextewar: ew bi rewşê ve girêdayî ye), heke pêkanîna peywirê di DAG-ê de were rast kirin, wê hingê darvekirin di Dîroka Darvekirinê ya berê de dê li gorî sererastkirinan bimeşe. Ev baş e heke hûn hewce ne ku daneyên di demên borî de bi karanîna algorîtmayek nû ji nû ve hesab bikin, lê ew xirab e ji ber ku ji nû ve hilberîna encamê winda dibe (bê guman, kes we aciz nake ku hûn guhertoya pêdivî ya koda çavkaniyê ji Git vegerînin û çi hesab bikin. hûn yek carî hewce ne, awayê ku hûn hewce ne).

Hilberîna peywiran

Pêkanîna DAG-ê di Python-ê de kod e, ji ber vê yekê me rêyek pir hêsan heye ku em dema ku dixebitin, mînakî, bi çavkaniyên şikestî, mêjera kodê kêm bikin. Ka em bibêjin ku we sê şûşeyên MySQL wekî çavkaniyek heye, hûn hewce ne ku li her yekê hilkişin û hin daneyan hilbijêrin. Wekî din, serbixwe û paralel. Koda Python di DAG de dibe ku bi vî rengî xuya bike:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG wiha xuya dike:

Airflow amûrek e ku bi hêsanî û zû pêşdebirin û domandina pêvajoyên hilberandina daneya hevîrê ye

Di vê rewşê de, hûn dikarin bi tenê verastkirina mîhengan û nûvekirina DAG-ê perçeyek lê zêde bikin an jê bikin. Rehet!

Her weha hûn dikarin hilberîna kodê ya tevlihevtir bikar bînin, mînakî, bi çavkaniyan re di forma databasê de bixebitin an avahiyek tabloyê, algorîtmayek ji bo xebata bi tabloyê re diyar bikin, û, bi girtina taybetmendiyên binesaziya DWH, pêvajoyek çêbikin. ji bo barkirina N maseyên nav hilanîna we. An jî, mînakî, bi API-yek ku piştgirî nade xebata bi pîvanek di forma navnîşê de, hûn dikarin N peywiran di DAG-ê de ji vê navnîşê biafirînin, paralelîzma daxwaznameyên di API-yê de bi hewzek sînordar bikin, û bişkînin. daneyên pêwîst ji API. Gûherrane!

depo

Airflow depoya xweya paşîn heye, databasek (dibe ku bibe MySQL an Postgres, me Postgres heye), ku rewşên peywiran, DAG, mîhengên girêdanê, guhêrbarên gerdûnî, hwd., hwd hilîne. Li vir ez dixwazim bibêjim ku depoya di Airflow de pir hêsan e (nêzîkî 20 tabloyan) û rehet e heke hûn dixwazin li ser wê yek ji pêvajoyên xwe ava bikin. 100500 tabloyên di depoya Informatica de têne bîra min, yên ku diviyabû demek dirêj were lêkolîn kirin berî ku fêm bikin ka meriv çawa pirsek çawa ava dike.

Ingopandin

Ji ber sadebûna depoyê, hûn dikarin pêvajoyek şopandina peywirê ku ji bo we rehet e ava bikin. Em li Zeppelin notepadek bikar tînin, ku em li rewşa karan dinêrin:

Airflow amûrek e ku bi hêsanî û zû pêşdebirin û domandina pêvajoyên hilberandina daneya hevîrê ye

Ev jî dibe ku navgîniya webê ya Airflow bixwe be:

Airflow amûrek e ku bi hêsanî û zû pêşdebirin û domandina pêvajoyên hilberandina daneya hevîrê ye

Koda hewayê çavkaniyek vekirî ye, ji ber vê yekê me hişyarî li Telegram zêde kiriye. Her mînakek xebitandinê ya karekî, heke xeletiyek çêbibe, koma di Telegram de spam dike, ku tevahiya tîmê pêşkeftin û piştgiriyê pêk tê.

Em bi Telegram re bersivek bilez distînin (heke hewce be), û bi navgîniya Zeppelin em wêneyek giştî ya peywiran di Airflow de digirin.

Tevahî

Herikîna hewayê di serî de çavkaniyek vekirî ye, û divê hûn ji wê kerametan hêvî nekin. Amade bin ku ji bo avakirina çareseriyek ku kar dike dem û hewldan bidin. Armanc pêkan e, ji min bawer bikin, hêjayî wê ye. Leza pêşkeftinê, nermbûn, hêsankirina zêdekirina pêvajoyên nû - hûn ê jê hez bikin. Bê guman, hûn hewce ne ku pir girîngiyê bidin rêxistina projeyê, aramiya hewayê bixwe: keramet çênabin.

Niha em Airflow rojane dixebitin nêzî 6,5 hezar wezîfe. Ew di karakterê de pir cûda ne. Karên barkirina daneyan li DWH-ya sereke ji gelek çavkaniyên cihêreng û pir taybetî hene, peywirên hesabkirina firoşgehan di hundurê DWH-ya sereke de hene, peywirên weşandina daneyan di DWHek bilez de hene, gelek, gelek karên cihêreng hene - û Gerok wan hemûyan roj bi roj diçêrîne. Bi hejmaran dipeyivin, ev e 2,3 hezar Karên ELT yên tevliheviya cihêreng di hundurê DWH (Hadoop), nêzîkê. 2,5 sed databases çavkaniyên, ev tîmek ji 4 pêşdebiran ETL, ku di nav DWH û hilberandina daneya ELT de di hundurê DWH de û bê guman bêtir têne dabeş kirin. yek admin, ku bi binesaziya xizmetê re mijûl dibe.

Plana pêşerojê

Hejmara pêvajoyan bi neçarî mezin dibe, û ya sereke ku em ê di warê binesaziya Airflow de bikin, pîvandin e. Em dixwazin komikek Airflow ava bikin, cotek lingan ji xebatkarên Celery re veqetînin, û bi pêvajoyên plansazkirina kar û depoyek re serê xwe-kober bikin.

Încîlê

Ev, bê guman, ne her tiştê ku ez dixwazim li ser Airflow bibêjim, lê min hewl da ku xalên sereke ronî bikim. Xwarin bi xwarinê re tê, wê biceribîne û hûn ê jê hez bikin :)

Source: www.habr.com

Add a comment