Ang airflow ay isang tool para sa maginhawa at mabilis na pagbuo at pagpapanatili ng mga proseso ng pagproseso ng batch data

Ang airflow ay isang tool para sa maginhawa at mabilis na pagbuo at pagpapanatili ng mga proseso ng pagproseso ng batch data

Hello, Habr! Sa artikulong ito gusto kong pag-usapan ang tungkol sa isang mahusay na tool para sa pagbuo ng mga proseso ng pagproseso ng batch ng data, halimbawa, sa imprastraktura ng isang corporate DWH o iyong DataLake. Pag-uusapan natin ang tungkol sa Apache Airflow (mula rito ay tinutukoy bilang Airflow). Ito ay hindi patas na pinagkaitan ng pansin sa Habré, at sa pangunahing bahagi ay susubukan kong kumbinsihin ka na hindi bababa sa Airflow ay sulit na tingnan kapag pumipili ng isang scheduler para sa iyong mga proseso ng ETL/ELT.

Dati, sumulat ako ng serye ng mga artikulo sa paksa ng DWH noong nagtrabaho ako sa Tinkoff Bank. Ngayon ay naging bahagi na ako ng pangkat ng Mail.Ru Group at bumubuo ako ng isang platform para sa pagsusuri ng data sa lugar ng paglalaro. Sa totoo lang, habang lumalabas ang mga balita at kawili-wiling solusyon, pag-uusapan namin ng aking team dito ang aming platform para sa data analytics.

Prologue

Kaya, magsimula tayo. Ano ang Airflow? Ito ay isang aklatan (o hanay ng mga aklatan) upang bumuo, magplano at subaybayan ang mga proseso ng trabaho. Ang pangunahing tampok ng Airflow: Python code ay ginagamit upang ilarawan (bumuo) ang mga proseso. Ito ay may maraming mga pakinabang para sa pag-aayos ng iyong proyekto at pag-unlad: sa esensya, ang iyong (halimbawa) ETL na proyekto ay isang proyektong Python lamang, at maaari mo itong ayusin ayon sa gusto mo, na isinasaalang-alang ang mga detalye ng imprastraktura, laki ng koponan at iba pang mga kinakailangan. Sa instrumental lahat ay simple. Gamitin halimbawa PyCharm + Git. Ito ay kahanga-hanga at napaka maginhawa!

Ngayon tingnan natin ang mga pangunahing entity ng Airflow. Sa pamamagitan ng pag-unawa sa kanilang kakanyahan at layunin, maaari mong mahusay na ayusin ang iyong arkitektura ng proseso. Marahil ang pangunahing entity ay ang Directed Acyclic Graph (mula rito ay tinutukoy bilang DAG).

Magdaga

Ang DAG ay ilang makabuluhang pagkakaugnay ng iyong mga gawain na gusto mong tapusin sa isang mahigpit na tinukoy na pagkakasunud-sunod ayon sa isang partikular na iskedyul. Nagbibigay ang Airflow ng maginhawang web interface para sa pagtatrabaho sa mga DAG at iba pang entity:

Ang airflow ay isang tool para sa maginhawa at mabilis na pagbuo at pagpapanatili ng mga proseso ng pagproseso ng batch data

Maaaring ganito ang hitsura ng DAG:

Ang airflow ay isang tool para sa maginhawa at mabilis na pagbuo at pagpapanatili ng mga proseso ng pagproseso ng batch data

Ang developer, kapag nagdidisenyo ng isang DAG, ay naglalatag ng isang hanay ng mga operator kung saan itatayo ang mga gawain sa loob ng DAG. Narito tayo sa isa pang mahalagang entity: Airflow Operator.

Mga operator

