Ang pag-agos sa hangin usa ka himan alang sa dali ug dali nga pag-uswag ug pagpadayon sa mga proseso sa pagproseso sa datos sa batch

Ang pag-agos sa hangin usa ka himan alang sa dali ug dali nga pag-uswag ug pagpadayon sa mga proseso sa pagproseso sa datos sa batch

Hello, Habr! Niini nga artikulo gusto nakong hisgutan ang usa ka maayo nga himan alang sa pagpalambo sa mga proseso sa pagproseso sa batch data, pananglitan, sa imprastraktura sa usa ka corporate DWH o sa imong DataLake. Maghisgot kami bahin sa Apache Airflow (gitawag dinhi nga Airflow). Dili patas nga gihikawan sa atensyon sa Habré, ug sa panguna nga bahin sulayan nako nga kombinsihon ka nga labing menos Airflow angay tan-awon kung nagpili usa ka scheduler alang sa imong mga proseso sa ETL / ELT.

Kaniadto, nagsulat ako usa ka serye sa mga artikulo bahin sa hilisgutan sa DWH sa dihang nagtrabaho ako sa Tinkoff Bank. Karon nahimo na ako nga bahin sa Mail.Ru Group nga grupo ug nagpalambo sa usa ka plataporma alang sa pagtuki sa datos sa lugar sa pagdula. Sa tinuud, samtang nagpakita ang mga balita ug makapaikag nga mga solusyon, ang akong team ug ako maghisgot dinhi bahin sa among plataporma alang sa data analytics.

Prologue

Busa, magsugod kita. Unsa ang Airflow? Kini usa ka librarya (o set sa mga librarya) sa pagpalambo, pagplano ug pagmonitor sa mga proseso sa trabaho. Ang nag-unang bahin sa Airflow: Python code gigamit sa paghulagway (pagpalambo) mga proseso. Kini adunay daghang mga bentaha alang sa pag-organisar sa imong proyekto ug pag-uswag: sa tinuud, ang imong (pananglitan) nga proyekto sa ETL usa lamang ka proyekto sa Python, ug mahimo nimo kini maorganisar kung gusto nimo, nga gikonsiderar ang mga detalye sa imprastraktura, gidak-on sa team ug ubang mga kinahanglanon. Sa instrumento ang tanan yano ra. Gamita pananglitan ang PyCharm + Git. Nindot kaayo ug kombenyente kaayo!

Karon atong tan-awon ang mga nag-unang entidad sa Airflow. Pinaagi sa pagsabut sa ilang diwa ug katuyoan, mahimo nimong maorganisar ang imong arkitektura sa proseso. Tingali ang nag-unang entidad mao ang Directed Acyclic Graph (human niini gitawag nga DAG).

Dag

Ang DAG usa ka makahuluganon nga asosasyon sa imong mga buluhaton nga gusto nimong tapuson sa usa ka estrikto nga pagkasunod-sunod sumala sa usa ka piho nga iskedyul. Ang Airflow naghatag usa ka kombenyente nga web interface alang sa pagtrabaho kauban ang mga DAG ug uban pang mga entidad:

Ang pag-agos sa hangin usa ka himan alang sa dali ug dali nga pag-uswag ug pagpadayon sa mga proseso sa pagproseso sa datos sa batch

Ang DAG mahimong ingon niini:

Ang pag-agos sa hangin usa ka himan alang sa dali ug dali nga pag-uswag ug pagpadayon sa mga proseso sa pagproseso sa datos sa batch

Ang nag-develop, kung nagdesinyo sa usa ka DAG, nagbutang usa ka set sa mga operator kung diin ang mga buluhaton sa sulod sa DAG pagatukuron. Ania kita sa laing importante nga entidad: Airflow Operator.

Mga operator

