Is e inneal a th’ ann an Airflow airson pròiseasan giollachd dàta baidse a leasachadh agus a chumail suas gu goireasach agus gu sgiobalta

Is e inneal a th’ ann an Airflow airson pròiseasan giollachd dàta baidse a leasachadh agus a chumail suas gu goireasach agus gu sgiobalta

Halò, Habr! San artaigil seo tha mi airson bruidhinn mu aon inneal fìor mhath airson pròiseasan giollachd dàta baidse a leasachadh, mar eisimpleir, ann am bun-structar DWH corporra no an DataLake agad. Bruidhnidh sinn mu dheidhinn Apache Airflow (air ainmeachadh an-seo mar Airflow). Tha e air a chall gu mì-chothromach bho aire Habré, agus sa phrìomh phàirt feuchaidh mi ri toirt a chreidsinn ort gum b’ fhiach sùil a thoirt air Airflow co-dhiù nuair a thaghas tu clàr-ama airson na pròiseasan ETL / ELT agad.

Roimhe sin, sgrìobh mi sreath de artaigilean air cuspair DWH nuair a bha mi ag obair aig Tinkoff Bank. A-nis tha mi air a bhith nam phàirt de sgioba Mail.Ru Group agus tha mi a’ leasachadh àrd-ùrlar airson mion-sgrùdadh dàta anns an raon cluich. Gu fìrinneach, mar a nochdas naidheachdan is fuasglaidhean inntinneach, bruidhnidh mo sgioba agus mise an seo mun àrd-ùrlar againn airson mion-sgrùdadh dàta.

Prologue

Mar sin, tòisichidh sinn. Dè th' ann an Airflow? Seo leabharlann (no seata de leabharlannan) pròiseasan obrach a leasachadh, a phlanadh agus a sgrùdadh. Tha prìomh fheart Airflow: còd Python air a chleachdadh airson cunntas a thoirt air pròiseasan (leasachadh). Tha mòran bhuannachdan aig seo airson do phròiseact agus leasachadh a chuir air dòigh: gu dearbh, chan eil anns a’ phròiseact ETL agad (mar eisimpleir) ach pròiseact Python, agus faodaidh tu a chuir air dòigh mar a thogras tu, a’ toirt aire do mhion-fhiosrachadh a’ bhun-structair, meud sgioba agus riatanasan eile. Gu h-ionnsramaid tha a h-uile dad sìmplidh. Cleachd mar eisimpleir PyCharm + Git. Tha e mìorbhuileach agus gu math goireasach!

A-nis leig dhuinn sùil a thoirt air na prìomh bhuidhnean de Airflow. Le bhith a’ tuigsinn brìgh agus adhbhar, faodaidh tu an ailtireachd pròiseas agad a chuir air dòigh gu dòigheil. Is dòcha gur e am prìomh bhuidheann an Graf Acyclic Stiùirichte (ris an canar DAG an-seo).

DAG

Tha DAG na cheangal brìoghmhor de na gnìomhan agad a tha thu airson a choileanadh ann an sreath a tha air a mhìneachadh gu teann a rèir clàr-ama sònraichte. Tha Airflow a’ toirt seachad eadar-aghaidh lìn goireasach airson a bhith ag obair le DAGn agus buidhnean eile:

Is e inneal a th’ ann an Airflow airson pròiseasan giollachd dàta baidse a leasachadh agus a chumail suas gu goireasach agus gu sgiobalta

Is dòcha gum bi an DAG a’ coimhead mar seo:

Is e inneal a th’ ann an Airflow airson pròiseasan giollachd dàta baidse a leasachadh agus a chumail suas gu goireasach agus gu sgiobalta

Bidh an leasaiche, nuair a bhios e a’ dealbhadh DAG, a’ suidheachadh seata de ghnìomhaichean air an tèid gnìomhan taobh a-staigh an DAG a thogail. An seo thig sinn gu eintiteas cudromach eile: Gnìomhaiche Sruth-adhair.

Luchd-obrachaidh