Ang operator ay isang entity batay sa kung aling mga pagkakataon ng trabaho ang nilikha, na naglalarawan kung ano ang mangyayari sa panahon ng pagpapatupad ng isang halimbawa ng trabaho. Airflow release mula sa GitHub naglalaman na ng isang hanay ng mga operator na handang gamitin. Mga halimbawa:

  • BashOperator - operator para sa pagpapatupad ng isang bash command.
  • PythonOperator - operator para sa pagtawag sa Python code.
  • EmailOperator — operator para sa pagpapadala ng email.
  • HTTPOperator - operator para sa pagtatrabaho sa mga kahilingan sa http.
  • SqlOperator - operator para sa pagpapatupad ng SQL code.
  • Ang sensor ay isang operator para sa paghihintay para sa isang kaganapan (ang pagdating ng kinakailangang oras, ang hitsura ng kinakailangang file, isang linya sa database, isang tugon mula sa API, atbp., atbp.).

Mayroong mas tiyak na mga operator: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Maaari ka ring bumuo ng mga operator batay sa iyong sariling mga katangian at gamitin ang mga ito sa iyong proyekto. Halimbawa, lumikha kami ng MongoDBToHiveViaHdfsTransfer, isang operator para sa pag-export ng mga dokumento mula sa MongoDB patungo sa Hive, at ilang mga operator para sa pagtatrabaho sa clickhouse: CHLoadFromHiveOperator at CHTableLoaderOperator. Mahalaga, sa sandaling ang isang proyekto ay madalas na gumamit ng code na binuo sa mga pangunahing pahayag, maaari mong isipin ang tungkol sa pagbuo nito sa isang bagong pahayag. Pasimplehin nito ang karagdagang pag-unlad, at palalawakin mo ang iyong library ng mga operator sa proyekto.

Susunod, ang lahat ng mga pagkakataong ito ng mga gawain ay kailangang isagawa, at ngayon ay pag-uusapan natin ang tungkol sa scheduler.

Tagapag-iskedyul

Naka-buo ang task scheduler ng Airflow Kintsay. Ang Celery ay isang Python library na nagbibigay-daan sa iyong mag-ayos ng isang queue kasama ang asynchronous at distributed execution ng mga gawain. Sa panig ng Airflow, ang lahat ng mga gawain ay nahahati sa mga pool. Ang mga pool ay ginawa nang manu-mano. Karaniwan, ang kanilang layunin ay limitahan ang workload ng pagtatrabaho kasama ang pinanggalingan o ilarawan ang mga gawain sa loob ng DWH. Maaaring pamahalaan ang mga pool sa pamamagitan ng web interface:

Ang airflow ay isang tool para sa maginhawa at mabilis na pagbuo at pagpapanatili ng mga proseso ng pagproseso ng batch data

Ang bawat pool ay may limitasyon sa bilang ng mga puwang. Kapag gumagawa ng DAG, binibigyan ito ng pool:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Ang isang pool na tinukoy sa antas ng DAG ay maaaring ma-override sa antas ng gawain.
Ang isang hiwalay na proseso, ang Scheduler, ay responsable para sa pag-iskedyul ng lahat ng mga gawain sa Airflow. Sa totoo lang, ang Scheduler ay tumatalakay sa lahat ng mga mekanika ng pagtatakda ng mga gawain para sa pagpapatupad. Ang gawain ay dumaan sa ilang yugto bago isagawa:

  1. Ang mga nakaraang gawain ay nakumpleto na sa DAG; ang isang bago ay maaaring i-queue.
  2. Ang pila ay pinagsunod-sunod depende sa priyoridad ng mga gawain (maaari ding kontrolin ang mga priyoridad), at kung mayroong libreng puwang sa pool, ang gawain ay maaaring gawin.
  3. Kung mayroong isang libreng kintsay ng manggagawa, ang gawain ay ipinadala dito; magsisimula ang gawaing na-program mo sa problema, gamit ang isa o ibang operator.

Simple lang.

Ang scheduler ay tumatakbo sa set ng lahat ng DAG at lahat ng gawain sa loob ng DAGs.