Ang operator usa ka entidad nga gibase sa kung unsang mga higayon sa trabaho ang gihimo, nga naghulagway kung unsa ang mahitabo sa panahon sa pagpatuman sa usa ka pananglitan sa trabaho. Ang airflow gipagawas gikan sa GitHub naa nay set sa mga operator nga andam gamiton. Mga pananglitan:

  • BashOperator - operator alang sa pagpatuman sa usa ka bash command.
  • PythonOperator - operator alang sa pagtawag sa Python code.
  • EmailOperator — operator sa pagpadala sa email.
  • HTTPOperator - operator alang sa pagtrabaho sa mga hangyo sa http.
  • SqlOperator - operator alang sa pagpatuman sa SQL code.
  • Ang sensor usa ka operator sa paghulat sa usa ka panghitabo (ang pag-abot sa gikinahanglan nga oras, ang dagway sa gikinahanglan nga file, usa ka linya sa database, usa ka tubag gikan sa API, ug uban pa, ug uban pa).

Adunay mas piho nga mga operator: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Mahimo ka usab nga makahimo og mga operator base sa imong kaugalingon nga mga kinaiya ug gamiton kini sa imong proyekto. Pananglitan, gibuhat namo ang MongoDBToHiveViaHdfsTransfer, usa ka operator sa pag-eksport sa mga dokumento gikan sa MongoDB ngadto sa Hive, ug daghang mga operator para sa pagtrabaho uban sa clickhouse: CHLoadFromHiveOperator ug CHTableLoaderOperator. Sa tinuud, sa diha nga ang usa ka proyekto kanunay nga gigamit ang code nga gitukod sa sukaranan nga mga pahayag, mahimo nimong hunahunaon ang paghimo niini nga usa ka bag-ong pahayag. Kini makapasayon ​​sa dugang nga kalamboan, ug imong palapdan ang imong librarya sa mga operator sa proyekto.

Sunod, kining tanan nga mga higayon sa mga buluhaton kinahanglan nga ipatuman, ug karon maghisgot kami bahin sa scheduler.

scheduler

Ang scheduler sa buluhaton sa Airflow gitukod Celery. Ang Celery usa ka librarya sa Python nga nagtugot kanimo sa pag-organisar sa usa ka pila plus asynchronous ug giapod-apod nga pagpatuman sa mga buluhaton. Sa bahin sa Airflow, ang tanan nga mga buluhaton gibahin sa mga pool. Ang mga pool gihimo sa mano-mano. Kasagaran, ang ilang katuyoan mao ang limitahan ang buluhaton sa pagtrabaho kauban ang gigikanan o ang tipo nga mga buluhaton sa sulod sa DWH. Ang mga pool mahimong madumala pinaagi sa web interface:

Ang pag-agos sa hangin usa ka himan alang sa dali ug dali nga pag-uswag ug pagpadayon sa mga proseso sa pagproseso sa datos sa batch

Ang matag pool adunay limitasyon sa gidaghanon sa mga slots. Kung maghimo usa ka DAG, gihatagan kini usa ka pool:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Ang usa ka pool nga gihubit sa lebel sa DAG mahimong ma-override sa lebel sa buluhaton.
Ang usa ka bulag nga proseso, ang Scheduler, maoy responsable sa pag-iskedyul sa tanang buluhaton sa Airflow. Sa tinuud, ang Scheduler naghisgot sa tanan nga mga mekaniko sa pagtakda sa mga buluhaton alang sa pagpatuman. Ang buluhaton moagi sa daghang mga yugto sa wala pa ipatuman:

  1. Ang mga nangaging mga buluhaton nahuman na sa DAG; ang usa ka bag-o mahimong mapila.
  2. Ang pila gihan-ay depende sa prayoridad sa mga buluhaton (mahimo usab nga kontrolon ang mga prayoridad), ug kung adunay usa ka libre nga slot sa pool, ang buluhaton mahimong magamit.
  3. Kung adunay usa ka libre nga trabahador nga celery, ang buluhaton gipadala niini; ang trabaho nga imong giprograma sa problema magsugod, gamit ang usa o lain nga operator.

Simple nga igo.

Ang scheduler midagan sa set sa tanang DAGs ug tanang buluhaton sulod sa DAGs.

Alang sa Scheduler nga magsugod sa pagtrabaho uban sa DAG, ang DAG kinahanglan nga magtakda og iskedyul:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Adunay usa ka set sa andam na nga mga preset: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Mahimo usab nimo gamiton ang mga ekspresyon sa cron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Petsa sa Pagpatay

