Airflow is a tool for developing and maintaining data processing pipelines conveniently and quickly


Hi, Habr! In this article I want to talk about one great tool for developing data processing pipelines, for example in the infrastructure of your corporate DWH or DataLake. We will talk about Apache Airflow (hereafter simply Airflow). It gets unfairly little attention on Habr, and in the main part I will try to convince you that Airflow is at least worth a look when you choose a scheduler for your ETL/ELT processes.

Previously I wrote a series of articles about DWH while I was working at Tinkoff Bank. Now I am part of the Mail.Ru Group team and am building a data analytics platform for the gaming domain. As news and interesting solutions come up, my team and I will write here about our analytics platform.

Introduction

So, let's begin. What is Airflow? It is a library (or a set of libraries) for developing, scheduling and monitoring workflows. Airflow's main feature is that processes are described (developed) in Python code. This has many advantages for organizing your project and your development: in essence, your ETL project (for example) is just a Python project, and you can structure it however you like, taking into account your infrastructure, the size of your team and other requirements. Tooling-wise everything is simple: use, for example, PyCharm + Git. It is wonderful and very convenient!

Now let's look at Airflow's main entities. Once you understand what they are and what they are for, you can organize your process architecture well. Perhaps the main entity is the Directed Acyclic Graph (hereafter DAG).

DAG

A DAG is a meaningful grouping of your tasks that you want to run in a strictly defined order on a specific schedule. Airflow provides a convenient web interface for working with DAGs and other entities:

[Screenshot: the Airflow web interface]

A DAG might look like this:

[Figure: an example DAG]
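
To make "a strictly defined order" concrete, here is a minimal sketch in plain Python, with no Airflow involved. It models a DAG as a mapping from each task to its upstream tasks and uses the standard library's graphlib (Python 3.9+) to derive an execution order and the groups of tasks that could run in parallel. The task names are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical DAG: each task maps to the set of tasks it depends on (its upstreams).
dag = {
    "truncate_stg": set(),
    "export_shard_1": {"truncate_stg"},
    "export_shard_2": {"truncate_stg"},
    "load_profiles": {"export_shard_1", "export_shard_2"},
}


def is_acyclic(graph):
    """A DAG must not contain cycles; prepare() raises CycleError if it does."""
    try:
        TopologicalSorter(graph).prepare()
        return True
    except CycleError:
        return False


def execution_order(graph):
    """One valid order in which every task runs after all of its upstreams."""
    return list(TopologicalSorter(graph).static_order())


def parallel_batches(graph):
    """Group tasks into rounds; tasks within one round do not depend on each other."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every task whose upstreams are finished
        batches.append(ready)
        sorter.done(*ready)
    return batches


print(parallel_batches(dag))
# → [['truncate_stg'], ['export_shard_1', 'export_shard_2'], ['load_profiles']]
```

The two exports end up in the same round because neither depends on the other, which is exactly the freedom a scheduler has when it runs them in parallel.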

When designing a DAG, the developer defines the set of operators on which the tasks inside the DAG will be built. This brings us to another important entity: the Airflow Operator.

Operators

An operator is the entity from which task instances are created; it describes what will happen when a task instance is executed. Airflow releases on GitHub already ship with a set of ready-to-use operators. Examples:

  • BashOperator - an operator for running a bash command.
  • PythonOperator - an operator for calling Python code.
  • EmailOperator - an operator for sending email.
  • HTTPOperator - an operator for working with HTTP requests.
  • SqlOperator - an operator for executing SQL code.
  • Sensor - an operator for waiting for an event (the arrival of a specific time, the appearance of a required file, a row in a database, a response from an API, and so on).
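
At its core, a sensor is a loop that repeatedly checks ("pokes") a condition until it becomes true or a timeout expires. The sketch below shows only that idea in plain Python; the wait_for function and SensorTimeout exception are hypothetical, although Airflow's own sensors expose comparable poke_interval and timeout settings.

```python
import os
import tempfile
import time


class SensorTimeout(Exception):
    """Raised when the condition did not become true before the timeout."""


def wait_for(condition, poke_interval=1.0, timeout=10.0,
             clock=time.monotonic, sleep=time.sleep):
    """Poke `condition` every `poke_interval` seconds until it returns True.

    Returns the number of pokes it took. Raises SensorTimeout once `timeout`
    seconds have passed. `clock` and `sleep` are injectable for testing.
    """
    started = clock()
    pokes = 0
    while True:
        pokes += 1
        if condition():
            return pokes
        if clock() - started >= timeout:
            raise SensorTimeout(f"condition still false after {pokes} pokes")
        sleep(poke_interval)


if __name__ == "__main__":
    # "Appearance of a required file": this file already exists, so one poke is enough.
    with tempfile.NamedTemporaryFile() as marker:
        print(wait_for(lambda: os.path.exists(marker.name), poke_interval=0.1, timeout=1))  # → 1
```

Injecting the clock and sleep functions keeps the waiting logic testable without real delays.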

There are also more specialized operators: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

You can also develop operators tailored to your own needs and use them in your project. For example, we created MongoDBToHiveViaHdfsTransfer, an operator for exporting documents from MongoDB to Hive, and several operators for working with ClickHouse: CLoadFromHiveOperator and CHTableLoaderOperator. In general, as soon as a project keeps reusing code built on basic statements, it is worth thinking about wrapping that code into a new operator. This simplifies further development, and your project's library of operators keeps growing.
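
The idea of wrapping repeated code into an operator can be sketched without Airflow. In real Airflow a custom operator subclasses BaseOperator and implements execute(self, context); here the stand-in base class MiniOperator and the ExportToStagingOperator built on it are hypothetical, and the "export" just copies rows between in-memory lists so the example stays self-contained. The context key "ds" mirrors the execution-date string Airflow puts into the task context.

```python
from typing import Any, Callable, Dict, List, Optional


class MiniOperator:
    """Hypothetical stand-in for an operator base class (Airflow's is BaseOperator)."""

    def __init__(self, task_id: str):
        self.task_id = task_id

    def execute(self, context: Dict[str, Any]) -> Any:
        raise NotImplementedError


class ExportToStagingOperator(MiniOperator):
    """Hypothetical custom operator: extract rows, tag them, append them to a target.

    The boilerplate that would otherwise be copy-pasted into every DAG lives here once.
    """

    def __init__(self, task_id: str, extract: Callable[[], List[dict]],
                 target: List[dict], add_columns: Optional[Dict[str, Any]] = None):
        super().__init__(task_id)
        self.extract = extract
        self.target = target
        self.add_columns = add_columns or {}

    def execute(self, context: Dict[str, Any]) -> int:
        rows = self.extract()
        for row in rows:
            # Add constant columns (e.g. a shard id) and the run date taken from the context.
            self.target.append({**row, **self.add_columns, "load_date": context["ds"]})
        return len(rows)


staging: List[dict] = []
op = ExportToStagingOperator(
    "export_profiles_from_mysql_1",
    extract=lambda: [{"id": 1}, {"id": 2}],
    target=staging,
    add_columns={"shard_id": "1"},
)
print(op.execute({"ds": "2019-01-01"}))  # → 2
```

Each DAG then only passes parameters to the operator instead of repeating the extract/tag/load steps.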

Next, all these task instances have to be executed, so now let's talk about the scheduler.

Scheduler

In our setup, task execution in Airflow is built on Celery (Airflow's CeleryExecutor). Celery is a Python library that lets you organize a queue with asynchronous, distributed execution of tasks. On the Airflow side, all tasks are divided into pools. Pools are created manually. Their purpose is usually to limit the load on a particular source or to group tasks by type within the DWH. Pools can be managed through the web interface:

[Screenshot: managing pools in the Airflow web interface]

Each pool has a limit on the number of slots. When a DAG is created, it is assigned a pool:

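from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable

# Alert recipients and the retry count are stored as Airflow Variables.
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

# Start date: midnight of yesterday.
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

# default_args are applied to every task of the DAG, including the pool.
default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__  # the module docstring becomes the DAG's documentation in the UI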
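
Conceptually, default_args acts as a set of fallback keyword arguments: every task in the DAG receives them, and arguments passed explicitly to a task take precedence. A plain-Python sketch of that precedence rule follows; the resolve_task_args helper and the heavy_source_pool name are hypothetical, and in real Airflow you would simply pass pool=... to the operator.

```python
from typing import Any, Dict


def resolve_task_args(default_args: Dict[str, Any], **task_kwargs: Any) -> Dict[str, Any]:
    """Merge DAG-level defaults with task-level arguments; task-level values win."""
    resolved = dict(default_args)  # copy, so the shared defaults are never mutated
    resolved.update(task_kwargs)   # arguments passed to the task override the defaults
    return resolved


default_args = {"owner": "Vasya Pupkin", "retries": 3, "pool": "dma_pool", "priority_weight": 10}

regular_task = resolve_task_args(default_args)
# Hypothetical task that hits a fragile source and gets its own, smaller pool.
heavy_task = resolve_task_args(default_args, pool="heavy_source_pool", priority_weight=20)
print(regular_task["pool"], heavy_task["pool"])  # → dma_pool heavy_source_pool
```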

A pool defined at the DAG level can be overridden at the task level.

A separate process, the Scheduler, is responsible for scheduling all tasks in Airflow. In fact, the Scheduler handles all the mechanics of putting tasks up for execution. Before it is executed, a task goes through several stages:

  1. The preceding tasks in the DAG have finished, so a new one can be queued.
  2. The queue is sorted by task priority (priorities can be controlled too), and if the pool has a free slot, the task can be taken into work.
  3. If there is a free Celery worker, the task is sent to it, and the work you programmed in the task starts, using one operator or another.

Simple enough.
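
The three steps can be modelled as a toy dispatch round. This is a simplified sketch, not Airflow's scheduler code. It assumes every task whose upstream tasks have finished is already in the queue (step 1), orders the queue by priority_weight with the highest first (step 2), and starts a task only if its pool has a free slot and a worker is idle (step 3). Pool sizes, the worker count and the task names are made up.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class QueuedTask:
    task_id: str
    pool: str
    priority_weight: int


def dispatch(queue: List[QueuedTask], pool_slots: Dict[str, int],
             running_per_pool: Dict[str, int], idle_workers: int) -> List[str]:
    """Return the task_ids that would start in this scheduling round."""
    started = []
    running = dict(running_per_pool)  # don't mutate the caller's counters
    # Step 2: higher priority_weight first; sorted() is stable, so ties keep queue order.
    for task in sorted(queue, key=lambda t: -t.priority_weight):
        if idle_workers == 0:
            break  # step 3: no free worker, nothing more can start this round
        if running.get(task.pool, 0) >= pool_slots[task.pool]:
            continue  # the pool is full; the task stays queued for a later round
        running[task.pool] = running.get(task.pool, 0) + 1
        idle_workers -= 1
        started.append(task.task_id)
    return started


queue = [
    QueuedTask("load_small_table", "dma_pool", 1),
    QueuedTask("load_big_table", "dma_pool", 10),
    QueuedTask("call_api_1", "api_pool", 5),
    QueuedTask("call_api_2", "api_pool", 5),
]
print(dispatch(queue, pool_slots={"dma_pool": 1, "api_pool": 1},
               running_per_pool={}, idle_workers=3))
# → ['load_big_table', 'call_api_1']
```

Even with three idle workers only two tasks start, because each one-slot pool admits a single task per round.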

The Scheduler iterates over all DAGs and over all tasks inside them.

For the Scheduler to start working with a DAG, the DAG needs a schedule:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

There is a set of ready-made presets: @once, @hourly, @daily, @weekly, @monthly, @yearly.

You can also use cron expressions:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
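
A schedule is simply a rule that yields the next run times. The sketch below expands a cron expression such as "*/10 * * * *", or one of the presets, into concrete times. It is deliberately simplified: it supports only *, */n, single numbers and comma lists, and it ANDs day-of-month with day-of-week, unlike full cron. The preset table lists the cron strings these presets correspond to; the parser itself is hypothetical, and in practice a library such as croniter does this job.

```python
from datetime import datetime, timedelta
from typing import List, Set

PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}

# (min, max) for minute, hour, day of month, month, day of week (0 = Sunday)
RANGES = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]


def parse_field(field: str, lo: int, hi: int) -> Set[int]:
    """Expand one cron field into the set of allowed values."""
    values: Set[int] = set()
    for part in field.split(","):
        if part == "*":
            values.update(range(lo, hi + 1))
        elif part.startswith("*/"):
            values.update(range(lo, hi + 1, int(part[2:])))
        else:
            values.add(int(part))
    return values


def next_runs(expr: str, after: datetime, count: int = 3) -> List[datetime]:
    """Return up to `count` minutes strictly after `after` that match `expr`."""
    expr = PRESETS.get(expr, expr)
    minutes, hours, days, months, weekdays = (
        parse_field(f, lo, hi) for f, (lo, hi) in zip(expr.split(), RANGES))
    t = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    runs: List[datetime] = []
    for _ in range(5 * 366 * 24 * 60):  # brute force, searching at most ~5 years ahead
        if len(runs) == count:
            break
        cron_weekday = (t.weekday() + 1) % 7  # Python: Monday=0; cron: Sunday=0
        if (t.minute in minutes and t.hour in hours and t.day in days
                and t.month in months and cron_weekday in weekdays):
            runs.append(t)
        t += timedelta(minutes=1)
    return runs


print(next_runs("*/10 * * * *", datetime(2024, 1, 1, 0, 5), count=2))
# → [datetime.datetime(2024, 1, 1, 0, 10), datetime.datetime(2024, 1, 1, 0, 20)]
```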

Execution Date

To understand how Airflow works, it is important to understand what a DAG's Execution Date is. In Airflow, a DAG has an Execution Date dimension: depending on the DAG's schedule, task instances are created for every Execution Date. For every Execution Date, tasks can be re-run, and a DAG can, for example, work on several Execution Dates at the same time. This is shown clearly here:

[Figure: DAG runs across several Execution Dates]
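
The rule behind this picture: a run is labelled with the start of the interval it processes (its Execution Date) and is started only once that interval has ended. The sketch computes which Execution Dates are due for a daily schedule; the due_execution_dates helper and the dates are illustrative, not Airflow's API.

```python
from datetime import datetime, timedelta
from typing import List


def due_execution_dates(start_date: datetime, interval: timedelta, now: datetime) -> List[datetime]:
    """Execution Dates whose interval [date, date + interval) has fully ended by `now`."""
    dates = []
    execution_date = start_date
    while execution_date + interval <= now:
        dates.append(execution_date)
        execution_date += interval
    return dates


dates = due_execution_dates(datetime(2019, 1, 1), timedelta(days=1), now=datetime(2019, 1, 4, 3, 0))
print([d.date().isoformat() for d in dates])
# → ['2019-01-01', '2019-01-02', '2019-01-03']
```

At 03:00 on January 4 the run for January 4 is not due yet, because its day has not ended. Each returned date is an independent unit that can be run, re-run or backfilled on its own, which is what lets a DAG work on several Execution Dates at once.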

Unfortunately (or perhaps fortunately, depending on the situation), if the implementation of a task in a DAG is changed, runs for earlier Execution Dates will also use the changed code. This is good if you need to recompute data for past periods with a new algorithm, but bad because the reproducibility of results is lost (of course, nobody stops you from checking out the required version of the source code from Git and computing what you need once, in the way you need it).

Task generation

A DAG is implemented in Python code, which gives us a very convenient way to reduce the amount of code when working with, for example, sharded sources. Suppose your source is three MySQL shards and you need to go into each of them and pull the data, independently and in parallel. The Python code in the DAG might look like this:

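# lv is not defined in this excerpt; it is assumed to be a project helper that reads settings.
# connection_list holds the Airflow connection ids of the MySQL shards.
connection_list = lv.get('connection_list')

# Airflow renders {{params.shard_id}} with Jinja, provided sql is a template field of the operator.
export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

# One export task per shard; SqlToHiveViaHdfsTransfer is a custom operator (not part of stock Airflow).
for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:]},  # last character of the connection id
        compress=None,
        dag=dag
    )
    # exec_truncate_stg and load_profiles are tasks defined elsewhere in the same DAG
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)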

The resulting DAG looks like this:

[Figure: the generated DAG with one export task per shard]

With this approach you can add or remove a shard simply by changing the settings and updating the DAG. Convenient!
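
Stripped of Airflow, the pattern is a loop that turns a list of connection ids into one task definition per shard and wires each of them between a shared upstream task and a shared downstream task. A plain-Python sketch: the connection ids are hypothetical, and the tiny render function is a stand-in for the Jinja templating Airflow actually uses for {{params.shard_id}}.

```python
from typing import Dict, List, Tuple

EXPORT_SQL = "SELECT id, user_id, nickname, gender, {{params.shard_id}} as shard_id FROM profiles"


def render(template: str, params: Dict[str, str]) -> str:
    """Very small stand-in for Jinja: substitutes {{params.<name>}} placeholders."""
    for name, value in params.items():
        template = template.replace("{{params.%s}}" % name, value)
    return template


def build_shard_tasks(connection_list: List[str]) -> Tuple[Dict[str, str], List[Tuple[str, str]]]:
    """Return (task_id -> rendered SQL, list of (upstream, downstream) edges)."""
    tasks: Dict[str, str] = {}
    edges: List[Tuple[str, str]] = []
    for conn_id in connection_list:
        task_id = "export_profiles_from_" + conn_id
        tasks[task_id] = render(EXPORT_SQL, {"shard_id": conn_id[-1:]})
        edges.append(("exec_truncate_stg", task_id))  # truncate staging first
        edges.append((task_id, "load_profiles"))      # load once every shard is exported
    return tasks, edges


tasks, edges = build_shard_tasks(["mysql_shard_1", "mysql_shard_2", "mysql_shard_3"])
print(len(tasks), len(edges))  # → 3 6
```

Adding a fourth shard only means appending its connection id to the list; the loop produces the extra task and both of its dependencies.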

You can also use more complex code generation: for example, work with sources described in a database, or describe a table's structure and the algorithm for processing it and, taking the specifics of your DWH infrastructure into account, generate the process of loading N tables into your storage. Or, for example, when working with an API that does not accept a list as a parameter, you can generate N tasks in the DAG from that list, limit the parallelism of API requests with a pool, and scrape the data you need from the API. Flexible!
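
The API case can be sketched the same way: one unit of work per list element, with concurrency capped the way a pool caps slots. Here a thread pool's max_workers plays the role of the Airflow pool, and FakeApi.fetch_one is a hypothetical stand-in for a real API call that only accepts one id; it sleeps briefly and records how many calls are in flight.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


class FakeApi:
    """Hypothetical API client that only accepts one id per request."""

    def __init__(self):
        self._lock = threading.Lock()
        self.in_flight = 0
        self.max_in_flight = 0

    def fetch_one(self, item_id: int) -> Dict[str, int]:
        with self._lock:
            self.in_flight += 1
            self.max_in_flight = max(self.max_in_flight, self.in_flight)
        time.sleep(0.01)  # simulate network latency
        with self._lock:
            self.in_flight -= 1
        return {"id": item_id, "value": item_id * 10}


def scrape(api: FakeApi, ids: List[int], pool_slots: int) -> List[Dict[str, int]]:
    """Issue one request per id, never more than `pool_slots` at the same time."""
    with ThreadPoolExecutor(max_workers=pool_slots) as executor:
        return list(executor.map(api.fetch_one, ids))  # results keep the input order


api = FakeApi()
results = scrape(api, list(range(20)), pool_slots=3)
print(len(results), api.max_in_flight <= 3)  # → 20 True
```

In Airflow the same cap would come from assigning all N generated tasks to one pool with a limited number of slots.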

Repository

Airflow has its own backend repository, a database (it can be MySQL or Postgres; we use Postgres), which stores task states, DAGs, connection settings, global variables and so on. I would like to point out that the Airflow repository is very simple (about 20 tables) and convenient if you want to build any of your own processes on top of it. I remember the countless tables in the Informatica repository, which had to be studied for a long time before you understood how to build a query.

Monitoring

Because the repository is so simple, you can build whatever task-monitoring process suits you. We use a notebook in Zeppelin, where we look at the status of tasks:

[Screenshot: task status notebook in Zeppelin]
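
Because the metadata database is small, such a notebook mostly boils down to plain SQL over it. The sketch below uses an in-memory SQLite table that mimics a few columns of Airflow's task_instance table (dag_id, task_id, execution_date, state) with made-up rows; the real table has more columns and its schema differs between Airflow versions, so treat the query as a starting point rather than a drop-in. The SUM(CASE ...) form is used because it works in both SQLite and Postgres.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified copy of a few task_instance columns; the sample rows are made up.
conn.execute("""CREATE TABLE task_instance (
    dag_id TEXT, task_id TEXT, execution_date TEXT, state TEXT)""")
conn.executemany("INSERT INTO task_instance VALUES (?, ?, ?, ?)", [
    ("dma_load", "export_profiles_from_mysql_1", "2019-01-01", "success"),
    ("dma_load", "export_profiles_from_mysql_2", "2019-01-01", "failed"),
    ("dma_load", "load_profiles", "2019-01-01", "upstream_failed"),
    ("api_load", "call_api_1", "2019-01-01", "running"),
])

# Per-DAG counts of task states for one Execution Date: the core of a status dashboard.
STATUS_SQL = """
SELECT dag_id,
       SUM(CASE WHEN state = 'success' THEN 1 ELSE 0 END) AS succeeded,
       SUM(CASE WHEN state IN ('failed', 'upstream_failed') THEN 1 ELSE 0 END) AS failed,
       SUM(CASE WHEN state = 'running' THEN 1 ELSE 0 END) AS running
FROM task_instance
WHERE execution_date = ?
GROUP BY dag_id
ORDER BY dag_id
"""
for row in conn.execute(STATUS_SQL, ("2019-01-01",)):
    print(row)
# → ('api_load', 0, 0, 1)
# → ('dma_load', 1, 2, 0)
```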

This can also be the web interface of Airflow itself:

[Screenshot: the Airflow web interface]

Airflow's code is open source, so we added alerting to Telegram. Every running task instance that hits an error posts to a Telegram group that includes the whole development and support team.

Telegram gives us a prompt reaction (when one is needed), and Zeppelin gives us the overall picture of the tasks in Airflow.
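
A failure alert like this can be sketched as a function that formats a message and posts it to the Telegram Bot API sendMessage method. The token, the chat id and the alert_text, build_telegram_request and send helpers are hypothetical; in Airflow such a function would typically be wired in through the on_failure_callback argument, which receives the task context. Only the request construction runs here, so the example does not touch the network.

```python
import json
import urllib.request


def alert_text(dag_id: str, task_id: str, execution_date: str, error: str) -> str:
    """Human-readable failure message for the team chat."""
    return (f"Task failed: {dag_id}.{task_id}\n"
            f"Execution date: {execution_date}\n"
            f"Error: {error}")


def build_telegram_request(token: str, chat_id: str, text: str) -> urllib.request.Request:
    """Prepare (but do not send) a POST to the Bot API sendMessage method."""
    url = f"https://api.telegram.org/bot{token}/sendMessage"
    body = json.dumps({"chat_id": chat_id, "text": text}).encode("utf-8")
    return urllib.request.Request(url, data=body, method="POST",
                                  headers={"Content-Type": "application/json"})


def send(request: urllib.request.Request) -> None:
    """Actually deliver the alert (needs network access and a real bot token)."""
    with urllib.request.urlopen(request, timeout=10) as response:
        response.read()


req = build_telegram_request("123:HYPOTHETICAL_TOKEN", "-1001234567890",
                             alert_text("dma_load", "load_profiles", "2019-01-01", "Hive timeout"))
print(req.full_url)  # → https://api.telegram.org/bot123:HYPOTHETICAL_TOKEN/sendMessage
```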

Summary

Airflow is, first and foremost, open source, and you should not expect miracles from it. Be prepared to invest time and effort into building a solution that works. The goal is achievable, and believe me, it is worth it. Development speed, flexibility, the ease of adding new processes: you will like it. Of course, you have to pay a lot of attention to how the project is organized and to the stability of Airflow itself: miracles do not happen.

Today Airflow runs about 6,500 tasks for us every day. They vary widely in nature. There are tasks that load data into the main DWH from many different and very specific sources, tasks that compute data marts inside the main DWH, tasks that publish data to a fast DWH, and many, many other tasks, and Airflow chews through all of them day after day. In numbers, that is 2,300 ELT tasks of varying complexity inside the DWH (Hadoop) and about 250 source databases, handled by a team of 4 ETL developers, who are split between ETL processing of data into the DWH and ELT processing inside the DWH, plus, of course, one admin who takes care of the service infrastructure.

Plans for the future

The number of processes is inevitably growing, and the main thing we will be doing with the Airflow infrastructure is scaling it. We want to build an Airflow cluster, allocate a couple of nodes for Celery workers, and set up a duplicated head node running the task scheduling processes and the repository.

Epilogue

This, of course, is not everything I would like to say about Airflow, but I have tried to highlight the main points. Appetite comes with eating: try it and you will like it :)

Source: www.habr.com