Para magsimulang magtrabaho ang Scheduler sa DAG, kailangang magtakda ng iskedyul ang DAG:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Mayroong isang hanay ng mga yari na preset: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Maaari mo ring gamitin ang mga cron expression:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Petsa ng Pagpapatupad

Upang maunawaan kung paano gumagana ang Airflow, mahalagang maunawaan kung ano ang Petsa ng Pagpapatupad para sa isang DAG. Sa Airflow, ang DAG ay may dimensyon ng Petsa ng Pagpapatupad, ibig sabihin, depende sa iskedyul ng trabaho ng DAG, ang mga instance ng gawain ay ginawa para sa bawat Petsa ng Pagpapatupad. At para sa bawat Petsa ng Pagpapatupad, maaaring muling isagawa ang mga gawain - o, halimbawa, ang isang DAG ay maaaring gumana nang sabay-sabay sa ilang Petsa ng Pagpapatupad. Ito ay malinaw na ipinapakita dito:

Ang airflow ay isang tool para sa maginhawa at mabilis na pagbuo at pagpapanatili ng mga proseso ng pagproseso ng batch data

Sa kasamaang palad (o marahil sa kabutihang-palad: depende ito sa sitwasyon), kung naitama ang pagpapatupad ng gawain sa DAG, pagkatapos ay magpapatuloy ang pagpapatupad sa nakaraang Petsa ng Pagpapatupad na isinasaalang-alang ang mga pagsasaayos. Ito ay mabuti kung kailangan mong muling kalkulahin ang data sa mga nakaraang panahon gamit ang isang bagong algorithm, ngunit ito ay masama dahil ang reproducibility ng resulta ay nawala (siyempre, walang nag-abala sa iyo na ibalik ang kinakailangang bersyon ng source code mula sa Git at kalkulahin kung ano kailangan mo ng isang beses, sa paraang kailangan mo ito).

Pagbuo ng mga gawain

Ang pagpapatupad ng DAG ay code sa Python, kaya mayroon kaming isang napaka-maginhawang paraan upang mabawasan ang dami ng code kapag nagtatrabaho, halimbawa, sa mga sharded na mapagkukunan. Sabihin nating mayroon kang tatlong MySQL shards bilang pinagmulan, kailangan mong umakyat sa bawat isa at kumuha ng ilang data. Bukod dito, nang nakapag-iisa at kahanay. Maaaring ganito ang hitsura ng Python code sa DAG:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

Ang DAG ay ganito ang hitsura:

Ang airflow ay isang tool para sa maginhawa at mabilis na pagbuo at pagpapanatili ng mga proseso ng pagproseso ng batch data

Sa kasong ito, maaari kang magdagdag o mag-alis ng shard sa pamamagitan lamang ng pagsasaayos ng mga setting at pag-update ng DAG. Komportable!

Maaari ka ring gumamit ng mas kumplikadong pagbuo ng code, halimbawa, magtrabaho kasama ang mga mapagkukunan sa anyo ng isang database o ilarawan ang isang istraktura ng talahanayan, isang algorithm para sa pagtatrabaho sa isang talahanayan, at, isinasaalang-alang ang mga tampok ng imprastraktura ng DWH, bumuo ng isang proseso. para sa pag-load ng N mga talahanayan sa iyong imbakan. O, halimbawa, nagtatrabaho sa isang API na hindi sumusuporta sa pagtatrabaho sa isang parameter sa anyo ng isang listahan, maaari kang bumuo ng N gawain sa isang DAG mula sa listahang ito, limitahan ang parallelism ng mga kahilingan sa API sa isang pool at i-scrape ang kinakailangang data mula sa API. Nababaluktot!

imbakan

Ang Airflow ay may sariling backend repository, isang database (maaaring MySQL o Postgres, mayroon kaming Postgres), na nag-iimbak ng mga estado ng mga gawain, mga DAG, mga setting ng koneksyon, mga pandaigdigang variable, atbp., atbp. Dito nais kong sabihin na ang Ang repository sa Airflow ay napakasimple (mga 20 table) at maginhawa kung gusto mong bumuo ng alinman sa iyong sariling mga proseso sa ibabaw nito. Naaalala ko ang 100500 na mga talahanayan sa imbakan ng Informatica, na kailangang pag-aralan nang mahabang panahon bago maunawaan kung paano bumuo ng isang query.

