Dia duit, Habr! San Airteagal seo ba mhaith liom labhairt faoi uirlis iontach amháin chun próisis phróiseála sonraí baisc a fhorbairt, mar shampla, i mbonneagar DWH corparáideach nó do DataLake. Déanfaimid labhairt faoi Apache Airflow (dá ngairfear Airflow anseo feasta). Ní thugtar aird go héagórach ar Habré, agus den chuid is mó déanfaidh mé iarracht a chur ina luí ort gur fiú féachaint ar Airflow ar a laghad agus sceidealóir á roghnú agat do do phróisis ETL/ELT.
Roimhe sin, scríobh mé sraith alt ar an ábhar DWH nuair a d'oibrigh mé ag Tinkoff Bank. Anois táim mar chuid d'fhoireann Mail.Ru Group agus tá ardán á fhorbairt agam le haghaidh anailíse sonraí sa réimse cearrbhachais. I ndáiríre, de réir mar a thagann nuacht agus réitigh suimiúla le feiceáil, labhróidh m’fhoireann agus mise anseo faoinár n-ardán le haghaidh anailísíochta sonraí.
Prologue
Mar sin, déanaimis tosú. Cad is Airflow? Leabharlann í seo (nó
Anois, déanaimis féachaint ar na príomh-eintitis Airflow. Trí thuiscint a fháil ar a mbunús agus a gcuspóir, is féidir leat d'ailtireacht próisis a eagrú go barrmhaith. B'fhéidir gurb é an Graf Aicmileach faoi Threoir (dá ngairfear DAG anseo feasta) an príomh-eintiteas.
GCM
Is éard is DAG ann ná ceangal bríoch éigin de do thascanna is mian leat a chur i gcrích i seicheamh docht sainithe de réir sceidil ar leith. Soláthraíonn Airflow comhéadan gréasáin áisiúil chun oibriú le DAGanna agus eintitis eile:
Seans go mbeidh cuma mar seo ar an DAG:
Le linn don fhorbróir DAG a dhearadh, leagann sé síos sraith oibreoirí ar a dtógfar tascanna laistigh den DAG. Anseo tagann muid chuig aonán tábhachtach eile: Oibreoir Aeir-Sreafa.
Oibreoirí
Is eintiteas é oibreoir ar a mbonn a chruthaítear cásanna poist, a chuireann síos ar cad a tharlóidh le linn post a chur i gcrích.
- BashOperator - oibreoir chun ordú bash a fhorghníomhú.
- PythonOperator - oibreoir chun glaoch ar chód Python.
- RíomhphostOibritheoir - oibreoir chun ríomhphost a sheoladh.
- HTTPOperator - oibreoir chun oibriú le hiarratais http.
- SqlOperator - oibreoir chun cód SQL a fhorghníomhú.
- Is oibreoir é Braiteoir chun fanacht le himeacht (teacht an ama riachtanach, cuma an chomhaid riachtanach, líne sa bhunachar sonraí, freagra ón API, etc., etc.).
Tá oibreoirí níos sainiúla ann: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Is féidir leat freisin oibreoirí a fhorbairt bunaithe ar do shaintréithe féin agus iad a úsáid i do thionscadal. Mar shampla, chruthaíomar MongoDBToHiveViaHdfsTransfer, oibreoir chun doiciméid a onnmhairiú ó MongoDB go Hive, agus roinnt oibreoirí chun oibriú le
Ansin, is gá na tascanna seo go léir a chur i gcrích, agus anois déanfaimid labhairt faoin sceidealóir.
Sceidealóir
Tógtar ar sceidealóir tascanna Airflow
Tá teorainn ag gach linn ar líon na sliotán. Agus DAG á chruthú, tugtar linn dó:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
Is féidir linn a shainítear ag leibhéal DAG a shárú ag leibhéal an taisc.
Tá próiseas ar leith, Sceidealóir, freagrach as gach tasc san Aer-sreabhadh a sceidealú. I ndáiríre, déileálann an Sceidealóir leis na meicníochtaí go léir a bhaineann le tascanna a leagan síos le cur i gcrích. Téann an tasc trí roinnt céimeanna sula gcuirtear i gcrích é:
- Tá na tascanna roimhe seo curtha i gcrích sa DAG agus is féidir ceann nua a chur ar ciúáil.
- Déantar an scuaine a shórtáil ag brath ar thosaíocht na dtascanna (is féidir tosaíochtaí a rialú freisin), agus má tá sliotán saor in aisce sa chomhthiomsú, is féidir an tasc a chur i bhfeidhm.
- Má tá soilire oibrithe saor in aisce, seoltar an tasc chuige; tosaíonn an obair a chláraigh tú sa fhadhb, ag baint úsáide as oibreoir amháin nó eile.
Simplí go leor.
Ritheann an sceidealóir ar thacar gach DAG agus gach tasc laistigh de DAGs.
Chun go dtosóidh an Sceidealóir ag obair le DAG, ní mór don DAG sceideal a shocrú:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Tá sraith réamhshocruithe réamhdhéanta ann: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Is féidir leat slonn cron a úsáid freisin:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Dáta Forghníomhaithe
Chun tuiscint a fháil ar an gcaoi a n-oibríonn Airflow, tá sé tábhachtach a thuiscint cad é Dáta Forghníomhaithe DAG. I Sreabhadh Aeir, tá gné Dáta Forghníomhaithe ag DAG, i.e., ag brath ar sceideal oibre an DAG, cruthaítear cásanna tascanna do gach Dáta Forghníomhaithe. Agus do gach Dáta Forghníomhaithe, is féidir tascanna a ath-fhorghníomhú - nó, mar shampla, is féidir le DAG oibriú go comhuaineach i roinnt Dátaí Forghníomhaithe. Tá sé seo léirithe go soiléir anseo:
Ar an drochuair (nó b’fhéidir go fortunately: braitheann sé ar an gcás), má dhéantar cur i bhfeidhm an taisc sa DAG a cheartú, ansin leanfar le forghníomhú sa Dáta Forghníomhaithe roimhe seo agus na coigeartuithe á gcur san áireamh. Tá sé seo go maith más gá duit sonraí a athríomh ó thréimhsí roimhe seo ag baint úsáide as algartam nua, ach tá sé olc mar go bhfuil an atáirgtheacht an toradh caillte (ar ndóigh, aon duine bodhraigh tú a thabhairt ar ais ar an leagan riachtanach den chód foinse ó Git agus ríomh cad ní mór duit uair amháin, an bealach is gá duit é).
Tascanna a ghiniúint
Tá cur i bhfeidhm an DAG cód i Python, agus mar sin tá bealach an-áisiúil againn chun méid an chóid a laghdú nuair a bhíonn muid ag obair, mar shampla, le foinsí bearrtha. Ligean le rá go bhfuil trí shards MySQL agat mar fhoinse, ní mór duit dreapadh isteach i ngach ceann acu agus roinnt sonraí a phiocadh suas. Thairis sin, go neamhspleách agus go comhthreomhar. Seans go mbeidh cuma mar seo ar an gcód Python sa DAG:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
Breathnaíonn an DAG mar seo:
Sa chás seo, is féidir leat shard a chur leis nó a bhaint as ach na socruithe a choigeartú agus an DAG a nuashonrú. Compordach!
Is féidir leat giniúint cód níos casta a úsáid freisin, mar shampla, oibriú le foinsí i bhfoirm bunachar sonraí nó déan cur síos ar struchtúr tábla, algartam chun oibriú le tábla, agus, ag cur san áireamh gnéithe an bhonneagair DWH, próiseas a ghiniúint. chun N táblaí a lódáil isteach i do stór. Nó, mar shampla, ag obair le API nach dtacaíonn le bheith ag obair le paraiméadar i bhfoirm liosta, is féidir leat N tascanna a ghiniúint i DAG ón liosta seo, teorainn a chur le comhthreomhar na n-iarratas san API go linn, agus scrape na sonraí riachtanacha ón API. Solúbtha!
stór
Tá a stór inneall féin ag Airflow, bunachar sonraí (is féidir MySQL nó Postgres a bheith ann, tá Postgres againn), a stórálann staid na dtascanna, DAGanna, socruithe nasc, athróga domhanda, etc., etc. Anseo ba mhaith liom a rá go bhfuil an tá stór i Airflow an-simplí (thart ar 20 tábla) agus áisiúil más mian leat aon cheann de do phróisis féin a thógáil ar a bharr. Is cuimhin liom na 100500 tábla i stór Informatica, arbh éigean staidéar a dhéanamh orthu ar feadh i bhfad sula dtuigim conas fiosrúchán a thógáil.
Monatóireacht
Mar gheall ar simplíocht an stór, is féidir leat próiseas monatóireachta tasc a thógáil atá áisiúil duit. Bainimid úsáid as leabhar nótaí i Zeppelin, áit a bhreathnaíonn muid ar stádas na dtascanna:
D’fhéadfadh sé seo a bheith mar chomhéadan gréasáin Airflow féin freisin:
Is foinse oscailte é an cód Airflow, agus mar sin tá foláireamh curtha againn le Telegram. Déanann gach sampla reatha de thasc, má tharlaíonn earráid, an grúpa a thurscar in Telegram, áit a bhfuil an fhoireann forbartha agus tacaíochta ar fad comhdhéanta.
Faighimid freagra pras trí Telegram (más gá), agus trí Zeppelin faigheann muid pictiúr iomlán de thascanna in Airflow.
Ar an iomlán
Is foinse oscailte go príomha é sreabhadh aeir, agus níor cheart duit a bheith ag súil le míorúiltí uaidh. Bí réidh an t-am agus an iarracht a chur isteach chun réiteach a oibríonn a chruthú. Is féidir an sprioc a bhaint amach, creidim dom, is fiú é. Luas na forbartha, solúbthacht, éascaíocht próisis nua a chur leis - beidh tú in ann é a thaitin. Ar ndóigh, ní mór duit a lán aird a thabhairt ar eagrú an tionscadail, cobhsaíocht an Airflow féin: ní tharlaíonn míorúiltí.
Anois tá Airflow againn ag obair go laethúil thart ar 6,5 míle tascanna. Tá siad an-difriúil i gcarachtar. Tá tascanna ann maidir le sonraí a luchtú isteach sa phríomh-DWH ó go leor foinsí éagsúla agus an-sonrach, tá tascanna ann maidir le héadanais stórais a ríomh taobh istigh den phríomh-DWH, tá tascanna ann sonraí a fhoilsiú isteach i DWH tapa, tá go leor, go leor tascanna éagsúla - agus Airflow cogann sé suas iad go léir lá i ndiaidh lae. Ag labhairt di i líon, is é seo 2,3 mhíle Tascanna ELT le castacht éagsúil laistigh de DWH (Hadoop), thart ar. 2,5 céad bunachar sonraí foinsí, seo foireann ó 4 fhorbróir ETL, atá roinnte i bpróiseáil sonraí ETL i bpróiseáil sonraí DWH agus ELT taobh istigh DWH agus ar ndóigh níos mó riarthóir amháin, a dhéileálann le bonneagar na seirbhíse.
Pleananna don todhchaí
Tá líon na bpróiseas ag dul i méid gan dabht, agus is é an rud is mó a bheidh á dhéanamh againn ó thaobh bhonneagar an Aershreafa ná scálaithe. Ba mhaith linn braisle Airflow a thógáil, péire cosa a leithdháileadh d'oibrithe Soilire, agus ceann féin-dhúbailt a dhéanamh le próisis sceidealaithe poist agus stór.
Epilogue
Ní hé seo, ar ndóigh, gach rud ba mhaith liom a insint faoi Airflow, ach rinne mé iarracht aird a tharraingt ar na príomhphointí. Tagann goile le hithe, bain triail as agus taitneoidh sé leat :)
Foinse: will.com