Is e eintiteas a th’ ann an gnìomhaiche air a bheil na h-eisimpleirean obrach air an cruthachadh, a mhìnicheas dè a thachras nuair a thathar a’ coileanadh eisimpleir obrach. Sgaoileadh adhair bho GitHub mu thràth tha seata de ghnìomhaichean deiseil airson a chleachdadh. Eisimpleirean:

  • BashOperator - gnìomhaiche airson òrdugh bash a chuir an gnìomh.
  • PythonOperator - gnìomhaiche airson còd Python a ghairm.
  • EmailOperator - gnìomhaiche airson post-d a chuir.
  • HTTPOperator - gnìomhaiche airson obrachadh le iarrtasan http.
  • SqlOperator - gnìomhaiche airson còd SQL a chuir an gnìomh.
  • Tha Sensor na ghnìomhaiche airson feitheamh ri tachartas (ruigsinn na h-ùine riatanach, coltas an fhaidhle a tha a dhìth, loidhne san stòr-dàta, freagairt bhon API, msaa, msaa).

Tha gnìomhaichean nas sònraichte ann: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Faodaidh tu cuideachd gnìomhaichean a leasachadh stèidhichte air na feartan agad fhèin agus an cleachdadh sa phròiseact agad. Mar eisimpleir, chruthaich sinn MongoDBToHiveViaHdfsTransfer, gnìomhaiche airson sgrìobhainnean às-mhalairt bho MongoDB gu Hive, agus grunn ghnìomhaichean airson a bhith ag obair le Cliog Taigh: CHLoadFromHiveOperator agus CHTableLoaderOperator. Gu bunaiteach, cho luath ‘s a bhios pròiseact air còd a chleachdadh gu tric air a thogail air aithrisean bunaiteach, faodaidh tu smaoineachadh air a thogail ann an aithris ùr. Nì seo tuilleadh leasachaidh nas sìmplidhe, agus leudaichidh tu do leabharlann de ghnìomhaichean sa phròiseact.

An ath rud, feumar na gnìomhan sin uile a choileanadh, agus a-nis bruidhnidh sinn mun chlàr-ama.

Clàr-ama

Thathas a’ togail air clàr-obrach gnìomh Airflow soilire. Is e leabharlann Python a th’ ann an Celery a leigeas leat ciudha a chuir air dòigh a bharrachd air coileanadh gnìomhan asyncronach agus sgaoilte. Air taobh Airflow, tha a h-uile gnìomh air a roinn ann an amaran. Tha amaran air an cruthachadh le làimh. Mar as trice, is e an adhbhar aca an eallach obrach a bhith ag obair leis an stòr a chuingealachadh no gnìomhan taobh a-staigh an DWH a chomharrachadh. Faodar amaran a riaghladh tron ​​eadar-aghaidh lìn:

Is e inneal a th’ ann an Airflow airson pròiseasan giollachd dàta baidse a leasachadh agus a chumail suas gu goireasach agus gu sgiobalta

Tha crìoch aig gach amar air an àireamh de shliotan. Nuair a bhios tu a’ cruthachadh DAG, thèid cruinneachadh a thoirt dha:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Faodar casg a chuir air amar a tha air a mhìneachadh aig ìre DAG aig ìre gnìomh.
Tha pròiseas air leth, Neach-clàraidh, an urra ri bhith a’ clàradh a h-uile gnìomh ann an Airflow. Gu fìrinneach, bidh Clàr-ama a’ dèiligeadh ris a h-uile meacanaig airson gnìomhan a shuidheachadh airson a chuir gu bàs. Bidh an obair a’ dol tro ghrunn ìrean mus tèid a chur gu bàs:

  1. Chaidh na gnìomhan roimhe seo a chrìochnachadh anns an DAG; faodar fear ùr a chiudha.
  2. Tha an ciudha air a rèiteachadh a rèir prìomhachas gnìomhan (faodar smachd a chumail air prìomhachasan cuideachd), agus ma tha slot an-asgaidh anns an amar, faodar a’ ghnìomh a chuir an gnìomh.
  3. Ma tha soilire oibrichean saor, tha an obair air a chuir thuige; bidh an obair a rinn thu prògramadh san duilgheadas a’ tòiseachadh, a’ cleachdadh gnìomhaiche aon no eile.

Simple gu leòr.

Bidh clàr-ama a’ ruith air an t-seata de gach DAG agus a h-uile gnìomh taobh a-staigh DAGn.