Pagsubaybay

Dahil sa pagiging simple ng repository, maaari kang bumuo ng proseso ng pagsubaybay sa gawain na maginhawa para sa iyo. Gumagamit kami ng notepad sa Zeppelin, kung saan tinitingnan namin ang katayuan ng mga gawain:

Ang airflow ay isang tool para sa maginhawa at mabilis na pagbuo at pagpapanatili ng mga proseso ng pagproseso ng batch data

Ito rin ay maaaring ang web interface ng Airflow mismo:

Ang airflow ay isang tool para sa maginhawa at mabilis na pagbuo at pagpapanatili ng mga proseso ng pagproseso ng batch data

Open source ang Airflow code, kaya nagdagdag kami ng alerto sa Telegram. Ang bawat running instance ng isang gawain, kung may nangyaring error, ay nag-spam sa grupo sa Telegram, kung saan binubuo ang buong development at support team.

Nakatanggap kami ng agarang tugon sa pamamagitan ng Telegram (kung kinakailangan), at sa pamamagitan ng Zeppelin nakakatanggap kami ng pangkalahatang larawan ng mga gawain sa Airflow.

Sa kabuuan

Pangunahing open source ang airflow, at hindi mo dapat asahan ang mga himala mula dito. Maging handa na maglaan ng oras at pagsisikap upang makabuo ng solusyon na gumagana. Ang layunin ay makakamit, maniwala ka sa akin, sulit ito. Bilis ng pag-unlad, kakayahang umangkop, kadalian ng pagdaragdag ng mga bagong proseso - magugustuhan mo ito. Siyempre, kailangan mong magbayad ng maraming pansin sa organisasyon ng proyekto, ang katatagan ng Airflow mismo: ang mga himala ay hindi nangyayari.

Ngayon ay mayroon kaming Airflow na gumagana araw-araw tungkol sa 6,5 libong mga gawain. Magkaiba talaga sila ng character. May mga gawain sa paglo-load ng data sa pangunahing DWH mula sa maraming iba't ibang at napaka tiyak na mga mapagkukunan, may mga gawain sa pagkalkula ng mga storefront sa loob ng pangunahing DWH, may mga gawain ng pag-publish ng data sa isang mabilis na DWH, mayroong marami, maraming iba't ibang mga gawain - at Airflow ngumunguya sa kanila lahat araw-araw. Sa pagsasalita sa mga numero, ito ay 2,3 libo Mga gawain sa ELT na may iba't ibang kumplikado sa loob ng DWH (Hadoop), tinatayang. 2,5 daang mga database source, ito ay isang team mula sa 4 na developer ng ETL, na nahahati sa ETL data processing sa DWH at ELT data processing sa loob ng DWH at siyempre higit pa isang admin, na tumatalakay sa imprastraktura ng serbisyo.

Планы на будущее

Ang bilang ng mga proseso ay hindi maiiwasang lumalaki, at ang pangunahing bagay na gagawin natin sa mga tuntunin ng imprastraktura ng Airflow ay ang pag-scale. Gusto naming bumuo ng isang Airflow cluster, maglaan ng isang pares ng mga paa para sa mga manggagawa sa Celery, at gumawa ng self-duplicating head na may mga proseso sa pag-iiskedyul ng trabaho at isang repository.

Epilogo

Ito, siyempre, ay hindi lahat ng gusto kong sabihin tungkol sa Airflow, ngunit sinubukan kong i-highlight ang mga pangunahing punto. Ang gana ay kasama sa pagkain, subukan ito at magugustuhan mo ito :)

Pinagmulan: www.habr.com

Magdagdag ng komento