Dia duit, is mise Dmitry Logvinenko - Innealtóir Sonraí de chuid na Roinne Analytics de ghrúpa cuideachtaí Vezet.
Inseoidh mé duit faoi uirlis iontach chun próisis ETL a fhorbairt - Apache Airflow. Ach tá Airflow chomh ilghnéitheach agus chomh ilghnéitheach sin gur chóir duit breathnú níos géire air fiú mura bhfuil baint agat le sreafaí sonraí, ach go bhfuil gá agat próisis ar bith a sheoladh go tréimhsiúil agus monatóireacht a dhéanamh ar a gcur i gcrích.
Agus tá, ní inseoidh mé ní hamháin, ach freisin taispeánfaidh mé: tá go leor cód, screenshots agus moltaí ag an gclár.
An rud a fheiceann tú de ghnáth nuair a dhéanann tú google an focal Airflow / Wikimedia Commons
- ach níos fearr, agus rinneadh é chun críocha go hiomlán difriúil, eadhon (mar atá sé scríofa roimh an kata):
tascanna a rith agus monatóireacht a dhéanamh ar líon neamhtheoranta meaisíní (mar a cheadóidh go leor Soilire / Kubernetes agus do choinsiasa duit)
le giniúint sreabhadh oibre dinimiciúil ó chód Python an-éasca a scríobh agus a thuiscint
agus an cumas aon bhunachair shonraí agus APIanna a nascadh lena chéile trí úsáid a bhaint as comhpháirteanna réamhdhéanta agus forlíontáin de dhéantús baile (rud atá thar a bheith simplí).
Úsáidimid Apache Airflow mar seo:
bailímid sonraí ó fhoinsí éagsúla (go leor cásanna SQL Server agus PostgreSQL, APIanna éagsúla le méadracht feidhmchláir, fiú 1C) in DWH agus ODS (tá Vertica agus Clickhouse againn).
conas chun cinn cron, a chuireann tús leis na próisis chomhdhlúthaithe sonraí ar ODS, agus a dhéanann monatóireacht ar a gcothabháil freisin.
Go dtí le déanaí, bhí ár gcuid riachtanas clúdaithe ag freastalaí beag amháin le 32 cores agus 50 GB RAM. In Airflow, oibríonn sé seo:
níos mó 200 dag (sreafaí oibre i ndáiríre, inar líonta muid tascanna),
i ngach ar an meán 70 tasc,
tosaíonn an mhaitheas seo (ar an meán freisin) uair an chloig.
Agus faoin gcaoi ar mhéadaigh muid, scríobhfaidh mé thíos, ach anois déanaimis sainmhíniú ar an bhfadhb über a réiteoimid:
Tá trí Fhreastalaí SQL bunaidh, gach ceann acu le 50 bunachar sonraí - cásanna de thionscadal amháin, faoi seach, tá an struchtúr céanna acu (beagnach i ngach áit, mua-ha-ha), rud a chiallaíonn go bhfuil tábla Orduithe ag gach ceann acu (go fortunately, tábla leis sin is féidir ainm a bhrú isteach in aon ghnó). Glacaimid na sonraí trí réimsí seirbhíse (freastalaí foinse, bunachar sonraí foinse, ID tasc ETL) a chur leis agus caithimid go naive iad, abair, Vertica.
A ligean ar dul!
An phríomhchuid, praiticiúil (agus beagán teoiriciúil)
Cén fáth a bhfuil muid (agus tú)
Nuair a bhí na crainn mór agus bhí mé simplí SQL-schik i miondíol Rúisis amháin, rinneamar próisis ETL a scamadh mar a chéile le sreafaí sonraí ag baint úsáide as dhá uirlis atá ar fáil dúinn:
Lárionad Cumhachta Informatica - córas thar a bheith scaipthe, thar a bheith táirgiúil, lena crua-earraí féin, a leagan féin. Bhain mé úsáid as Dia forbid 1% dá cumais. Cén fáth? Bhuel, ar an gcéad dul síos, chuir an comhéadan seo, áit éigin ó na 380í, brú orainn go meabhrach. Ar an dara dul síos, tá an contraption seo deartha le haghaidh próisis an-mhaisiúla, athúsáid comhpháirte buile agus cleasanna eile atá thar a bheith tábhachtach don fhiontar. Maidir leis an méid a chosnaíonn sé, cosúil le sciathán an Airbus AXNUMX / bliain, ní déarfaimid rud ar bith.
Bí ar an airdeall, is féidir le screenshot daoine faoi 30 a ghortú beagán
Freastalaí Comhtháthaithe SQL Server - d'úsáideamar an comrádaí seo inár sreafaí laistigh den tionscadal. Bhuel, i ndáiríre: úsáidimid SQL Server cheana féin, agus bheadh sé míréasúnta ar bhealach éigin gan a chuid uirlisí ETL a úsáid. Tá gach rud go maith: tá an comhéadan álainn araon, agus tuairiscíonn an dul chun cinn ... Ach ní hé seo an fáth go bhfuil grá againn ar tháirgí bogearraí, ó, ní le haghaidh seo. Leagan é dtsx (a bhfuil XML le nóid shuffled ar shábháil) is féidir linn, ach cad é an pointe? Cad faoi thasc-phacáiste a dhéanamh a tharraingeoidh na céadta tábla ó fhreastalaí amháin go freastalaí eile? Sea, cad céad, beidh do mhéar innéacs titim amach as fiche píosaí, cliceáil ar an cnaipe luiche. Ach is cinnte go bhféachann sé níos faiseanta:
Is cinnte gur lorg muid bealaí amach. Cás fiú beagnach tháinig go dtí gineadóir pacáiste SSIS féinscríofa ...
…agus ansin fuair post nua mé. Agus rug Apache Airflow orm air.
Nuair a fuair mé amach go bhfuil cur síos ar phróiseas ETL cód simplí Python, ní raibh mé ag damhsa le haghaidh áthas. Seo é an chaoi a ndearnadh leaganacha agus difríocht de shruthanna sonraí, agus tháinig sé chun bheith ina ábhar de chód Python i gceann go leith nó dhá scáileán 13” táblaí le struchtúr amháin a dhoirteadh ó na céadta bunachar sonraí isteach i sprioc amháin.
Cnuasach an bhraisle
Ná déanaimis kindergarten go hiomlán a shocrú, agus gan labhairt faoi rudaí go hiomlán soiléir anseo, cosúil le Airflow a shuiteáil, do bhunachar sonraí roghnaithe, Soilire agus cásanna eile a thuairiscítear sna duganna.
Ionas gur féidir linn turgnaimh a thosú láithreach, rinne mé sceitse docker-compose.yml ina bhfuil:
A ligean ar a ardú i ndáiríre Sruth aeir: Sceidealóir, Freastalaí Gréasáin. Beidh Flower ag sníomh ann freisin chun monatóireacht a dhéanamh ar thascanna Soilire (toisc go bhfuil sé curtha isteach cheana féin apache/airflow:1.10.10-python3.7, ach is cuma linn)
PostgreSQL, ina scríobhfaidh Airflow a chuid faisnéise seirbhíse (sonraí sceidealaithe, staitisticí forghníomhaithe, etc.), agus marcálfaidh Soilire tascanna críochnaithe;
Redis, a fheidhmeoidh mar bhróicéir tasc do Soilire;
Oibrí soilire, a bheidh ag gabháil do chur i gcrích díreach cúraimí.
Go fillteán ./dags cuirfimid ár gcomhaid leis an tuairisc ar dags. Déanfar iad a phiocadh suas ar an eitilt, mar sin ní gá a juggle an chairn iomlán tar éis gach sraothartach.
I roinnt áiteanna, nach bhfuil an cód sna samplaí a thaispeáint go hiomlán (ionas nach a tranglam suas an téacs), ach áit éigin tá sé modhnaithe sa phróiseas. Is féidir samplaí de chóid oibre iomlána a fháil sa stór https://github.com/dm-logv/airflow-tutorial.
I gcomhthionól an chomhdhéanamh, bhí mé ag brath go mór ar an íomhá aitheanta go maith puclach/dugaire-aersreabh - bí cinnte é a sheiceáil. B'fhéidir nach bhfuil aon rud eile ag teastáil uait i do shaol.
Tá gach socrú Airflow ar fáil ní hamháin tríd airflow.cfg, ach freisin trí athróga timpeallachta (a bhuíochas leis na forbróirí), ar bhain mé leas mailíseach astu.
Ar ndóigh, níl sé réidh le táirgeadh: níor chuir mé buille croí ar choimeádáin d'aon ghnó, níor bhac mé le slándáil. Ach rinne mé an t-íosmhéid oiriúnach dár turgnamhóirí.
Tabhair faoi deara:
Caithfidh an fillteán dag a bheith inrochtana don sceidealóir agus do na hoibrithe araon.
Baineann an rud céanna le gach leabharlann tríú páirtí - ní mór iad go léir a shuiteáil ar mheaisíní le sceidealóir agus oibrithe.
Bhuel, anois tá sé simplí:
$ docker-compose up --scale worker=3
Tar éis gach rud a ardú, is féidir leat breathnú ar na comhéadain gréasáin:
Más rud é nár thuig tú aon rud sna “dags” seo go léir, seo chugainn foclóir gairid:
Sceidealóir - an t-uncail is tábhachtaí i Airflow, ag rialú go n-oibríonn robots go crua, agus ní duine: déanann sé monatóireacht ar an sceideal, nuashonraíonn dags, seolann sé tascanna.
Go ginearálta, i leaganacha níos sine, bhí fadhbanna aige le cuimhne (níl, ní amnesia, ach sceitheadh) agus d'fhan an paraiméadar oidhreachta fiú sna cumraíochtaí run_duration - a eatramh atosú. Ach anois tá gach rud go breá.
GCM (aka "dag") - "graf aicyclic faoi threoir", ach is beag duine a inseoidh sainmhíniú den sórt sin, ach i ndáiríre is coimeádán é le haghaidh tascanna a idirghníomhaíonn lena chéile (féach thíos) nó analóg de Phacáiste i SSIS agus Sreabhadh Oibre in Informatica .
Chomh maith le daga, d'fhéadfadh go mbeadh fo-dhuillí ann fós, ach is dócha nach n-éireoidh linn iad.
Rith DAG - dag tosaithe, a sanntar a chuid féin execution_date. Is féidir le dagrans den dag céanna oibriú ag an am céanna (má tá do thascanna déanta agat gan mhoill, ar ndóigh).
oibreoir is píosaí cód iad atá freagrach as gníomh sonrach a dhéanamh. Tá trí chineál oibreoirí ann:
gníomhcosúil lenár is fearr leat PythonOperator, ar féidir leo aon chód Python (bailí) a fhorghníomhú;
aistriú, a iompraíonn sonraí ó áit go háit, abair, MsSqlToHiveTransfer;
braiteoir ar an láimh eile, ligfidh sé duit freagairt nó mhoilliú a dhéanamh ar fhorghníomhú breise an dag go dtí go dtarlaíonn teagmhas. HttpSensor is féidir leis an críochphointe sonraithe a tharraingt, agus nuair a bhíonn an freagra inmhianaithe ag fanacht, cuir tús leis an aistriú GoogleCloudStorageToS3Operator. Fiafróidh meon fiosrach: “cén fáth? Tar éis an tsaoil, is féidir leat athrá a dhéanamh díreach san oibreoir!” Agus ansin, ionas nach gcuirfear bac ar an linn tascanna le hoibreoirí ar fionraí. Tosaíonn an braiteoir, seiceálann agus bás roimh an gcéad iarracht eile.
Tasc - déantar oibreoirí dearbhaithe, beag beann ar an gcineál, agus atá ceangailte leis an dag a ardú go céim an taisc.
shampla tasc - nuair a chinn an pleanálaí ginearálta go raibh sé in am tascanna a chur chun catha ar taibheoirí-oibrithe (ar an bpointe, má úsáidimid LocalExecutor nó chuig nód iargúlta i gcás CeleryExecutor), sannann sé comhthéacs dóibh (i.e., sraith athróg - paraiméadair forghníomhaithe), leathnaíonn sé teimpléid ordaithe nó fiosrúcháin, agus comhthiomsaíonn sé iad.
Gineann muid tascanna
Ar dtús, déanaimis breac-chuntas ar scéim ghinearálta ár doug, agus ansin déanfaimid níos mó agus níos mó isteach ar na sonraí, toisc go gcuirimid roinnt réitigh neamhfhánacha i bhfeidhm.
Mar sin, sa bhfoirm is simplí, beidh cuma mar seo ar a leithéid de dag:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Déanaimis é a dhéanamh amach:
Gcéad dul síos, allmhairímid na libs riachtanacha agus rud éigin eile;
sql_server_ds - An bhfuil List[namedtuple[str, str]] le hainmneacha na nasc ó Airflow Connections agus na bunachair shonraí óna dtógfaimid ár bpláta;
dag - an fógra ár dag, ní mór a bheith riachtanach i globals(), nó ní bhfaighidh Airflow é. Ní mór do Doug a rá freisin:
Cad is ainm dó orders - beidh an t-ainm seo le feiceáil sa chomhéadan gréasáin ansin,
go n-oibreoidh sé ó mheadhon oidhche an t-ochtmhadh d'Iúl,
agus ba chóir é a rith, thart ar gach 6 uair an chloig (do guys diana anseo in ionad timedelta() inghlactha cron-líne 0 0 0/6 ? * * *, le haghaidh an níos lú fionnuar - slonn mar @daily);
workflow() Déanfaidh an príomh-jab, ach ní anois. Faoi láthair, ní dhéanfaimid ach ár gcomhthéacs a dhumpáil isteach sa loga.
Agus anois an draíocht simplí a bhaineann le tascanna a chruthú:
rithimid tríd ár bhfoinsí;
tosaigh PythonOperator, a fhorghníomhóidh ár Caochadán workflow(). Ná déan dearmad ainm uathúil (laistigh den dag) don tasc a shonrú agus an dag féin a cheangal. Bratach provide_context ina dhiaidh sin, Doirt argóintí breise isteach an fheidhm, a bheidh muid a bhailiú go cúramach ag baint úsáide as **context.
Go dtí seo, sin é go léir. Cad a fuair muid:
dag nua sa chomhéadan gréasáin,
céad go leith tasc a dhéanfar go comhthreomhar (má cheadaíonn na socruithe Airflow, Soilire agus cumas an fhreastalaí é).
Bhuel, fuair beagnach é.
Cé a shuiteáilfidh na spleáchais?
Chun an rud ar fad seo a shimpliú, chuaigh mé isteach docker-compose.yml próiseáil requirements.txt ar gach nóid.
Anois tá sé imithe:
Is tascanna iad cearnóga liatha a phróiseálann an sceidealóir.
Fanann muid beagán, gearrann na hoibrithe na tascanna:
Tá na cinn glas, ar ndóigh, tar éis a gcuid oibre a chríochnú go rathúil. Níl an-rath ar Reds.
Dála an scéil, níl aon fhillteán ar ár dtáirgí ./dags, níl aon sioncrónú idir meaisíní - luíonn gach dags isteach git ar ár Gitlab, agus dáileann Gitlab CI nuashonruithe ar mheaisíní agus iad á gcumasc isteach master.
Beagán faoi Flower
Cé go bhfuil na hoibrithe ag caoineadh ár n-pacifiers, cuimhnigh ar uirlis eile a fhéadfaidh rud éigin a thaispeáint dúinn - Flower.
An chéad leathanach ina bhfuil faisnéis achomair ar nóid oibrithe:
An leathanach is déine le tascanna a chuaigh ag obair:
An leathanach is leadránach le stádas ár mbróicéir:
Tá an leathanach is gile le graif stádais tascanna agus a gcuid ama cur i gcrích:
Táimid luchtaithe an underloaded
Mar sin, d'oibrigh na tascanna go léir amach, is féidir leat an lucht créachtaithe a dhéanamh.
Agus bhí go leor lucht créachtaithe - ar chúis amháin nó eile. I gcás úsáid cheart a bhaint as Airflow, léiríonn na cearnóga céanna seo nach raibh na sonraí cinnte.
Ní mór duit féachaint ar an logáil agus atosú ar na cásanna tasc tite.
Trí chliceáil ar aon chearnóg, feicfimid na gníomhartha atá ar fáil dúinn:
Is féidir leat a ghlacadh agus a dhéanamh Glan an tite. Is é sin, déanaimid dearmad go bhfuil rud éigin teipthe ann, agus beidh an tasc ásc céanna téigh go dtí an sceidealóir.
Is léir nach bhfuil sé an-daonnúil é seo a dhéanamh leis an luch leis na cearnóga dearga go léir - ní hé seo an rud a mbeimid ag súil leis ó Airflow. Ar ndóigh, tá airm ollscriosta againn: Browse/Task Instances
Roghnaigh muid gach rud ag an am céanna agus athshocróimid go nialas, cliceáil ar an mír cheart:
Tar éis a ghlanadh, breathnaíonn ár tacsaithe mar seo (tá siad ag fanacht leis an sceidealóir iad a sceidealú cheana féin):
Naisc, crúcaí agus athróga eile
Tá sé in am breathnú ar an gcéad DAG eile, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
An ndearna gach duine nuashonrú tuairisce riamh? Seo í arís: tá liosta foinsí ann óna bhfaightear na sonraí; tá liosta cá háit le cur; ná déan dearmad honk nuair a tharla nó a bhris gach rud (bhuel, níl sé seo fúinn, níl).
A ligean ar dul tríd an comhad arís agus breathnú ar an stuif nua doiléir:
from commons.operators import TelegramBotSendMessage - ní chuireann aon rud cosc orainn ár n-oibreoirí féin a dhéanamh, rud a bhaineamar leas as trí fhillteán beag a dhéanamh chun teachtaireachtaí a sheoladh chuig Unblocked. (Beidh muid ag caint níos mó faoin oibreoir seo thíos);
default_args={} - is féidir le dag na hargóintí céanna a dháileadh ar a cuid oibreoirí go léir;
to='{{ var.value.all_the_kings_men }}' - Gort to ní bheidh hardcoded againn, ach ginte go dinimiciúil ag baint úsáide as Jinja agus athróg le liosta de na ríomhphoist, a chuir mé go cúramach i Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS — coinníoll chun an t-oibreoir a thosú. Is é ár gcás, beidh an litir ag eitilt go dtí an bosses ach amháin má tá gach spleáchas oibriú amach go rathúil;
tg_bot_conn_id='tg_main' - argóintí conn_id glacadh le haitheantais naisc a chruthaímid iontu Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - ní bheidh teachtaireachtaí i Telegram eitilt ar shiúl ach amháin má tá tascanna tite;
task_concurrency=1 - cuirimid cosc ar roinnt cásanna de thasc amháin a sheoladh go comhuaineach. Seachas sin, gheobhaidh muid an seoladh comhuaineach roinnt VerticaOperator (ag féachaint ar bhord amháin);
report_update >> [email, tg] - go léir VerticaOperator teacht le chéile agus litreacha agus teachtaireachtaí á seoladh, mar seo:
Ach ós rud é go bhfuil coinníollacha seolta éagsúla ag oibreoirí fógra, ní oibreoidh ach duine amháin. Sa Tree View, tá cuma beagán níos lú ar gach rud:
Beidh mé ag rá cúpla focal faoi macraí agus a gcairde - athróga.
Is áitshealbhóirí Jinja iad Macraí ar féidir leo faisnéis úsáideach éagsúla a chur in áit argóintí oibreoirí. Mar shampla, mar seo:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} leathnófar go dtí ábhar na hathróige comhthéacs é execution_date san fhormáid YYYY-MM-DD: 2020-07-14. Is é an chuid is fearr ná go gcuirtear athróga comhthéacs in iúl do thasc ar leith (cearnóg sa Tree View), agus nuair a atosófar iad, leathnóidh na sealbhóirí áite chuig na luachanna céanna.
Is féidir na luachanna sannta a fheiceáil ag baint úsáide as an gcnaipe Rindreáilte ar gach tasc ásc. Seo mar a bhí an tasc le litir a sheoladh:
Agus mar sin ag an tasc le teachtaireacht a sheoladh:
Tá liosta iomlán macraí ionsuite don leagan is déanaí atá ar fáil ar fáil anseo: tagairt macraí
Thairis sin, le cabhair ó fhorlíontáin, is féidir linn ár macraí féin a dhearbhú, ach sin scéal eile.
Chomh maith leis na rudaí réamhshainithe, is féidir linn luachanna ár n-athróg a chur in ionad (d'úsáid mé é seo cheana féin sa chód thuas). A ligean ar a chruthú i Admin/Variables cupla rud:
bain úsáid as an cosán go dtí an eochair atá ag teastáil: {{ var.json.bot_config.bot.token }}.
Déarfaidh mé focal amháin go litriúil agus taispeánfaidh mé seat amháin faoi naisc. Tá gach rud bunrang anseo: ar an leathanach Admin/Connections cruthaímid nasc, cuirimid ár logins / pasfhocail agus paraiméadair níos sainiúla ann. Mar seo:
Is féidir pasfhocail a chriptiú (níos cruinne ná an réamhshocrú), nó is féidir leat an cineál ceangail a fhágáil amach (mar a rinne mé do tg_main) - is é an bhfíric go bhfuil an liosta de na cineálacha hardwired i múnlaí Airflow agus ní féidir a leathnú gan dul isteach ar na cóid foinse (más rud é go tobann ní raibh mé google rud éigin, le do thoil ceart dom), ach ní bheidh aon rud a stopadh dúinn ó creidmheasanna a fháil díreach ag ainm.
Is féidir leat a dhéanamh freisin naisc éagsúla leis an ainm céanna: sa chás seo, an modh BaseHook.get_connection(), a fhaigheann naisc de réir ainm dúinn, a thabhairt randamach ó roinnt ainmneacha (bheadh sé níos loighciúil Babhta Robin a dhéanamh, ach fágaimis é ar choinsias na bhforbróirí Airflow).
Is cinnte gur uirlisí fionnuara iad Athróga agus Naisc, ach tá sé tábhachtach gan an t-iarmhéid a chailleadh: cé na codanna de do shreabhadh a stórálann tú sa chód féin, agus na codanna a thugann tú do Airflow le haghaidh stórála. Ar thaobh amháin, is féidir go mbeadh sé áisiúil an luach a athrú go tapa, mar shampla, bosca poist, tríd an Chomhéadain. Ar an láimh eile, tá sé seo fós ar ais chuig an cliceáil luiche, as a raibh muid (mé) ag iarraidh a fháil réidh.
Tá oibriú le naisc ar cheann de na tascanna crúcaí. Go ginearálta, is pointí iad crúcaí Airflow chun é a nascadh le seirbhísí agus leabharlanna tríú páirtí. m.sh., JiraHook osclóidh cliant dúinn idirghníomhú le Jira (is féidir leat tascanna a bhogadh ar ais agus amach), agus le cabhair ó SambaHook is féidir leat comhad áitiúil a bhrú chun smb-pointe.
An t-oibreoir saincheaptha a pharsáil
Agus tháinig gar dúinn féachaint ar conas a dhéantar é TelegramBotSendMessage
Cód commons/operators.py leis an oibreoir iarbhír:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Anseo, cosúil le gach rud eile in Airflow, tá gach rud an-simplí:
Oidhreacht ó BaseOperator, a chuireann go leor rudaí a bhaineann go sonrach le Aer-Shreabhadh i bhfeidhm (féach ar do chuid fóillíochta)
Réimsí dearbhaithe template_fields, ina mbeidh Jinja lorg Macraí a phróiseáil.
D'eagraigh na hargóintí cearta ar son __init__(), socraigh na mainneachtainí nuair is gá.
Ní dhearna muid dearmad faoi thúsú an sinsear ach an oiread.
D'oscail an hook comhfhreagrach TelegramBotHookfuair rud cliant uaidh.
Modh sáraithe (athshainithe). BaseOperator.execute(), a dhéanfaidh Airfow twitch nuair a thagann an t-am chun an t-oibreoir a sheoladh - ann cuirfimid an príomhghníomh i bhfeidhm, ag dearmad logáil isteach. (Logaimid isteach, dála an scéil, ceart isteach stdout и stderr - Déanfaidh aer-sreabhadh gach rud a thascradh, é a fhilleadh go hálainn, é a dhianscaoileadh nuair is gá.)
A ligean ar a fheiceáil cad atá againn commons/hooks.py. An chéad chuid den chomhad, leis an duán féin:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Níl a fhios agam fiú cad atá le míniú anseo, ní dhéanfaidh mé ach na pointí tábhachtacha a thabhairt faoi deara:
Déanaimid oidhreacht, smaoinímid ar na hargóintí - i bhformhór na gcásanna beidh sé ar cheann: conn_id;
Sárú ar mhodhanna caighdeánacha: chuir mé teorainn orm féin get_conn(), ina bhfaighidh mé na paraiméadair nasc de réir ainm agus díreach an t-alt a fháil extra (réimse JSON é seo), inar chuir mé (de réir mo threoracha féin!) an comhartha bot Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Cruthaím sampla dár TelegramBot, rud a thugann comhartha sonrach dó.
Sin é an méid. Is féidir leat cliant a fháil ó Hook ag baint úsáide as TelegramBotHook().clent nó TelegramBotHook().get_conn().
Agus an dara cuid den chomhad, ina ndéanaim microwrapper don Telegram REST API, ionas nach tarraingim an rud céanna python-telegram-bot ar mhodh amháin sendMessage.
Is é an bealach ceart é a shuimiú: TelegramBotSendMessage, TelegramBotHook, TelegramBot - sa bhreiseán, cuir i stór poiblí é, agus tabhair chuig Foinse Oscailte é.
Agus muid ag déanamh staidéir air seo go léir, d’éirigh lenár nuashonruithe tuairisce theip go rathúil agus chuir siad teachtaireacht earráide chugam sa chainéal. Táim chun seiceáil féachaint an bhfuil sé mícheart ...
Bhris rud éigin inár madra! Nach é sin a rabhamar ag súil leis? Díreach!
An bhfuil tú ag dul a dhoirteadh?
An mbraitheann tú gur chaill mé rud éigin? Dealraíonn sé gur gheall sé sonraí a aistriú ó SQL Server go Vertica, agus ansin thóg sé é agus bhog sé as an ábhar, an scoundrel!
Bhí an t-uafás seo d'aon ghnó, ní raibh orm ach téarmaíocht éigin a mhíniú duit. Anois is féidir leat dul níos faide.
Ba é seo an plean a bhí againn:
Déan dag
Gin tascanna
Féach cé chomh hálainn atá gach rud
Sann uimhreacha seisiúin le líonadh
Faigh sonraí ó SQL Server
Cuir sonraí isteach i Vertica
Bailigh staitisticí
Mar sin, chun é seo a chur ar bun agus a reáchtáil, chuir mé beagán breise lenár gcuid docker-compose.yml:
Vertica mar óstach dwh leis na socruithe is réamhshocraithe,
trí chás de fhreastalaí SQL,
líonaimid na bunachair shonraí sa dara ceann le roinnt sonraí (ní breathnú isteach i gcás ar bith mssql_init.py!)
Seolann muid gach rud go maith le cabhair ó ordú beagán níos casta ná an uair dheireanach:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Cad a ghin ár randomizer miracle, is féidir leat úsáid a bhaint as an mír Data Profiling/Ad Hoc Query:
Is é an rud is mó ná é a thaispeáint d'anailísithe
mionléiriú ar Seisiúin ETL Ní dhéanfaidh mé, tá gach rud fánach ansin: déanaimid bonn, tá comhartha ann, fillteann muid gach rud le bainisteoir comhthéacs, agus anois déanaimid é seo:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
seisiún.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Tá an t-am tagtha ár sonraí a bhailiú ónár gcéad go leith tábla. Déanaimis é seo le cabhair ó línte an-unpretentious:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Le cabhair ó Hook a fháil againn ó Airflow pymssql-ceangal
Cuir srian i bhfoirm dáta isteach san iarratas - cuirfidh an t-inneall teimpléad isteach é san fheidhm.
Ár n-iarratas a bheathú pandasa gheobhaidh sinn DataFrame - beidh sé úsáideach dúinn amach anseo.
Tá ionadú á úsáid agam {dt} in ionad paraiméadar iarratais %s ní toisc gur Pinocchio olc mé, ach toisc pandas Ní féidir a láimhseáil pymssql agus duillíní an ceann deireanach params: Listcé gur mian leis i ndáiríre tuple.
Chomh maith leis sin faoi deara go bhfuil an forbróir pymssql chinn gan tacaíocht a thabhairt dó níos mó, agus tá sé in am chun bogadh amach pyodbc.
Feicfimid cad a líontar argóintí ár bhfeidhmeanna le Airflow:
Mura bhfuil aon sonraí ann, níl aon phointe ann leanúint ar aghaidh. Ach tá sé aisteach freisin an líonadh a mheas go rathúil. Ach ní botún é seo. A-ah-ah, cad atá le déanamh?! Agus seo cad:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException insíonn sé do Airflow nach bhfuil aon earráidí, ach ní dhéanaimid an tasc. Ní bheidh cearnach glas nó dearg ag an gcomhéadan, ach bándearg.
ID ár seisiún tuilte (beidh sé difriúil do gach tasc),
A hash ón bhfoinse agus ID ordú - ionas go mbeidh sa bhunachar sonraí deiridh (ina bhfuil gach rud poured isteach tábla amháin) ní mór dúinn ID ordú uathúil.
Tá an chéim leathdhéanach fós: doirt gach rud isteach i Vertica. Agus, rud aisteach go leor, is é CSV ceann de na bealaí is iontach agus is éifeachtaí chun é seo a dhéanamh!
Ar an díolachán, cruthaímid an spriocphláta de láimh. Anseo cheadaigh mé meaisín beag dom féin:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
Tá mé ag úsáid VerticaOperator() Cruthaím scéimre bunachar sonraí agus tábla (mura bhfuil siad ann cheana féin, ar ndóigh). Is é an rud is mó ná na spleáchais a shocrú i gceart:
- Bhuel, - a dúirt an luch beag, - nach bhfuil, anois
An bhfuil tú cinnte gurb mise an t-ainmhí is uafásach san fhoraois?
Julia Donaldson, An Gruffalo
I mo thuairimse, dá mbeadh mo chomhghleacaithe agus mé comórtas: cé a chruthú agus a sheoladh go tapa próiseas ETL ó scratch: siad lena SSIS agus luch agus mé le Airflow ... Agus ansin ba mhaith linn a chur i gcomparáid freisin ar an éascaíocht cothabhála ... Wow, sílim go n-aontóidh tú go mbuafaidh mé iad ar gach taobh!
Más rud é beagán níos dáiríre, ansin rinne Apache Airflow - trí chur síos a dhéanamh ar phróisis i bhfoirm cód cláir - mo phost i bhfad níos compordaí agus níos taitneamhaí.
Tugann a shíneadh neamhtheoranta, i dtéarmaí breiseán agus réamhshuíomh le hinscálaitheacht, an deis duit Airflow a úsáid i mbeagnach aon réimse: fiú amháin sa timthriall iomlán de bhailiú, ullmhú agus próiseáil sonraí, fiú agus roicéad á seoladh (go Mars, de. cúrsa).
Cuid deiridh, tagairt agus faisnéis
An raic atá bailithe againn duit
start_date. Sea, is meme áitiúil é seo cheana féin. Trí phríomh-argóint Doug start_date pas ar fad. Go hachomair, má shonraíonn tú i start_date dáta reatha, agus schedule_interval - lá amháin, ansin beidh DAG ag tosú amárach tráth nach luaithe.
start_date = datetime(2020, 7, 7, 0, 1, 2)
Agus gan fadhbanna níos mó.
Tá earráid ama rite eile bainteach leis: Task is missing the start_date parameter, rud a léiríonn go minic go ndearna tú dearmad ceangal a dhéanamh leis an oibreoir dag.
Go léir ar mheaisín amháin. Sea, agus bunanna (Airflow féin agus ár sciath), agus freastalaí gréasáin, agus sceidealóir, agus oibrithe. Agus d'oibrigh sé fiú. Ach le himeacht ama, d'fhás líon na dtascanna do sheirbhísí, agus nuair a thosaigh PostgreSQL ag freagairt don innéacs i 20 s in ionad 5 ms, thógamar é agus thugamar ar shiúl é.
Seiceadóir Áitiúil. Sea, táimid fós inár suí air, agus tá muid tagtha go dtí imeall an duibheagáin cheana féin. Is leor LocalExecutor dúinn go dtí seo, ach anois tá sé in am leathnú le hoibrí amháin ar a laghad, agus beidh orainn oibriú go crua chun bogadh go CeleryExecutor. Agus i bhfianaise gur féidir leat oibriú leis ar mheaisín amháin, ní chuireann aon rud bac ort Soilire a úsáid fiú ar fhreastalaí, rud “ar ndóigh, ní rachaidh sé isteach i dtáirgeadh, go hionraic!”
Neamhúsáid uirlisí ionsuite:
Naisc chun dintiúir seirbhíse a stóráil,
SLA chailleann freagairt do thascanna nár oibrigh amach in am,
xcom le haghaidh malartú meiteashonraí (a dúirt mé meiteadata!) idir thascanna dag.
Mí-úsáid ríomhphoist. Bhuel, cad is féidir liom a rá? Socraíodh foláirimh le haghaidh gach athrá ar thascanna tite. Anois tá >90k r-phoist ag Gmail ó Airflow, agus diúltaíonn an muzzle ríomhphoist gréasáin níos mó ná 100 a phiocadh suas agus a scriosadh ag an am céanna.
Ionas gur féidir linn oibriú níos mó fós lenár gcinn agus ní lenár lámha, d’ullmhaigh Airflow dúinn é seo:
REST API - tá stádas Turgnamhach fós aige, rud nach gcuireann cosc air oibriú. Leis, ní hamháin gur féidir leat eolas a fháil faoi dags agus faoi thascanna, ach freisin stop a chur / tús a chur le dag, Rith DAG nó linn snámha a chruthú.
CLI - tá go leor uirlisí ar fáil tríd an líne ordaithe nach bhfuil deacair iad a úsáid tríd an WebUI, ach atá as láthair go ginearálta. Mar shampla:
backfill ag teastáil chun cásanna tasc a atosú.
Mar shampla, tháinig anailísithe agus dúirt: “Agus tá nonsense agatsa, a chomrádaí, sna sonraí ón 1 Eanáir go dtí an 13 Eanáir! Deisigh é, deisigh é, deisigh é, deisigh é!" Agus is hobad den sórt sin thú:
run, a ligeann duit tasc ásc amháin a rith, agus fiú scóráil ar gach spleáchas. Thairis sin, is féidir leat é a rith trí LocalExecutor, fiú má tá braisle Soilire agat.
An rud céanna go leor test, ach freisin i boinn scríobhann rud ar bith.
connections Ceadaíonn cruthú mais naisc as an bhlaosc.
API Python - bealach idirghníomhaithe sách crua, atá beartaithe le haghaidh breiseán, agus gan a bheith ag snámh inti le lámha beaga. Ach cé atá chun stop a chur orainn dul go dtí /home/airflow/dags, rith ipython agus tosú ag praiseach thart? Is féidir leat, mar shampla, gach nasc leis an gcód seo a leanas a onnmhairiú:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Ag nascadh le bunachar meiteashonraí Airflow. Ní mholaim scríobh chuige, ach d’fhéadfadh sé a bheith i bhfad níos tapúla agus níos éasca sonraí tasc a fháil le haghaidh méadrachtaí sonracha éagsúla ná trí aon cheann de na APInna.
Ligean le rá nach bhfuil gach ceann dár tascanna idempotent, ach is féidir leo titim uaireanta, agus tá sé seo gnáth. Ach tá cúpla blockchain amhrasach cheana féin, agus bheadh sé riachtanach a sheiceáil.
Seachain SQL!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
tagairtí
Agus ar ndóigh, is iad na chéad deich nasc ó eisiúint Google ábhar an fhillteáin Airflow ó mo leabharmharcanna.
An Zen de Python agus Apache Airflow - cur ar aghaidh intuigthe DAG, caitheamh comhthéacs i bhfeidhmeanna, arís faoi spleáchais, agus freisin faoi thasc a sheoladh gan bacadh le.