Hujambo, mimi ni Dmitry Logvinenko - Mhandisi wa Data wa Idara ya Uchanganuzi wa kundi la kampuni za Vezet.
Nitakuambia juu ya zana nzuri ya kukuza michakato ya ETL - Apache Airflow. Lakini Airflow ni nyingi sana na ina mambo mengi kiasi kwamba unapaswa kuiangalia kwa karibu hata kama hauhusiki katika mtiririko wa data, lakini una hitaji la kuzindua michakato yoyote mara kwa mara na kufuatilia utekelezaji wao.
Na ndiyo, sitasema tu, lakini pia kuonyesha: programu ina kanuni nyingi, viwambo vya skrini na mapendekezo.
Unachoona kwa kawaida unapo google neno Airflow / Wikimedia Commons
- bora tu, na ilitengenezwa kwa madhumuni tofauti kabisa, ambayo ni (kama ilivyoandikwa kabla ya kat):
kuendesha na kufuatilia kazi kwenye idadi isiyo na kikomo ya mashine (kama Celery / Kubernetes nyingi na dhamiri yako itakuruhusu)
na kizazi chenye nguvu cha mtiririko wa kazi kutoka rahisi sana kuandika na kuelewa nambari ya Python
na uwezo wa kuunganisha hifadhidata na API zozote kwa kutumia vijenzi vilivyotengenezwa tayari na programu-jalizi zilizotengenezwa nyumbani (ambayo ni rahisi sana).
Tunatumia Apache Airflow kama hii:
tunakusanya data kutoka vyanzo mbalimbali (matukio mengi ya Seva ya SQL na PostgreSQL, API mbalimbali zilizo na vipimo vya programu, hata 1C) katika DWH na ODS (tuna Vertica na Clickhouse).
jinsi ya juu cron, ambayo huanza michakato ya ujumuishaji wa data kwenye ODS, na pia inasimamia matengenezo yao.
Hadi hivi majuzi, mahitaji yetu yalifunikwa na seva moja ndogo na cores 32 na 50 GB ya RAM. Katika Airflow, hii inafanya kazi:
zaidi Dakika 200 (kwa kweli mtiririko wa kazi, ambao tuliingiza kazi),
katika kila wastani 70 kazi,
wema huu huanza (pia kwa wastani) mara moja kwa saa.
Na kuhusu jinsi tulivyopanuka, nitaandika hapa chini, lakini sasa hebu tufafanue ΓΌber-tatizo ambalo tutasuluhisha:
Kuna vyanzo vitatu vya Seva za SQL, kila moja ikiwa na hifadhidata 50 - mifano ya mradi mmoja, mtawaliwa, zina muundo sawa (karibu kila mahali, mua-ha-ha), ambayo inamaanisha kuwa kila moja ina jedwali la Maagizo (kwa bahati nzuri, meza iliyo na hiyo). jina linaweza kusukuma katika biashara yoyote). Tunachukua data kwa kuongeza sehemu za huduma (seva chanzo, hifadhidata ya chanzo, Kitambulisho cha kazi cha ETL) na kuzitupa kwa ujinga, tuseme, Vertica.
Hebu kwenda!
Sehemu kuu, ya vitendo (na ya kinadharia kidogo)
Kwa nini sisi (na wewe)
Wakati miti ilikuwa kubwa na nilikuwa rahisi SQL-schik katika rejareja moja ya Kirusi, tulilaghai michakato ya ETL aka mtiririko wa data kwa kutumia zana mbili zinazopatikana kwetu:
Kituo cha Nguvu cha Informatica - mfumo unaoenea sana, unaozalisha sana, na vifaa vyake, matoleo yake mwenyewe. Nilitumia Mungu apishe 1% ya uwezo wake. Kwa nini? Kweli, kwanza kabisa, kiolesura hiki, mahali fulani kutoka miaka ya 380, kiliweka shinikizo juu yetu kiakili. Pili, ukandamizaji huu umeundwa kwa ajili ya michakato ya dhana sana, utumiaji wa sehemu ya hasira na mbinu zingine muhimu sana za biashara. Kuhusu kile kinachogharimu, kama mrengo wa Airbus AXNUMX / mwaka, hatutasema chochote.
Tahadhari, picha ya skrini inaweza kuumiza watu walio chini ya miaka 30 kidogo
Seva ya Ujumuishaji wa Seva ya SQL - tulimtumia mwenzetu huyu katika mtiririko wetu wa ndani ya mradi. Kweli, kwa kweli: tayari tunatumia Seva ya SQL, na itakuwa isiyo na maana kwa njia fulani kutotumia zana zake za ETL. Kila kitu ndani yake ni nzuri: interface zote mbili ni nzuri, na ripoti ya maendeleo ... Lakini hii sio kwa nini tunapenda bidhaa za programu, oh, si kwa hili. Toa toleo dtsx (ambayo ni XML iliyo na nodi zilizochanganyika kwenye kuokoa) tunaweza, lakini ni nini uhakika? Vipi kuhusu kutengeneza kifurushi cha kazi ambacho kitaburuta mamia ya meza kutoka kwa seva moja hadi nyingine? Ndio, ni mia gani, kidole chako cha index kitaanguka kutoka kwa vipande ishirini, kubonyeza kitufe cha panya. Lakini hakika inaonekana mtindo zaidi:
Hakika tulitafuta njia za kutoka. Kesi hata karibu ilikuja kwa jenereta ya kifurushi cha SSIS iliyojiandikisha ...
... na kisha kazi mpya ikanipata. Na Apache Airflow ilinipata juu yake.
Nilipogundua kuwa maelezo ya mchakato wa ETL ni nambari rahisi ya Python, sikucheza kwa furaha. Hivi ndivyo mitiririko ya data ilitolewa na kutofautishwa, na kumwaga jedwali zilizo na muundo mmoja kutoka kwa mamia ya hifadhidata hadi lengo moja ikawa suala la msimbo wa Python katika skrini moja na nusu au mbili 13 β.
Kukusanya nguzo
Wacha tusitengeneze chekechea kabisa, na tusizungumze juu ya vitu dhahiri kabisa hapa, kama kusakinisha Airflow, hifadhidata uliyochagua, Celery na kesi zingine zilizoelezewa kwenye kizimbani.
Ili tuweze kuanza majaribio mara moja, nilichora docker-compose.yml ambayo:
Hebu tuinue Airflow: Mratibu, Webserver. Flower pia itakuwa inazunguka huko ili kufuatilia kazi za Seli (kwa sababu tayari imesukumwa ndani apache/airflow:1.10.10-python3.7, lakini hatujali)
PostgreSQL, ambapo Airflow itaandika maelezo yake ya huduma (data ya kiratibu, takwimu za utekelezaji, n.k.), na Celery itaashiria kazi zilizokamilishwa;
Rejea, ambayo itafanya kama wakala wa kazi kwa Celery;
Mfanyikazi wa celery, ambayo itahusika katika utekelezaji wa moja kwa moja wa kazi.
Kwa folda ./dags tutaongeza faili zetu na maelezo ya dags. Watachukuliwa kuruka, kwa hivyo hakuna haja ya kugeuza rundo zima baada ya kila kupiga chafya.
Katika maeneo mengine, msimbo katika mifano hauonyeshwa kabisa (ili usiingie maandishi), lakini mahali fulani hurekebishwa katika mchakato. Mifano kamili ya nambari za kufanya kazi inaweza kupatikana kwenye ghala https://github.com/dm-logv/airflow-tutorial.
Katika mkusanyiko wa utungaji, nilitegemea sana picha inayojulikana puckel/docker-airflow - hakikisha kuiangalia. Labda hauitaji kitu kingine chochote katika maisha yako.
Mipangilio yote ya Airflow inapatikana sio tu kupitia airflow.cfg, lakini pia kupitia anuwai za mazingira (shukrani kwa watengenezaji), ambayo nilichukua faida yake kwa ubaya.
Kwa kawaida, haiko tayari kwa uzalishaji: kwa makusudi sikuweka mapigo ya moyo kwenye vyombo, sikujisumbua na usalama. Lakini nilifanya kiwango cha chini kinachofaa kwa majaribio yetu.
Kumbuka kuwa:
Folda ya dag lazima ipatikane na kipanga ratiba na wafanyikazi.
Vile vile hutumika kwa maktaba zote za watu wengine - lazima zote zisanikishwe kwenye mashine zilizo na mpangilio na wafanyikazi.
Kweli, sasa ni rahisi:
$ docker-compose up --scale worker=3
Baada ya kila kitu kuongezeka, unaweza kuangalia miingiliano ya wavuti:
Ikiwa haukuelewa chochote katika "dagi" hizi zote, basi hapa kuna kamusi fupi:
Kipanya - mjomba muhimu zaidi katika Airflow, kudhibiti kwamba roboti hufanya kazi kwa bidii, na si mtu: hufuatilia ratiba, kusasisha dags, kuzindua kazi.
Kwa ujumla, katika matoleo ya zamani, alikuwa na shida na kumbukumbu (hapana, sio amnesia, lakini uvujaji) na parameta ya urithi hata ilibaki kwenye usanidi. run_duration - muda wake wa kuanza upya. Lakini sasa kila kitu ni sawa.
DAG (aka "dag") - "grafu ya acyclic iliyoelekezwa", lakini ufafanuzi kama huo utaambia watu wachache, lakini kwa kweli ni chombo cha kazi zinazoingiliana (tazama hapa chini) au analog ya Package katika SSIS na Workflow katika Informatica. .
Mbali na dags, bado kunaweza kuwa na subdags, lakini uwezekano mkubwa hatutafika kwao.
Mbio za DAG - dag iliyoanzishwa, ambayo imepewa yake mwenyewe execution_date. Dagrans ya dag sawa inaweza kufanya kazi kwa sambamba (ikiwa ulifanya kazi zako kuwa zisizo na maana, bila shaka).
Opereta ni vipande vya kanuni vinavyohusika na kutekeleza kitendo maalum. Kuna aina tatu za waendeshaji:
hatuakama mpendwa wetu PythonOperator, ambayo inaweza kutekeleza msimbo wowote (sahihi) wa Python;
kuhamisha, ambayo husafirisha data kutoka mahali hadi mahali, sema, MsSqlToHiveTransfer;
sensor kwa upande mwingine, itawawezesha kuguswa au kupunguza kasi ya utekelezaji zaidi wa dag mpaka tukio hutokea. HttpSensor inaweza kuvuta sehemu ya mwisho iliyoainishwa, na wakati jibu linalohitajika linangojea, anza uhamishaji GoogleCloudStorageToS3Operator. Akili ya kudadisi itauliza: βKwa nini? Baada ya yote, unaweza kufanya marudio moja kwa moja kwenye opereta! Na kisha, ili si kuziba bwawa la kazi na waendeshaji kusimamishwa. Sensor huanza, hukagua na kufa kabla ya jaribio linalofuata.
Kazi - waendeshaji waliotangazwa, bila kujali aina, na kushikamana na dag wanapandishwa cheo cha kazi.
mfano wa kazi - wakati mpangaji mkuu aliamua kuwa ni wakati wa kutuma kazi vitani kwa watendaji-watendaji (papo hapo, ikiwa tunatumia LocalExecutor au kwa nodi ya mbali katika kesi ya CeleryExecutor), inawapa muktadha (yaani, seti ya vigezo - vigezo vya utekelezaji), huongeza amri au violezo vya hoja, na kuziweka.
Tunatengeneza kazi
Kwanza, hebu tuonyeshe mpango wa jumla wa doug yetu, na kisha tutaingia kwenye maelezo zaidi na zaidi, kwa sababu tunatumia ufumbuzi usio na maana.
Kwa hivyo, kwa fomu yake rahisi, dag kama hii itaonekana kama hii:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Hebu tufikirie:
Kwanza, tunaagiza libs muhimu na kitu kingine;
sql_server_ds - Je, List[namedtuple[str, str]] na majina ya viunganisho kutoka kwa Viunganisho vya Airflow na hifadhidata ambazo tutachukua sahani yetu;
dag - tangazo la dag yetu, ambayo lazima iwe ndani globals(), vinginevyo Airflow haitaipata. Doug pia anahitaji kusema:
jina lake ni nani orders - jina hili litaonekana kwenye kiolesura cha wavuti,
kwamba atafanya kazi kuanzia usiku wa manane tarehe nane Julai,
na inapaswa kukimbia, takriban kila masaa 6 (kwa watu wagumu hapa badala ya timedelta() inayokubalika cron- mstari 0 0 0/6 ? * * *, kwa baridi kidogo - usemi kama @daily);
workflow() itafanya kazi kuu, lakini sio sasa. Kwa sasa, tutatupa tu muktadha wetu kwenye logi.
Na sasa uchawi rahisi wa kuunda kazi:
tunapitia vyanzo vyetu;
anzisha PythonOperator, ambayo itatekeleza dummy yetu workflow(). Usisahau kutaja jina la pekee (ndani ya dag) la kazi na kumfunga dag yenyewe. Bendera provide_context kwa upande wake, itamimina hoja za ziada kwenye kazi, ambayo tutakusanya kwa uangalifu kwa kutumia **context.
Kwa sasa, ni hayo tu. Tulichopata:
dag mpya kwenye kiolesura cha wavuti,
majukumu mia moja na nusu ambayo yatatekelezwa kwa sambamba (ikiwa Airflow, mipangilio ya Celery na uwezo wa seva inaruhusu).
Naam, karibu kupata.
Nani ataweka tegemezi?
Ili kurahisisha jambo hili lote, nilijiingiza docker-compose.yml usindikaji requirements.txt kwenye nodi zote.
Sasa imepita:
miraba ya kijivu ni matukio ya kazi yanayochakatwa na kipanga ratiba.
Tunasubiri kidogo, kazi zinachukuliwa na wafanyikazi:
Wale wa kijani, bila shaka, wamekamilisha kazi yao kwa ufanisi. Nyekundu hazifanikiwa sana.
Kwa njia, hakuna folda kwenye prod yetu ./dags, hakuna maingiliano kati ya mashine - dags zote ziko ndani git kwenye Gitlab yetu, na Gitlab CI inasambaza masasisho kwa mashine wakati wa kuunganishwa master.
Kidogo kuhusu Maua
Wakati wafanyikazi wanapiga viboreshaji vyetu, tukumbuke zana nyingine ambayo inaweza kutuonyesha kitu - Maua.
Ukurasa wa kwanza kabisa ulio na habari ya muhtasari kwenye nodi za wafanyikazi:
Ukurasa mkali zaidi na kazi ambazo zilifanya kazi:
Ukurasa wa kuchosha zaidi na hali ya wakala wetu:
Ukurasa mkali zaidi una grafu za hali ya kazi na muda wa utekelezaji wake:
Tunapakia iliyopakuliwa
Kwa hivyo, kazi zote zimefanyika, unaweza kubeba waliojeruhiwa.
Na kulikuwa na wengi waliojeruhiwa - kwa sababu moja au nyingine. Katika kesi ya matumizi sahihi ya Airflow, miraba hii inaonyesha kuwa data hakika haikufika.
Unahitaji kutazama logi na uanze tena matukio ya kazi iliyoanguka.
Kwa kubofya mraba wowote, tutaona vitendo vinavyopatikana kwetu:
Unaweza kuchukua na kufanya Wazi walioanguka. Hiyo ni, tunasahau kuwa kuna kitu kimeshindwa hapo, na kazi sawa ya mfano itaenda kwa mpangaji.
Ni wazi kuwa kufanya hivi na panya na miraba nyekundu sio ya kibinadamu sana - hii sio tunayotarajia kutoka kwa Airflow. Kwa kawaida, tuna silaha za maangamizi makubwa: Browse/Task Instances
Wacha tuchague kila kitu mara moja na tuweke upya hadi sifuri, bofya kipengee sahihi:
Baada ya kusafisha, teksi zetu zinaonekana kama hii (tayari wanangojea mpangaji kupanga ratiba):
Viunganisho, ndoano na vigezo vingine
Ni wakati wa kuangalia DAG ijayo, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""ΠΠΎΡΠΏΠΎΠ΄Π° Ρ ΠΎΡΠΎΡΠΈΠ΅, ΠΎΡΡΠ΅ΡΡ ΠΎΠ±Π½ΠΎΠ²Π»Π΅Π½Ρ"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
ΠΠ°ΡΠ°Ρ, ΠΏΡΠΎΡΡΠΏΠ°ΠΉΡΡ, ΠΌΡ {{ dag.dag_id }} ΡΡΠΎΠ½ΠΈΠ»ΠΈ
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Je, kila mtu amewahi kusasisha ripoti? Huyu ndiye tena: kuna orodha ya vyanzo kutoka mahali pa kupata data; kuna orodha ambapo kuweka; usisahau kupiga honi wakati kila kitu kilifanyika au kuvunja (vizuri, hii sio juu yetu, hapana).
Wacha tupitie faili tena na tuangalie vitu vipya visivyo wazi:
from commons.operators import TelegramBotSendMessage - hakuna kinachotuzuia kutengeneza waendeshaji wetu wenyewe, ambao tulichukua faida kwa kutengeneza karatasi ndogo ya kutuma ujumbe kwa Haijazuiwa. (Tutazungumza zaidi juu ya mwendeshaji huyu hapa chini);
default_args={} - dag inaweza kusambaza hoja sawa kwa waendeshaji wake wote;
to='{{ var.value.all_the_kings_men }}' - shamba to hatutakuwa na misimbo ngumu, lakini itatolewa kwa nguvu kwa kutumia Jinja na kibadilishaji kilicho na orodha ya barua pepe, ambazo niliweka kwa uangalifu. Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS - hali ya kuanzisha opereta. Kwa upande wetu, barua itaruka kwa wakubwa tu ikiwa utegemezi wote umefanya kazi kwa mafanikio;
tg_bot_conn_id='tg_main' - hoja conn_id ukubali vitambulisho vya muunganisho ambavyo tunaunda ndani Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - ujumbe katika Telegram utaondoka tu ikiwa kuna kazi zilizoanguka;
task_concurrency=1 - tunakataza uzinduzi wa wakati mmoja wa matukio kadhaa ya kazi ya kazi moja. Vinginevyo, tutapata uzinduzi wa wakati mmoja wa kadhaa VerticaOperator (kutazama meza moja);
report_update >> [email, tg] - wote VerticaOperator ungana katika kutuma barua na ujumbe, kama hii:
Lakini kwa kuwa waendeshaji wa arifa wana hali tofauti za uzinduzi, moja tu itafanya kazi. Katika Mwonekano wa Mti, kila kitu kinaonekana kidogo kidogo:
Nitasema maneno machache kuhusu makro na marafiki zao - vigezo.
Macro ni vishikilia nafasi vya Jinja ambavyo vinaweza kubadilisha taarifa mbalimbali muhimu katika hoja za waendeshaji. Kwa mfano, kama hii:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} itapanuka hadi yaliyomo katika utofauti wa muktadha execution_date katika umbizo YYYY-MM-DD: 2020-07-14. Sehemu bora zaidi ni kwamba viambatisho vya muktadha vimetundikwa kwenye tukio maalum la kazi (mraba katika Mwonekano wa Mti), na inapowashwa upya, vishikilia nafasi vitapanuka hadi thamani sawa.
Thamani zilizokabidhiwa zinaweza kutazamwa kwa kutumia kitufe Iliyotolewa kwenye kila tukio la kazi. Hivi ndivyo kazi ya kutuma barua:
Na kwa hivyo katika kazi ya kutuma ujumbe:
Orodha kamili ya macros iliyojengwa ndani kwa toleo la hivi karibuni linalopatikana inapatikana hapa: kumbukumbu ya macros
Zaidi ya hayo, kwa msaada wa programu-jalizi, tunaweza kutangaza macros yetu wenyewe, lakini hiyo ni hadithi nyingine.
Kwa kuongezea vitu vilivyoainishwa, tunaweza kubadilisha maadili ya anuwai zetu (tayari nilitumia hii kwenye nambari iliyo hapo juu). Wacha tuunde Admin/Variables mambo kadhaa:
tumia tu njia ya ufunguo unaotaka: {{ var.json.bot_config.bot.token }}.
Nitasema neno moja na kuonyesha picha moja ya skrini kuhusu viunganisho. Kila kitu ni cha msingi hapa: kwenye ukurasa Admin/Connections tunaunda muunganisho, ongeza maingizo / nywila zetu na vigezo maalum zaidi hapo. Kama hii:
Nywila zinaweza kusimbwa (kwa undani zaidi kuliko chaguo-msingi), au unaweza kuacha aina ya unganisho (kama nilivyofanya kwa tg_main) - ukweli ni kwamba orodha ya aina ni ngumu katika mifano ya Airflow na haiwezi kupanuliwa bila kuingia kwenye misimbo ya chanzo (ikiwa ghafla sikuingia kwenye google kitu, tafadhali nirekebishe), lakini hakuna kitu kitakachotuzuia kupata mikopo tu kwa jina.
Unaweza pia kufanya viunganisho kadhaa kwa jina moja: katika kesi hii, njia BaseHook.get_connection(), ambayo hutupatia miunganisho kwa jina, itatoa nasibu kutoka kwa majina kadhaa (itakuwa ya busara zaidi kufanya Round Robin, lakini wacha tuiache kwa dhamiri ya watengenezaji wa Airflow).
Vigezo na Viunganisho hakika ni zana nzuri, lakini ni muhimu usipoteze salio: ni sehemu gani za mitiririko yako unazohifadhi katika msimbo wenyewe, na ni sehemu gani unazotoa kwa Airflow kwa hifadhi. Kwa upande mmoja, inaweza kuwa rahisi kubadili haraka thamani, kwa mfano, sanduku la barua, kupitia UI. Kwa upande mwingine, hii bado ni kurudi kwa kubofya kwa panya, ambayo sisi (mimi) tulitaka kujiondoa.
Kufanya kazi na viunganishi ni moja ya kazi kulabu. Kwa ujumla, ndoano za Airflow ni pointi za kuiunganisha kwa huduma na maktaba za watu wengine. Kwa mfano, JiraHook itatufungulia mteja ili tuwasiliane na Jira (unaweza kusogeza kazi huku na huko), na kwa usaidizi wa SambaHook unaweza kushinikiza faili ya ndani kwa smb-hatua.
Kuchanganua opereta maalum
Na tukakaribia kuangalia jinsi inavyotengenezwa TelegramBotSendMessage
Kanuni commons/operators.py na opereta halisi:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Hapa, kama kila kitu kingine katika Airflow, kila kitu ni rahisi sana:
Imerithiwa kutoka BaseOperator, ambayo hutekelezea mambo machache mahususi ya Airflow (angalia tafrija yako)
Sehemu zilizotangazwa template_fields, ambayo Jinja itatafuta macros ya kuchakata.
Kupanga hoja sahihi kwa __init__(), weka chaguo-msingi inapobidi.
Hatukusahau kuhusu kuanzishwa kwa babu pia.
Ilifungua ndoano inayolingana TelegramBotHookkupokea kitu mteja kutoka humo.
Mbinu iliyobatilishwa (iliyofafanuliwa upya). BaseOperator.execute(), ambayo Airfow itazunguka wakati unakuja wa kuzindua operator - ndani yake tutatekeleza hatua kuu, kusahau kuingia. (Tunaingia, kwa njia, moja kwa moja stdout ΠΈ stderr - Mtiririko wa hewa utakatiza kila kitu, kuifunga kwa uzuri, kuoza inapohitajika.)
Hebu tuone kile tulichonacho commons/hooks.py. Sehemu ya kwanza ya faili, na ndoano yenyewe:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Sijui hata nielezee nini hapa, nitazingatia tu mambo muhimu:
Tunarithi, fikiria juu ya hoja - katika hali nyingi itakuwa moja: conn_id;
Kupitisha mbinu za kawaida: Nilijizuia get_conn(), ambayo mimi hupata vigezo vya uunganisho kwa jina na kupata sehemu tu extra (hii ni sehemu ya JSON), ambamo mimi (kulingana na maagizo yangu!) niliweka ishara ya bot ya Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Ninaunda mfano wetu TelegramBot, akiipa ishara maalum.
Ni hayo tu. Unaweza kupata mteja kutoka kwa ndoano kwa kutumia TelegramBotHook().clent au TelegramBotHook().get_conn().
Na sehemu ya pili ya faili, ambayo mimi hutengeneza microwrapper kwa Telegraph REST API, ili sio kuvuta sawa. python-telegram-bot kwa mbinu moja sendMessage.
Njia sahihi ni kuongeza yote: TelegramBotSendMessage, TelegramBotHook, TelegramBot - kwenye programu-jalizi, weka hifadhi ya umma, na uipe Open Source.
Tulipokuwa tukijifunza haya yote, masasisho yetu ya ripoti yalifaulu kushindwa na kunitumia ujumbe wa hitilafu kwenye kituo. Nitaangalia kama ni makosa...
Kitu kilivunjwa ndani ya mbwa wetu! Si ndivyo tulivyokuwa tukitarajia? Hasa!
Je, unaenda kumwaga?
Unahisi nimekosa kitu? Inaonekana kwamba aliahidi kuhamisha data kutoka SQL Server hadi Vertica, na kisha akaichukua na kuondoka kwenye mada, mhuni!
Ukatili huu ulikuwa wa kukusudia, ilibidi nikueleze istilahi fulani. Sasa unaweza kwenda mbali zaidi.
Mpango wetu ulikuwa hivi:
Kufanya dag
Tengeneza majukumu
Tazama jinsi kila kitu kilivyo nzuri
Weka nambari za kipindi ili ujaze
Pata data kutoka kwa Seva ya SQL
Weka data kwenye Vertica
Kusanya takwimu
Kwa hivyo, ili kupata haya yote na kukimbia, nilifanya nyongeza ndogo kwa yetu docker-compose.yml:
Vertica ni mwenyeji bingwa dwh na mipangilio chaguo-msingi zaidi,
matukio matatu ya SQL Server,
tunajaza hifadhidata katika mwisho na data fulani (kwa hali yoyote usiangalie mssql_init.py!)
Tunazindua mema yote kwa msaada wa amri ngumu zaidi kuliko mara ya mwisho:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Kile randomizer yetu ya muujiza ilizalisha, unaweza kutumia bidhaa Data Profiling/Ad Hoc Query:
Jambo kuu sio kuionyesha kwa wachambuzi
kufafanua Vipindi vya ETL Sitafanya, kila kitu ni kidogo huko: tunatengeneza msingi, kuna ishara ndani yake, tunafunga kila kitu na msimamizi wa muktadha, na sasa tunafanya hivi:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
kikao.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Wakati umefika kukusanya data zetu kutoka kwa meza zetu mia moja na nusu. Wacha tufanye hivi kwa msaada wa mistari isiyo na adabu sana:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Kwa msaada wa ndoano tunapata kutoka Airflow pymssql-unganisha
Wacha tubadilishe kizuizi katika mfumo wa tarehe kwenye ombi - itatupwa kwenye kazi na injini ya template.
Kulisha ombi letu pandasnani atatupata DataFrame - itakuwa na manufaa kwetu katika siku zijazo.
Ninatumia mbadala {dt} badala ya parameta ya ombi %s si kwa sababu mimi ni Pinocchio mbaya, lakini kwa sababu pandas haiwezi kushughulikia pymssql na kuteleza ya mwisho params: Listingawa anataka kweli tuple.
Pia kumbuka kuwa msanidi programu pymssql aliamua kutomuunga mkono tena, na ni wakati wa kuhama pyodbc.
Wacha tuone Airflow ilijaza hoja za kazi zetu na:
Ikiwa hakuna data, basi hakuna uhakika katika kuendelea. Lakini pia ni ajabu kuzingatia kujaza kwa mafanikio. Lakini hili si kosa. A-ah-ah, nini cha kufanya?! Na hapa ni nini:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException itaambia Airflow kuwa hakuna makosa, lakini tunaruka jukumu hilo. Interface haitakuwa na mraba wa kijani au nyekundu, lakini nyekundu.
Kitambulisho cha kipindi chetu cha mafuriko (itakuwa tofauti kwa kila kazi),
Hashi kutoka kwa chanzo na kitambulisho cha kuagiza - ili katika hifadhidata ya mwisho (ambapo kila kitu hutiwa kwenye jedwali moja) tuna kitambulisho cha kipekee cha agizo.
Hatua ya mwisho inabaki: mimina kila kitu kwenye Vertica. Na, cha ajabu, mojawapo ya njia za kuvutia zaidi na bora za kufanya hivi ni kupitia CSV!
- Naam, - alisema panya mdogo, - sivyo, sasa
Je, unasadiki kwamba mimi ndiye mnyama mbaya zaidi msituni?
Julia Donaldson, The Gruffalo
Nadhani ikiwa wenzangu na mimi tulikuwa na ushindani: ni nani atakayeunda na kuzindua mchakato wa ETL haraka kutoka mwanzo: wao na SSIS yao na panya na mimi na Airflow ... Na kisha tungelinganisha urahisi wa matengenezo ... Wow, nadhani utakubali kwamba nitawapiga pande zote!
Ikiwa kwa umakini zaidi, basi Apache Airflow - kwa kuelezea michakato katika mfumo wa nambari ya programu - ilifanya kazi yangu. sana vizuri zaidi na kufurahisha.
Upanuzi wake usio na kikomo, katika suala la programu-jalizi na utabiri wa kuongezeka, hukupa fursa ya kutumia Airflow karibu na eneo lolote: hata katika mzunguko kamili wa kukusanya, kuandaa na kuchakata data, hata katika kurusha roketi (hadi Mars, ya kozi).
Sehemu ya mwisho, kumbukumbu na habari
Reki tumekukusanyia
start_date. Ndiyo, hii tayari ni meme ya ndani. Kupitia hoja kuu ya Doug start_date wote kupita. Kwa kifupi, ikiwa utabainisha ndani start_date tarehe ya sasa, na schedule_interval - siku moja, basi DAG itaanza kesho hakuna mapema.
start_date = datetime(2020, 7, 7, 0, 1, 2)
Na hakuna matatizo zaidi.
Kuna hitilafu nyingine ya wakati wa utekelezaji inayohusishwa nayo: Task is missing the start_date parameter, ambayo mara nyingi inaonyesha kuwa umesahau kumfunga dag operator.
Kila kitu kwenye mashine moja. Ndio, na besi (Mtiririko wa hewa yenyewe na mipako yetu), na seva ya wavuti, na mpangilio, na wafanyikazi. Na hata ilifanya kazi. Lakini baada ya muda, idadi ya kazi za huduma ilikua, na wakati PostgreSQL ilianza kujibu index katika 20 s badala ya 5 ms, tuliichukua na kuichukua.
Mtekelezaji wa Mitaa. Ndiyo, bado tumeketi juu yake, na tayari tumefika kwenye makali ya kuzimu. LocalExecutor imetutosha hadi sasa, lakini sasa ni wakati wa kupanua na angalau mfanyakazi mmoja, na itabidi tufanye bidii kuhamia CeleryExecutor. Na kwa kuzingatia ukweli kwamba unaweza kufanya kazi nayo kwenye mashine moja, hakuna kinachokuzuia kutumia Celery hata kwenye seva, ambayo "bila shaka, haitaingia katika uzalishaji, kwa uaminifu!"
Kutotumia zana zilizojengwa:
Connections kuhifadhi hati za huduma,
SLA Inakosa kujibu kazi ambazo hazifanyi kazi kwa wakati,
xcom kwa kubadilishana metadata (nilisema metadata!) kati ya kazi za dag.
Matumizi mabaya ya barua. Naam, naweza kusema nini? Arifa ziliwekwa kwa marudio yote ya kazi zilizoanguka. Sasa Gmail ya kazini ina >barua pepe 90k kutoka Airflow, na kificho cha barua pepe ya wavuti kinakataa kuchukua na kufuta zaidi ya 100 kwa wakati mmoja.
Ili tufanye kazi zaidi kwa vichwa vyetu na sio kwa mikono yetu, Airflow imetuandalia haya:
API YA REST - bado ana hali ya Majaribio, ambayo haimzuii kufanya kazi. Pamoja nayo, huwezi kupata tu habari kuhusu dags na kazi, lakini pia kuacha / kuanza dag, kuunda DAG Run au bwawa.
CLI - zana nyingi zinapatikana kupitia mstari wa amri ambazo sio tu zisizofaa kutumia kupitia WebUI, lakini kwa ujumla hazipo. Kwa mfano:
backfill inahitajika kuanzisha upya matukio ya kazi.
Kwa mfano, wachambuzi walikuja na kusema: "Na wewe, rafiki, una upuuzi katika data kutoka Januari 1 hadi 13! Irekebishe, irekebishe, irekebishe, irekebishe!" Na wewe ni hobi kama hii:
Huduma ya msingi: initdb, resetdb, upgradedb, checkdb.
run, ambayo hukuruhusu kuendesha kazi ya mfano mmoja, na hata alama kwenye utegemezi wote. Kwa kuongeza, unaweza kuiendesha kupitia LocalExecutor, hata kama una kikundi cha Celery.
Inafanya kitu sawa test, pia katika misingi haiandiki chochote.
connections inaruhusu uundaji wa wingi wa miunganisho kutoka kwa ganda.
API ya chatu - njia ngumu ya kuingiliana, ambayo imekusudiwa kwa programu-jalizi, na sio kuingia ndani yake kwa mikono kidogo. Lakini ni nani wa kutuzuia kwenda /home/airflow/dags, kukimbia ipython na kuanza kufanya fujo? Unaweza, kwa mfano, kuuza nje miunganisho yote na nambari ifuatayo:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Inaunganisha kwenye metadatabase ya Airflow. Siipendekezi kuiandikia, lakini kupata hali za kazi kwa metriki anuwai maalum inaweza kuwa haraka na rahisi zaidi kuliko kupitia API zozote.
Wacha tuseme kwamba sio kazi zetu zote hazina uwezo, lakini wakati mwingine zinaweza kuanguka, na hii ni kawaida. Lakini blockages chache tayari ni tuhuma, na itakuwa muhimu kuangalia.
Jihadharini na SQL!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
marejeo
Na bila shaka, viungo kumi vya kwanza kutoka kwa utoaji wa Google ni yaliyomo kwenye folda ya Airflow kutoka kwa alamisho zangu.
Hati za Apache Airflow - bila shaka, lazima tuanze na ofisi. nyaraka, lakini ni nani anayesoma maagizo?
Best Practices - Naam, angalau soma mapendekezo kutoka kwa waumbaji.
Zen ya Python na Apache Airflow - usambazaji kamili wa DAG, utupaji wa muktadha katika utendaji, tena kuhusu vitegemezi, na pia kuhusu kuruka uzinduzi wa kazi.