Aron masabtan kung giunsa paglihok ang Airflow, importante nga masabtan kung unsa ang Petsa sa Pagpatuman alang sa usa ka DAG. Sa Airflow, ang DAG adunay dimensyon sa Petsa sa Pagpatuman, i.e., depende sa iskedyul sa trabaho sa DAG, ang mga higayon sa buluhaton gihimo alang sa matag Petsa sa Pagpatuman. Ug alang sa matag Petsa sa Pagpatuman, ang mga buluhaton mahimong ipatuman pag-usab - o, pananglitan, ang usa ka DAG mahimong dungan nga magtrabaho sa daghang Petsa sa Pagpatuman. Kini klaro nga gipakita dinhi:

Ang pag-agos sa hangin usa ka himan alang sa dali ug dali nga pag-uswag ug pagpadayon sa mga proseso sa pagproseso sa datos sa batch

Ikasubo (o tingali swerte: nagdepende kini sa sitwasyon), kung gitul-id ang pagpatuman sa buluhaton sa DAG, nan ang pagpatuman sa miaging Petsa sa Pagpatuman magpadayon nga gikonsiderar ang mga pagbag-o. Maayo kini kung kinahanglan nimo nga kalkulahon pag-usab ang datos sa nangaging mga panahon gamit ang usa ka bag-ong algorithm, apan kini dili maayo tungod kay nawala ang pag-reproducibility sa resulta (siyempre, walay usa nga nagsamok kanimo sa pagbalik sa gikinahanglan nga bersyon sa source code gikan sa Git ug kuwentaha kung unsa kinahanglan nimo usa ka higayon, sa paagi nga kinahanglan nimo kini).

Pagmugna og mga buluhaton

Ang pagpatuman sa DAG mao ang code sa Python, mao nga kita adunay usa ka kombenyente nga paagi sa pagpakunhod sa gidaghanon sa code sa diha nga nagtrabaho, alang sa panig-ingnan, uban sa sharded tinubdan. Ingnon ta nga ikaw adunay tulo ka MySQL shards isip usa ka tinubdan, kinahanglan nimo nga mosaka sa matag usa ug magkuha og pipila ka datos. Dugang pa, independente ug managsama. Ang Python code sa DAG mahimong ingon niini:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

Ang DAG ingon niini:

Ang pag-agos sa hangin usa ka himan alang sa dali ug dali nga pag-uswag ug pagpadayon sa mga proseso sa pagproseso sa datos sa batch

Sa kini nga kaso, mahimo nimong idugang o kuhaon ang usa ka shard pinaagi lamang sa pag-adjust sa mga setting ug pag-update sa DAG. Komportable!

Mahimo usab nimo gamiton ang labi ka komplikado nga henerasyon sa code, pananglitan, pagtrabaho kauban ang mga gigikanan sa porma sa usa ka database o paghulagway sa usa ka istruktura sa lamesa, usa ka algorithm alang sa pagtrabaho sa usa ka lamesa, ug, nga gikonsiderar ang mga bahin sa imprastraktura sa DWH, makamugna usa ka proseso. alang sa pagkarga sa N nga mga lamesa sa imong storage. O, pananglitan, ang pagtrabaho sa usa ka API nga wala nagsuporta sa pagtrabaho sa usa ka parameter sa porma sa usa ka lista, mahimo nimong makamugna ang N nga mga buluhaton sa usa ka DAG gikan sa kini nga lista, limitahan ang paralelismo sa mga hangyo sa API sa usa ka pool, ug pag-scrape ang gikinahanglan nga datos gikan sa API. Flexible!

tipiganan

Ang Airflow adunay kaugalingong backend repository, usa ka database (mahimong MySQL o Postgres, kita adunay mga Postgres), nga nagtipig sa mga estado sa mga buluhaton, DAGs, mga setting sa koneksyon, global nga mga variable, ug uban pa, ug uban pa. Dinhi gusto ko nga makaingon nga Ang repository sa Airflow kay yano kaayo (mga 20 ka lamesa) ug sayon ​​kung gusto nimo nga magtukod ug bisan unsa sa imong kaugalingong mga proseso sa ibabaw niini. Nahinumdom ko sa 100500 ka mga lamesa sa Informatica repository, nga kinahanglang tun-an sa dugay nga panahon sa dili pa masabtan kon unsaon paghimo og pangutana.