Airson an Neach-clàraidh tòiseachadh ag obair le DAG, feumaidh an DAG clàr-ama a shuidheachadh:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Tha seata de ro-òrdughan deiseil ann: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Faodaidh tu cuideachd abairtean cron a chleachdadh:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Ceann-latha cur gu bàs

Gus tuigse fhaighinn air mar a tha Airflow ag obair, tha e cudromach tuigsinn dè a th’ ann an Ceann-latha Cur gu bàs airson DAG. Ann an Airflow, tha meud Ceann-latha Cur gu bàs aig DAG, ie, a rèir clàr-ama obrach an DAG, thathas a’ cruthachadh eisimpleirean gnìomh airson gach Ceann-latha Cur gu bàs. Agus airson gach Ceann-latha Cur gu bàs, faodar gnìomhan a choileanadh a-rithist - no, mar eisimpleir, faodaidh DAG obrachadh aig an aon àm ann an grunn Cinn-latha Cur gu bàs. Tha seo air a shealltainn gu soilleir an seo:

Is e inneal a th’ ann an Airflow airson pròiseasan giollachd dàta baidse a leasachadh agus a chumail suas gu goireasach agus gu sgiobalta

Gu mì-fhortanach (no is dòcha gu fortanach: tha e an urra ris an t-suidheachadh), ma thèid buileachadh na h-obrach anns an DAG a cheartachadh, an uairsin leanaidh coileanadh anns a ’Cheann-latha Cur gu bàs roimhe a’ toirt aire do na h-atharrachaidhean. Tha seo math ma dh’ fheumas tu dàta ath-àireamhachadh anns na h-amannan a dh’ fhalbh a’ cleachdadh algairim ùr, ach tha e dona leis gu bheil ath-riochdachadh an toraidh air chall (gu dearbh, chan eil duine a’ cur dragh ort an dreach riatanach den chòd stòr a thilleadh bho Git agus obrachadh a-mach dè feumaidh tu aon uair, mar a tha feum agad air).

A 'cruthachadh ghnìomhan

Tha buileachadh an DAG na chòd ann am Python, agus mar sin tha dòigh gu math goireasach againn gus an ìre de chòd a lughdachadh nuair a bhios sinn ag obair, mar eisimpleir, le stòran biorach. Canaidh sinn gu bheil trì slatan MySQL agad mar stòr, feumaidh tu streap a-steach do gach fear agus beagan dàta a thogail. A bharrachd air an sin, gu neo-eisimeileach agus ann an co-shìnte. Is dòcha gum bi an còd Python anns an DAG a’ coimhead mar seo:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

Tha an DAG a’ coimhead mar seo:

Is e inneal a th’ ann an Airflow airson pròiseasan giollachd dàta baidse a leasachadh agus a chumail suas gu goireasach agus gu sgiobalta

Anns a 'chùis seo, faodaidh tu shard a chur ris no a thoirt air falbh le bhith dìreach ag atharrachadh nan roghainnean agus ag ùrachadh an DAG. Comhfhurtail!

Faodaidh tu cuideachd gineadh còd nas iom-fhillte a chleachdadh, mar eisimpleir, obair le stòran ann an cruth stòr-dàta no cunntas a thoirt air structar bùird, algairim airson obrachadh le clàr, agus, a’ toirt aire do fheartan bun-structair DWH, cruthaich pròiseas airson N bùird a luchdachadh a-steach don stòradh agad. No, mar eisimpleir, ag obair le API nach eil a’ toirt taic do bhith ag obair le paramadair ann an cruth liosta, faodaidh tu gnìomhan N a ghineadh ann an DAG bhon liosta seo, cuingealachadh a dhèanamh air co-shìnteachd iarrtasan san API gu amar, agus sgrìobadh. an dàta riatanach bhon API. Sùbailte!

tasgaidh

Tha a stòras backend fhèin aig Airflow, stòr-dàta (faodaidh MySQL no Postgres a bhith againn, tha Postgres againn), a bhios a’ stòradh staid ghnìomhan, DAGn, suidheachaidhean ceangail, caochladairean cruinneil, msaa, msaa. An seo bu mhath leam is urrainn dhomh a ràdh tha tasgadh ann an Airflow gu math sìmplidh (timcheall air 20 clàr) agus goireasach ma tha thu airson gin de na pròiseasan agad fhèin a thogail air a bharr. Tha cuimhne agam air na 100500 clàr ann an stòr Informatica, a dh’ fheumadh a bhith air a sgrùdadh airson ùine mhòr mus do thuig mi mar a thogas tu ceist.

