Hæ, ég er Dmitry Logvinenko - Gagnaverkfræðingur greiningardeildar Vezet fyrirtækjasamsteypunnar.
Ég mun segja þér frá frábæru tæki til að þróa ETL ferla - Apache Airflow. En Airflow er svo fjölhæft og margþætt að þú ættir að skoða það betur jafnvel þótt þú sért ekki í gagnaflæði, heldur þurfir þú reglulega að ræsa hvaða ferla sem er og fylgjast með framkvæmd þeirra.
Og já, ég mun ekki aðeins segja, heldur einnig sýna: forritið hefur mikið af kóða, skjámyndum og ráðleggingum.
Það sem þú sérð venjulega þegar þú gúglar orðið Airflow / Wikimedia Commons
- aðeins betra, og það var gert í allt öðrum tilgangi, nefnilega (eins og það er skrifað á undan kat):
keyra og fylgjast með verkefnum á ótakmarkaðan fjölda véla (eins og mörg sellerí / Kubernetes og samviska þín mun leyfa þér)
með kraftmiklu verkflæðisframleiðslu frá mjög auðvelt að skrifa og skilja Python kóða
og getu til að tengja hvaða gagnagrunna og API sem er við hvert annað með því að nota bæði tilbúna íhluti og heimagerða viðbætur (sem er mjög einfalt).
Við notum Apache Airflow svona:
við söfnum gögnum frá ýmsum aðilum (mörgum SQL Server og PostgreSQL tilvikum, ýmsum API með forritamælingum, jafnvel 1C) í DWH og ODS (við höfum Vertica og Clickhouse).
hversu háþróaður cron, sem byrjar gagnasamþjöppunarferli á ODS, og fylgist einnig með viðhaldi þeirra.
Þar til nýlega var þörfum okkar fullnægt af einum litlum netþjóni með 32 kjarna og 50 GB af vinnsluminni. Í Airflow virkar þetta:
meira 200 dags (reyndar verkflæði, þar sem við fylltum verkefni),
í hverjum að meðaltali 70 verkefni,
þetta góðgæti byrjar (einnig að meðaltali) einu sinni á klukkustund.
Og um hvernig við stækkuðum mun ég skrifa hér að neðan, en nú skulum við skilgreina yfirvandann sem við munum leysa:
Það eru þrír uppruna-SQL-þjónar, hver með 50 gagnagrunnum - tilvik af einu verkefni, hver um sig, þeir hafa sömu uppbyggingu (nánast alls staðar, mua-ha-ha), sem þýðir að hver hefur pöntunartöflu (sem betur fer töflu með því) nafn er hægt að ýta inn í hvaða fyrirtæki sem er). Við tökum gögnin með því að bæta við þjónustusviðum (upprunaþjóni, frumgagnagrunni, ETL verkefnaauðkenni) og henda þeim á barnalegan hátt inn í, segjum, Vertica.
Við skulum fara!
Aðalhlutinn, verklegur (og svolítið fræðilegur)
Af hverju er það fyrir okkur (og fyrir þig)
Þegar trén voru stór og ég var einföld SQL-schik í einni rússneskri smásölu, við svikum ETL ferla aka gagnaflæði með því að nota tvö verkfæri sem eru tiltæk fyrir okkur:
Informatica Power Center - ákaflega dreift kerfi, afar afkastamikið, með eigin vélbúnaði, eigin útgáfu. Ég notaði Guð forði 1% af getu þess. Hvers vegna? Jæja, fyrst af öllu, þetta viðmót, einhvers staðar frá 380, setti andlega pressu á okkur. Í öðru lagi er þessi búnaður hannaður fyrir einstaklega flotta ferla, tryllta endurnotkun íhluta og önnur mjög mikilvæg fyrirtækisbrellur. Um þá staðreynd að það kostar, eins og væng Airbus AXNUMX / ár, munum við ekki segja neitt.
Varist, skjáskot getur skaðað fólk undir 30 aðeins
SQL Server samþættingarþjónn - við notuðum þennan félaga í innra verkefnaflæði okkar. Jæja, í raun: við notum nú þegar SQL Server og það væri einhvern veginn ósanngjarnt að nota ekki ETL verkfæri hans. Allt í henni er gott: bæði viðmótið er fallegt og framvinduskýrslurnar ... En þetta er ekki ástæðan fyrir því að við elskum hugbúnaðarvörur, ó, ekki fyrir þetta. Útgáfa það dtsx (sem er XML með hnútum sem eru stokkaðir á vistun) við getum það, en hver er tilgangurinn? Hvað með að búa til verkefnapakka sem mun draga hundruð borða frá einum netþjóni til annars? Já, hvaða hundrað, vísifingur þinn mun detta af tuttugu bitum, með því að smella á músarhnappinn. En það lítur örugglega meira út í tísku:
Við leituðum svo sannarlega leiða út. Mál jafnvel næstum kom að sjálfskrifuðum SSIS pakka rafall ...
…og svo fann ég nýtt starf. Og Apache Airflow náði mér á því.
Þegar ég komst að því að ETL ferlalýsingar eru einfaldur Python kóða dansaði ég bara ekki af gleði. Svona voru gagnastraumar útfærðir og breyttir og að hella töflum með einni uppbyggingu úr hundruðum gagnagrunna í eitt skotmark varð spurning um Python kóða á einum og hálfum eða tveimur 13” skjám.
Að setja saman klasann
Við skulum ekki skipuleggja algjörlega leikskóla og ekki tala um algjörlega augljósa hluti hér, eins og að setja upp Airflow, gagnagrunninn sem þú valdir, sellerí og önnur tilvik sem lýst er í bryggjunni.
Svo að við getum strax hafið tilraunir, skissaði ég docker-compose.yml þar sem:
Við skulum reyndar hækka Loftstreymi: Tímaáætlun, vefþjónn. Blóm mun einnig snúast þar til að fylgjast með selleríverkefnum (vegna þess að það hefur þegar verið ýtt inn apache/airflow:1.10.10-python3.7, en okkur er sama)
PostgreSQL, þar sem Airflow mun skrifa þjónustuupplýsingar sínar (dagskrárgögn, framkvæmdatölfræði osfrv.), og Celery mun merkja lokið verkefnum;
Redis, sem mun starfa sem verkefnamiðlari fyrir sellerí;
Sellerí verkamaður, sem mun sinna beinni framkvæmd verkefna.
Til möppu ./dags við munum bæta við skrám okkar með lýsingu dags. Þeir verða teknir upp á flugu, þannig að það er engin þörf á að leika allan stafla eftir hverja hnerra.
Sums staðar er kóðinn í dæmunum ekki alveg sýndur (til að gera textann ekki í ruglinu), en einhvers staðar er honum breytt í ferlinu. Heildar dæmi um vinnukóða má finna í geymslunni https://github.com/dm-logv/airflow-tutorial.
Við samsetningu tónverksins treysti ég að miklu leyti á hina þekktu mynd pukkel/docker-loftflæði - endilega kíkið á það. Kannski þarftu ekki neitt annað í lífi þínu.
Allar loftflæðisstillingar eru fáanlegar ekki aðeins í gegnum airflow.cfg, en einnig í gegnum umhverfisbreytur (þökk sé þróunaraðilum), sem ég nýtti mér illgjarnt.
Auðvitað er það ekki tilbúið til framleiðslu: ég setti vísvitandi ekki hjartslátt á gáma, ég nennti ekki öryggi. En ég gerði það lágmark sem hentaði tilraunamönnum okkar.
Athugaðu að:
Dag mappan verður að vera aðgengileg bæði fyrir tímaritara og starfsmenn.
Sama á við um öll þriðja aðila bókasöfn - þau verða öll að vera uppsett á vélum með tímaáætlun og starfsmönnum.
Ef þú skildir ekkert í öllum þessum „dögum“, þá er hér stutt orðabók:
Tímaáætlun - mikilvægasti frændi í Airflow, sem stjórnar því að vélmenni vinni hörðum höndum, en ekki manneskja: fylgist með dagskrá, uppfærir dags, setur verkefni af stað.
Almennt séð, í eldri útgáfum, átti hann í vandræðum með minni (nei, ekki minnisleysi, heldur leki) og arfleifðarfæribreytan hélst jafnvel í stillingunum run_duration — endurræsingartímabil þess. En nú er allt í lagi.
DAG (aka "dag") - "stýrt óhringlaga graf", en slík skilgreining mun segja fáum, en í raun er það ílát fyrir verkefni sem hafa samskipti sín á milli (sjá hér að neðan) eða hliðstæða pakka í SSIS og Workflow í Informatica .
Auk dags geta enn verið undirdagar, en við munum líklegast ekki ná þeim.
DAG Run - frumstillt dag, sem er úthlutað sínum eigin execution_date. Dagrans sama dags getur virkað samhliða (ef þú hefur gert verkefni þín auðmjúk, auðvitað).
Flugrekandi eru stykki af kóða sem bera ábyrgð á að framkvæma tiltekna aðgerð. Það eru þrjár gerðir af rekstraraðilum:
aðgerðeins og uppáhaldið okkar PythonOperator, sem getur framkvæmt hvaða (gildan) Python kóða sem er;
flytja, sem flytja gögn frá stað til stað, td, MsSqlToHiveTransfer;
skynjari á hinn bóginn mun það leyfa þér að bregðast við eða hægja á frekari framkvæmd dagsins þar til atburður á sér stað. HttpSensor getur dregið tilgreindan endapunkt, og þegar æskilegt svar bíður, byrjaðu flutninginn GoogleCloudStorageToS3Operator. Forvitinn hugur mun spyrja: „af hverju? Þegar öllu er á botninn hvolft geturðu gert endurtekningar beint í símanum!“ Og svo, í því skyni að stífla ekki laug verkefna með stöðvuðum rekstraraðilum. Skynjarinn fer í gang, athugar og deyr fyrir næstu tilraun.
Verkefni - yfirlýstir rekstraraðilar, óháð tegund, og tengdir daginum eru færðir í stöðu verkefnis.
verkefni dæmi - þegar aðalskipuleggjandi ákvað að það væri kominn tími til að senda verkefni í bardaga á listamenn (rétt á staðnum, ef við notum LocalExecutor eða í fjarlægan hnút ef um er að ræða CeleryExecutor), það úthlutar þeim samhengi (þ.e. mengi af breytum - framkvæmdarbreytur), stækkar skipana- eða fyrirspurnarsniðmát og sameinar þau.
Við búum til verkefni
Í fyrsta lagi skulum við gera grein fyrir almennu kerfi dougsins okkar, og síðan munum við kafa í smáatriðin meira og meira, vegna þess að við notum nokkrar ekki léttvægar lausnir.
Svo, í sinni einföldustu mynd, mun slíkur dag líta svona út:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Við skulum reikna það út:
Fyrst flytjum við inn nauðsynlegar libs og eitthvað annað;
sql_server_ds - Er List[namedtuple[str, str]] með nöfnum á tengingum frá Airflow Connections og gagnagrunnum sem við munum taka diskinn okkar úr;
dag - tilkynning dagsins okkar, sem verður endilega að vera í globals(), annars finnur Airflow það ekki. Doug þarf líka að segja:
hvað heitir hann orders - þetta nafn mun þá birtast í vefviðmótinu,
að hann vinni frá miðnætti þann áttunda júlí,
og það ætti að keyra, á um það bil 6 klukkustunda fresti (fyrir harða krakka hér í staðinn fyrir timedelta() leyfilegt cron-lína 0 0 0/6 ? * * *, fyrir þá sem minna kúl - tjáning eins og @daily);
workflow() mun sinna aðalstarfinu, en ekki núna. Í bili munum við bara henda samhengi okkar inn í loggbókina.
Og nú er einfaldi galdurinn við að búa til verkefni:
við rennum í gegnum heimildir okkar;
frumstilla PythonOperator, sem mun aflífa dúlluna okkar workflow(). Ekki gleyma að tilgreina einstakt (innan dag) heiti verkefnisins og binda daginn sjálfan. Fáni provide_context aftur á móti mun hella fleiri rökum í fallið, sem við munum safna vandlega með því að nota **context.
Í bili er það allt. Það sem við fengum:
nýr dag í vefviðmótinu,
eitt og hálft hundrað verkefni sem verða unnin samhliða (ef Airflow, Sellerí stillingar og getu netþjóns leyfa það).
Jæja, náði því næstum.
Hver mun setja upp ósjálfstæðin?
Til að einfalda þetta allt saman þá skrúfaði ég inn docker-compose.yml vinnslu requirements.txt á öllum hnútum.
Nú er það horfið:
Gráir reitir eru verktilvik sem unnin eru af tímaáætlunarmanni.
Við bíðum aðeins, verkefnin eru tekin upp af starfsmönnum:
Þeir grænu hafa að sjálfsögðu lokið sínu starfi. Rauðir eru ekki mjög vel heppnaðir.
Við the vegur, það er engin mappa á vörunum okkar ./dags, það er engin samstilling á milli véla - allir dagar liggja í git á Gitlab okkar, og Gitlab CI dreifir uppfærslum á vélar við sameiningu master.
Smá um blóm
Á meðan verkamennirnir eru að troða snuðunum okkar skulum við muna eftir öðru tæki sem getur sýnt okkur eitthvað - Blóm.
Fyrsta síða með samantektarupplýsingum um starfshnúta:
Ákaflegasta síðan með verkefnum sem fóru í vinnuna:
Leiðinlegasta síðan með stöðu miðlara okkar:
Bjartasta síðan er með verkefnastöðugröf og framkvæmdartíma þeirra:
Við hleðjum undirhlaðna
Svo, öll verkefnin hafa gengið upp, þú getur borið burt særða.
Og það voru margir særðir - af einni eða annarri ástæðu. Ef um er að ræða rétta notkun á Airflow benda einmitt þessir reitir til þess að gögnin hafi örugglega ekki borist.
Þú þarft að fylgjast með skránni og endurræsa tilvikin sem hafa fallið.
Með því að smella á hvaða ferning sem er sjáum við þær aðgerðir sem okkur standa til boða:
Þú getur tekið og gert Clear the fallen. Það er, við gleymum að eitthvað hefur mistekist þar og sama tilviksverkefni mun fara í tímaáætlunarmanninn.
Það er ljóst að það er ekki mjög mannúðlegt að gera þetta með músinni með öllum rauðu ferningunum - þetta er ekki það sem við búumst við frá Airflow. Auðvitað eigum við gereyðingarvopn: Browse/Task Instances
Við skulum velja allt í einu og núllstilla, smelltu á réttan hlut:
Eftir hreinsun líta leigubílarnir okkar svona út (þeir eru nú þegar að bíða eftir að tímaáætlunarmaðurinn skipuleggi þá):
Tengingar, krókar og aðrar breytur
Það er kominn tími til að líta á næsta DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Hafa allir gert skýrsluuppfærslu? Þetta er hún aftur: það er listi yfir heimildir þaðan sem hægt er að fá gögnin; það er listi hvar á að setja; ekki gleyma að tuða þegar allt gerðist eða bilaði (jæja, þetta snýst ekki um okkur, nei).
Við skulum fara í gegnum skrána aftur og skoða nýju óljósu dótið:
from commons.operators import TelegramBotSendMessage - ekkert kemur í veg fyrir að við búum til okkar eigin símafyrirtæki, sem við nýttum okkur með því að búa til litla umbúðir til að senda skilaboð til Unblocked. (Við munum tala meira um þennan rekstraraðila hér að neðan);
default_args={} - dag getur dreift sömu rökum til allra rekstraraðila;
to='{{ var.value.all_the_kings_men }}' - sviði to við munum ekki hafa harðkóða, heldur myndað á kraftmikinn hátt með Jinja og breytu með lista yfir tölvupósta, sem ég setti vandlega inn Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS — skilyrði fyrir því að ræsa rekstraraðila. Í okkar tilviki mun bréfið aðeins fljúga til yfirmannanna ef öll ósjálfstæði hafa gengið upp með góðum árangri;
tg_bot_conn_id='tg_main' - rök conn_id samþykkja auðkenni tenginga sem við búum til í Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - skilaboð í Telegram munu aðeins fljúga í burtu ef það eru fallin verkefni;
task_concurrency=1 - við bönnum samtímis kynningu á nokkrum verkefnatilvikum af einu verkefni. Annars munum við fá samtímis kynningu á nokkrum VerticaOperator (horft á eitt borð);
report_update >> [email, tg] - allt VerticaOperator sameinast við að senda bréf og skilaboð, eins og þetta:
En þar sem rekstraraðilar tilkynnenda hafa mismunandi ræsingarskilyrði mun aðeins einn virka. Í trésýninni lítur allt aðeins minna út:
Ég ætla að fara nokkrum orðum um fjölvi og vinir þeirra - breytum.
Fjölvi eru Jinja staðgenglar sem geta skipt út ýmsum gagnlegum upplýsingum í rekstrarrök. Til dæmis, svona:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} mun stækka við innihald samhengisbreytunnar execution_date með sniðinu YYYY-MM-DD: 2020-07-14. Það besta er að samhengisbreytur eru negldar við tiltekið verktilvik (ferningur í trésýninni), og þegar endurræst er munu staðgenglar stækka í sömu gildi.
Úthlutað gildi er hægt að skoða með því að nota Rendered hnappinn á hverju verktilviki. Svona er verkefnið við að senda bréf:
Og svo við verkefnið með því að senda skilaboð:
Heildarlisti yfir innbyggða fjölvi fyrir nýjustu tiltæku útgáfuna er fáanlegur hér: tilvísun fjölva
Þar að auki, með hjálp viðbætur, getum við lýst yfir eigin fjölvi, en það er önnur saga.
Til viðbótar við fyrirfram skilgreinda hluti, getum við skipt út gildum breytanna okkar (ég notaði þetta nú þegar í kóðanum hér að ofan). Við skulum búa til Admin/Variables nokkur atriði:
notaðu bara slóðina að viðkomandi lykli: {{ var.json.bot_config.bot.token }}.
Ég ætla bókstaflega að segja eitt orð og sýna eitt skjáskot um соединения. Allt er grunnatriði hér: á síðunni Admin/Connections við búum til tengingu, bætum við innskráningum / lykilorðum okkar og nákvæmari breytum þar. Svona:
Lykilorð geta verið dulkóðuð (rækilegar en sjálfgefið), eða þú getur sleppt tengingargerðinni (eins og ég gerði fyrir tg_main) - Staðreyndin er sú að listinn yfir tegundir er tengdur í Airflow módelum og er ekki hægt að stækka hann án þess að komast inn í frumkóðann (ef ég var skyndilega ekki að googla eitthvað, vinsamlegast leiðréttið mig), en ekkert mun koma í veg fyrir að við fáum inneignir bara kl. nafn.
Þú getur líka gert nokkrar tengingar með sama nafni: í þessu tilfelli, aðferðin BaseHook.get_connection(), sem fær okkur tengingar með nafni, mun gefa handahófi frá nokkrum nafna (það væri rökréttara að gera Round Robin, en við skulum skilja það eftir á samvisku Airflow þróunaraðila).
Breytur og tengingar eru vissulega flott verkfæri, en það er mikilvægt að missa ekki jafnvægið: hvaða hluta flæðisins þíns geymir þú í kóðanum sjálfum og hvaða hluta þú gefur Airflow til geymslu. Annars vegar getur verið þægilegt að breyta gildinu fljótt, til dæmis póstkassa, í gegnum HÍ. Á hinn bóginn er þetta enn afturhvarf til músarsmellsins, sem við (ég) vildum losna við.
Vinna með tengingar er eitt af verkefnunum krókar. Almennt séð eru Airflow krókar punktar til að tengja það við þjónustu og bókasöfn þriðja aðila. Td JiraHook mun opna viðskiptavin fyrir okkur til að hafa samskipti við Jira (þú getur fært verkefni fram og til baka), og með hjálp SambaHook þú getur ýtt staðbundinni skrá til smb-punktur.
Að þátta sérsniðna rekstraraðilann
Og við komumst nálægt því að skoða hvernig það er búið til TelegramBotSendMessage
Code commons/operators.py með raunverulegum rekstraraðila:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Hér, eins og allt annað í Airflow, er allt mjög einfalt:
Erfði frá BaseOperator, sem útfærir töluvert af Airflow-sértækum hlutum (horfðu á tómstundir þínar)
Yfirlýstir reitir template_fields, þar sem Jinja mun leita að fjölvi til að vinna úr.
Raðaði upp réttum rökum fyrir __init__(), stilltu sjálfgefnar stillingar þar sem þörf krefur.
Við gleymdum ekki frumstillingu forföðursins heldur.
Opnaði samsvarandi krók TelegramBotHookfengið viðskiptavinahlut frá henni.
Hnekkt (endurskilgreind) aðferð BaseOperator.execute(), sem Airfow mun kippa sér upp við þegar kemur að því að ræsa rekstraraðilann - í henni munum við útfæra aðalaðgerðina, gleyma að skrá þig inn. (Við skráum okkur inn, við the vegur, beint inn stdout и stderr - Loftflæði mun stöðva allt, vefja það fallega, sundurliða það þar sem þörf krefur.)
Við skulum sjá hvað við höfum commons/hooks.py. Fyrsti hluti skráarinnar, með króknum sjálfum:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Ég veit ekki einu sinni hvað ég á að útskýra hér, ég ætla bara að taka eftir mikilvægum atriðum:
Við erfum, hugsum um rökin - í flestum tilfellum verður það eitt: conn_id;
Yfirstíga staðlaðar aðferðir: Ég takmarkaði mig get_conn(), þar sem ég fæ tengibreytur með nafni og fæ bara hlutann extra (þetta er JSON reitur), þar sem ég (samkvæmt mínum eigin leiðbeiningum!) setti Telegram bot táknið: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Ég bý til dæmi um okkar TelegramBot, gefa því sérstakan tákn.
Það er allt og sumt. Þú getur fengið viðskiptavin frá krók með því að nota TelegramBotHook().clent eða TelegramBotHook().get_conn().
Og seinni hluti skráarinnar, þar sem ég bý til örumbúðir fyrir Telegram REST API, til að draga ekki það sama python-telegram-bot fyrir eina aðferð sendMessage.
Rétta leiðin er að leggja þetta allt saman: TelegramBotSendMessage, TelegramBotHook, TelegramBot - í viðbótinni, settu í opinbera geymslu og gefðu það til Open Source.
Á meðan við vorum að kynna okkur allt þetta tókst skýrsluuppfærslunum okkar að mistakast með góðum árangri og senda mér villuboð í rásinni. Ég ætla að athuga hvort það sé rangt...
Eitthvað brotnaði í hundinum okkar! Var það ekki það sem við áttum von á? Einmitt!
Ætlarðu að hella?
Finnst þér ég hafa misst af einhverju? Svo virðist sem hann hafi lofað að flytja gögn frá SQL Server til Vertica, og svo tók hann það og fór út fyrir efnið, skúrkur!
Þetta voðaverk var viljandi, ég varð einfaldlega að ráða einhver hugtök fyrir þig. Nú geturðu gengið lengra.
Planið okkar var þetta:
Gerðu dag
Búðu til verkefni
Sjáðu hvað allt er fallegt
Úthlutaðu lotunúmerum til fyllinga
Fáðu gögn frá SQL Server
Settu gögn í Vertica
Safna tölfræði
Svo, til að koma þessu öllu í gang, gerði ég smá viðbót við okkar docker-compose.yml:
Vertica sem gestgjafi dwh með mest sjálfgefna stillingum,
þrjú tilvik af SQL Server,
við fyllum gagnagrunna í þeim síðarnefnda með einhverjum gögnum (í engu tilviki skoðum mssql_init.py!)
Við ræsum allt það góða með hjálp aðeins flóknari skipunar en síðast:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Það sem kraftaverka slembivalið okkar bjó til, þú getur notað hlutinn Data Profiling/Ad Hoc Query:
Aðalatriðið er að sýna það ekki greinendum
útfæra nánar ETL fundur Ég geri það ekki, allt er léttvægt þarna: við búum til grunn, það er merki í honum, við vefjum allt með samhengisstjóra og nú gerum við þetta:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Tíminn er kominn safna gögnum okkar frá eitt og hálft hundrað borðum okkar. Við skulum gera þetta með hjálp mjög tilgerðarlausra lína:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Með hjálp króks fáum við frá Airflow pymssql-tengjast
Við skulum setja takmörkun í formi dagsetningar í beiðnina - henni verður kastað inn í aðgerðina af sniðmátsvélinni.
Mata beiðni okkar pandashver fær okkur DataFrame - það mun nýtast okkur í framtíðinni.
Ég er að nota staðgöngu {dt} í stað beiðnifæribreytu %s ekki vegna þess að ég er vondur Pinocchio, heldur vegna þess pandas ræður ekki við pymssql og sleppir því síðasta params: Listþó hann vilji virkilega tuple.
Athugaðu einnig að verktaki pymssql ákvað að styðja hann ekki lengur, og það er kominn tími til að flytja út pyodbc.
Við skulum sjá hvað Airflow fyllti rök aðgerða okkar með:
Ef það eru engin gögn, þá þýðir ekkert að halda áfram. En það er líka skrítið að telja fyllinguna vel heppnaða. En þetta eru ekki mistök. A-ah-ah, hvað á að gera?! Og hér er það:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException segir Airflow að það séu engar villur, en við sleppum verkefninu. Viðmótið verður ekki með grænum eða rauðum ferningi, heldur bleiku.
— Jæja, — sagði litla músin, — er það nú ekki
Ertu sannfærður um að ég sé hræðilegasta dýrið í skóginum?
Julia Donaldson, The Gruffalo
Ég held að ef félagar mínir og ég ættum samkeppni: hver mun fljótt búa til og hefja ETL ferli frá grunni: þeir með SSIS og mús og ég með Airflow ... Og þá myndum við líka bera saman auðvelt viðhald ... Vá, ég held að þú sért sammála því að ég mun sigra þá á öllum vígstöðvum!
Ef það er aðeins alvarlegra, þá gerði Apache Airflow - með því að lýsa ferlum í formi forritskóða - starf mitt mikið þægilegri og skemmtilegri.
Ótakmarkaður stækkanleiki þess, bæði hvað varðar viðbætur og tilhneigingu til sveigjanleika, gefur þér tækifæri til að nota Airflow á næstum hvaða svæði sem er: jafnvel í heildarferli söfnunar, undirbúnings og vinnslu gagna, jafnvel við að skjóta eldflaugum á loft (til Mars, eða námskeið).
Lokahluti, tilvísun og upplýsingar
Hrífan sem við höfum safnað fyrir þig
start_date. Já, þetta er nú þegar staðbundið meme. Í gegnum helstu rök Doug start_date allir standast. Í stuttu máli, ef þú tilgreinir í start_date núverandi dagsetning, og schedule_interval - einn daginn, þá byrjar DAG ekki fyrr á morgun.
start_date = datetime(2020, 7, 7, 0, 1, 2)
Og engin vandamál lengur.
Það er önnur keyrsluvilla tengd henni: Task is missing the start_date parameter, sem gefur oftast til kynna að þú hafir gleymt að binda þig við dag rekstraraðila.
Allt á einni vél. Já, og bækistöðvar (loftflæðið sjálft og húðunin okkar), og vefþjónn, og tímaáætlun, og starfsmenn. Og það virkaði meira að segja. En með tímanum jókst fjöldi verkefna fyrir þjónustu og þegar PostgreSQL byrjaði að bregðast við vísitölunni á 20 sekúndum í stað 5 ms tókum við hana og fluttum hana í burtu.
LocalExecutor. Já, við sitjum enn á því og erum þegar komin að brún hyldýpsins. LocalExecutor hefur dugað okkur hingað til, en nú er kominn tími til að stækka með að minnsta kosti einum starfsmanni og við verðum að leggja hart að okkur til að fara yfir í CeleryExecutor. Og í ljósi þeirrar staðreyndar að þú getur unnið með það á einni vél, kemur ekkert í veg fyrir að þú notir Sellerí jafnvel á netþjóni, sem „að sjálfsögðu mun aldrei fara í framleiðslu, satt að segja!“
Ekki í notkun innbyggð verkfæri:
Tengingar til að geyma þjónustuskilríki,
SLA ungfrú að bregðast við verkefnum sem gengu ekki upp á réttum tíma,
xcom fyrir lýsigagnaskipti (sagði ég metagögn!) milli dagverkefna.
Misnotkun á pósti. Jæja, hvað get ég sagt? Viðvaranir voru settar upp fyrir allar endurtekningar á föllnum verkefnum. Núna er Gmail í vinnunni minni með >90 tölvupósta frá Airflow og trýni vefpóstsins neitar að taka upp og eyða meira en 100 í einu.
Til þess að við getum unnið enn meira með höfuðið en ekki með höndunum hefur Airflow undirbúið þetta fyrir okkur:
REST API - hann hefur enn stöðuna sem tilraunamaður, sem kemur ekki í veg fyrir að hann starfi. Með honum er ekki aðeins hægt að fá upplýsingar um dags og verkefni, heldur einnig stöðva/byrja dag, búa til DAG Run eða sundlaug.
CLI - mörg verkfæri eru fáanleg í gegnum skipanalínuna sem eru ekki bara óþægileg í notkun í gegnum vefviðmótið heldur eru þau almennt fjarverandi. Til dæmis:
backfill þarf til að endurræsa verktilvik.
Til dæmis komu sérfræðingar og sögðu: „Og þú, félagi, hefur vitleysu í gögnunum frá 1. til 13. janúar! Lagaðu það, lagaðu það, lagaðu það, lagaðu það!" Og þú ert svona helluborð:
run, sem gerir þér kleift að keyra eitt dæmi verkefni, og jafnvel skora á öllum ósjálfstæðum. Þar að auki geturðu keyrt það í gegnum LocalExecutor, jafnvel þótt þú sért með selleríklasa.
Gerir nokkurn veginn það sama test, aðeins líka í stöðvum skrifar ekkert.
connections gerir fjöldasköpun tenginga úr skelinni.
python api - frekar harðkjarna leið til að hafa samskipti, sem er ætluð fyrir viðbætur, og ekki sveima í henni með litlum höndum. En hver á að hindra okkur í að fara til /home/airflow/dags, hlaupa ipython og byrja að rugla? Þú getur til dæmis flutt út allar tengingar með eftirfarandi kóða:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Tengist Airflow lýsigagnagrunninum. Ég mæli ekki með því að skrifa á það, en að fá verkefnastöðu fyrir ýmsar sérstakar mælikvarða getur verið miklu hraðari og auðveldara en í gegnum hvaða API sem er.
Segjum að verkefni okkar séu ekki öll vanmáttug, en þau geta stundum fallið og það er eðlilegt. En nokkrar stíflur eru þegar grunsamlegar og það væri nauðsynlegt að athuga.
Varist SQL!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
tilvísanir
Og auðvitað eru fyrstu tíu hlekkirnir frá útgáfu Google innihald Airflow möppunnar úr bókamerkjunum mínum.
Apache Airflow Documentation - auðvitað verðum við að byrja á skrifstofunni. skjöl, en hver les leiðbeiningarnar?
Best Practices - Jæja, lestu að minnsta kosti ráðleggingarnar frá höfundunum.
Zen of Python og Apache Airflow - óbein DAG-framsending, samhengi að henda inn aðgerðum, aftur um ósjálfstæði, og einnig um að sleppa verkefnaræsingum.