Pagbantay

Tungod sa kayano sa repository, makahimo ka og usa ka proseso sa pag-monitor sa buluhaton nga sayon ​​alang kanimo. Gigamit namon ang usa ka notepad sa Zeppelin, diin among gitan-aw ang kahimtang sa mga buluhaton:

Ang pag-agos sa hangin usa ka himan alang sa dali ug dali nga pag-uswag ug pagpadayon sa mga proseso sa pagproseso sa datos sa batch

Mahimo usab kini nga web interface sa Airflow mismo:

Ang pag-agos sa hangin usa ka himan alang sa dali ug dali nga pag-uswag ug pagpadayon sa mga proseso sa pagproseso sa datos sa batch

Ang Airflow code kay open source, mao nga gidugang namo ang pag-alerto sa Telegram. Ang matag run nga pananglitan sa usa ka buluhaton, kung adunay usa ka sayup nga mahitabo, nag-spam sa grupo sa Telegram, diin ang tibuok nga development ug support team naglangkob.

Nakadawat kami usa ka dali nga tubag pinaagi sa Telegram (kung gikinahanglan), ug pinaagi sa Zeppelin nakadawat kami usa ka kinatibuk-ang litrato sa mga buluhaton sa Airflow.

Total

Ang pag-agos sa hangin nag-una nga bukas nga gigikanan, ug dili ka kinahanglan magdahum nga mga milagro gikan niini. Pag-andam sa pagbutang sa oras ug paningkamot sa paghimo sa usa ka solusyon nga molihok. Ang tumong makab-ot, tuohi ako, takus kini. Ang katulin sa pag-uswag, pagka-flexible, kadali sa pagdugang bag-ong mga proseso - gusto nimo kini. Siyempre, kinahanglan nimo nga hatagan ug daghang pagtagad ang organisasyon sa proyekto, ang kalig-on sa Airflow mismo: ang mga milagro dili mahitabo.

Karon kami adunay Airflow nga nagtrabaho adlaw-adlaw mga 6,5 ka libo nga buluhaton. Lahi gyud sila ug kinaiya. Adunay mga buluhaton sa pagkarga sa datos ngadto sa nag-unang DWH gikan sa daghang lain-laing ug piho kaayo nga mga tinubdan, adunay mga buluhaton sa pagkalkulo sa mga storefront sulod sa nag-unang DWH, adunay mga buluhaton sa pagmantala sa datos ngadto sa usa ka paspas nga DWH, adunay daghan, daghang lain-laing mga buluhaton - ug Airflow ginausap sila tanan adlaw-adlaw. Sa pagsulti sa mga numero, kini mao 2,3 ka libo Ang mga buluhaton sa ELT nga lainlain ang pagkakomplikado sulod sa DWH (Hadoop), gibanabana. 2,5 ka gatos nga mga database tinubdan, kini usa ka team gikan sa 4 ETL developers, nga gibahin sa pagproseso sa datos sa ETL sa pagproseso sa datos sa DWH ug ELT sa sulod sa DWH ug siyempre daghan pa usa ka admin, nga naghisgot sa imprastraktura sa serbisyo.

Mga plano alang sa umaabot

Ang gidaghanon sa mga proseso dili malikayan nga motubo, ug ang panguna nga butang nga atong buhaton sa mga termino sa imprastraktura sa Airflow mao ang pag-scale. Gusto namong magtukod ug Airflow cluster, maggahin ug parisan sa mga bitiis para sa mga trabahante sa Celery, ug maghimo ug self-duplicate nga ulo nga adunay mga proseso sa pag-iskedyul sa trabaho ug repositoryo.

Epilogo

Kini, siyempre, dili tanan nga gusto nakong isulti bahin sa Airflow, apan gisulayan nako nga ipasiugda ang mga nag-unang punto. Ang gana dala sa pagkaon, sulayi ug ganahan ka :)

Source: www.habr.com

Idugang sa usa ka comment