Sgrùdadh

Leis cho sìmplidh ‘s a tha an stòr, faodaidh tu pròiseas sgrùdaidh gnìomh a thogail a tha iomchaidh dhut. Bidh sinn a’ cleachdadh notepad ann an Zeppelin, far am bi sinn a’ coimhead air inbhe ghnìomhan:

Is e inneal a th’ ann an Airflow airson pròiseasan giollachd dàta baidse a leasachadh agus a chumail suas gu goireasach agus gu sgiobalta

Dh’ fhaodadh seo a bhith mar eadar-aghaidh lìn Airflow fhèin cuideachd:

Is e inneal a th’ ann an Airflow airson pròiseasan giollachd dàta baidse a leasachadh agus a chumail suas gu goireasach agus gu sgiobalta

Tha an còd Airflow na stòr fosgailte, agus mar sin tha sinn air rabhadh a chuir gu Telegram. Bidh gach eisimpleir ruith de ghnìomhachd, ma thachras mearachd, a ’spamadh a’ bhuidheann ann an Telegram, far a bheil an sgioba leasachaidh is taic gu lèir air a dhèanamh suas.

Bidh sinn a’ faighinn freagairt sgiobalta tro Telegram (ma tha feum air), agus tro Zeppelin gheibh sinn dealbh iomlan de ghnìomhan ann an Airflow.

Iomlan

Tha sruth-adhair sa mhòr-chuid stòr fosgailte, agus cha bu chòir dùil a bhith agad ri mìorbhuilean bhuaithe. Bi deiseil airson ùine agus oidhirp a chuir a-steach gus fuasgladh a chruthachadh a bhios ag obair. Tha an amas comasach, creid mi, is fhiach e e. Luas leasachaidh, sùbailteachd, furasta pròiseasan ùra a chur ris - còrdaidh e riut. Gu dearbh, feumaidh tu tòrr aire a thoirt do eagrachadh a 'phròiseict, seasmhachd an t-sruth-adhair fhèin: chan eil mìorbhailean a' tachairt.

A-nis tha Airflow againn ag obair gach latha mu 6,5 mìle gnìomh. Tha iad gu math eadar-dhealaichte ann an caractar. Tha gnìomhan ann airson dàta a luchdachadh a-steach don phrìomh DWH bho iomadh stòr eadar-dhealaichte agus gu math sònraichte, tha gnìomhan ann a bhith ag obrachadh a-mach aghaidhean stòr taobh a-staigh a ’phrìomh DWH, tha gnìomhan ann airson dàta fhoillseachadh gu DWH luath, tha mòran, mòran ghnìomhan eadar-dhealaichte - agus Airflow cagnaidh e suas iad uile o latha gu latha. A 'bruidhinn ann an àireamhan, tha seo 2,3 mìle Gnìomhan ELT de dhiofar iom-fhillteachd taobh a-staigh DWH (Hadoop), timcheall air. 2,5 ceud stòr-dàta stòran, is e sgioba bho 4 luchd-leasachaidh ETL, a tha air an roinn ann an giullachd dàta ETL ann an giullachd dàta DWH agus ELT taobh a-staigh DWH agus gu dearbh barrachd aon rianaire, a bhios a’ dèiligeadh ri bun-structar na seirbheis.

Planaichean airson an ama ri teachd

Tha e do-sheachanta gu bheil an àireamh de phròiseasan a’ fàs, agus is e am prìomh rud a bhios sinn a’ dèanamh a thaobh bun-structair Airflow a bhith a’ sgèileadh. Tha sinn airson cruinneachadh Airflow a thogail, paidhir chasan a riarachadh airson luchd-obrach Celery, agus ceann fèin-dhùblachadh a dhèanamh le pròiseasan clàr-obrach agus stòr-tasgaidh.

Epilogue

Chan e seo, gu dearbh, a h-uile dad a bu mhath leam innse mu Airflow, ach dh’ fheuch mi ris na prìomh phuingean a chomharrachadh. Thig blasad le ithe, feuch e agus còrdaidh e riut :)

Source: www.habr.com

Cuir beachd ann