Sveiki, esu Dmitrijus Logvinenko – įmonių grupės „Vezet“ analitikos skyriaus duomenų inžinierius.
Papasakosiu apie nuostabų ETL procesų kūrimo įrankį – Apache Airflow. Tačiau „Airflow“ yra toks universalus ir daugialypis, kad turėtumėte į jį atidžiau pažvelgti, net jei nesate susiję su duomenų srautais, tačiau turite periodiškai paleisti bet kokius procesus ir stebėti jų vykdymą.
Ir taip, aš ne tik pasakysiu, bet ir parodysiu: programoje yra daug kodo, ekrano kopijų ir rekomendacijų.
Tai, ką paprastai matote, kai „Google“ ieškote žodį „Airflow“ / „Wikimedia Commons“.
- tik geriau, ir jis buvo pagamintas visai kitiems tikslams, būtent (kaip parašyta prieš katą):
vykdyti ir stebėti užduotis neribotame skaičiuje mašinų (kiek leis daug Salierų / Kubernetes ir jūsų sąžinė)
su dinamine darbo eigos generavimu iš labai lengvai rašomo ir suprantamo Python kodo
ir galimybė sujungti bet kokias duomenų bazes ir API tarpusavyje naudojant paruoštus komponentus ir namuose sukurtus įskiepius (o tai labai paprasta).
Mes naudojame „Apache Airflow“ taip:
renkame duomenis iš įvairių šaltinių (daug SQL Server ir PostgreSQL egzempliorių, įvairių API su taikomųjų programų metrika, net 1C) DWH ir ODS (turime Vertica ir Clickhouse).
kiek pažengęs cron, kuris pradeda duomenų konsolidavimo procesus ODS, taip pat stebi jų priežiūrą.
Dar visai neseniai mūsų poreikius tenkino vienas mažas serveris su 32 branduoliais ir 50 GB RAM. Oro sraute tai veikia:
daugiau 200 dagų (iš tikrųjų darbo eigos, į kurias įdėjome užduotis),
kiekviename vidutiniškai 70 užduočių,
šis gėris prasideda (taip pat vidutiniškai) kartą per valandą.
O apie tai, kaip išsiplėtėme, parašysiu žemiau, bet dabar apibrėžkime über problemą, kurią išspręsime:
Yra trys originalūs SQL serveriai, kurių kiekviename yra 50 duomenų bazių - atitinkamai vieno projekto egzemplioriai, jie turi tą pačią struktūrą (beveik visur, mua-ha-ha), o tai reiškia, kad kiekvienas turi užsakymų lentelę (laimei, lentelę su tokia). pavadinimas gali būti įtrauktas į bet kurį verslą). Duomenis paimame pridėdami paslaugų laukus (šaltinio serveris, šaltinio duomenų bazė, ETL užduoties ID) ir naiviai metame į, tarkime, Vertica.
Eikime!
Pagrindinė dalis, praktinė (ir šiek tiek teorinė)
Kodėl mes (ir jūs)
Kai medžiai buvo dideli, o aš paprastas SQL-schik vienoje Rusijos mažmeninėje prekyboje apgaudinėjome ETL procesus, dar vadinamus duomenų srautais, naudodami du mums prieinamus įrankius:
Informatikos energijos centras - itin plinta sistema, itin produktyvi, su savo technine įranga, savo versijomis. Aš panaudojau 1% jos galimybių, neduok Dieve. Kodėl? Na, visų pirma, ši sąsaja, kilusi iš 380-ųjų, darė mums psichikos spaudimą. Antra, šis įtaisas skirtas itin įmantriam procesui, įnirtingam komponentų pakartotiniam naudojimui ir kitiems labai svarbiems įmonės triukams. Apie tai, kiek kainuoja, pavyzdžiui, „Airbus AXNUMX“ sparnas per metus, nieko nesakysime.
Atsargiai, ekrano kopija gali šiek tiek pakenkti jaunesniems nei 30 metų žmonėms
SQL serverio integravimo serveris - naudojome šį draugą savo projektų srautuose. Na, iš tikrųjų: mes jau naudojame SQL Server, ir būtų kažkaip neprotinga nenaudoti jo ETL įrankių. Viskas jame gerai: ir sąsaja graži, ir pažangos ataskaitos... Bet ne dėl to mes mėgstame programinės įrangos produktus, o, ne dėl to. Versija tai dtsx (tai yra XML, kai mazgai sumaišomi išsaugant) galime, bet kokia prasmė? O kaip sukurti užduočių paketą, kuris nutemps šimtus lentelių iš vieno serverio į kitą? Taip, koks šimtas, rodomasis pirštas nukris nuo dvidešimties gabaliukų, spustelėjus pelės mygtuką. Bet tikrai atrodo madingiau:
Tikrai ieškojome išeičių. Atvejis net beveik atėjo į savarankiškai parašytą SSIS paketų generatorių ...
…ir tada mane surado naujas darbas. Ir Apache Airflow mane aplenkė.
Kai sužinojau, kad ETL procesų aprašymai yra paprastas Python kodas, aš tiesiog nešokau iš džiaugsmo. Taip duomenų srautai buvo versijuojami ir diferencijuojami, o vienos struktūros lentelių suliejimas iš šimtų duomenų bazių į vieną taikinį tapo Python kodo reikalu pusantro ar dviejuose 13 colių ekranuose.
Klasterio surinkimas
Netvarkykime visiškai vaikų darželio ir nekalbėkime čia apie visiškai akivaizdžius dalykus, tokius kaip Airflow įdiegimas, jūsų pasirinkta duomenų bazė, Salierai ir kiti doke aprašyti atvejai.
Kad galėtume nedelsiant pradėti eksperimentus, nubraižiau eskizą docker-compose.yml kuriame:
Iš tikrųjų pakelkime Oro srautas: planuoklis, žiniatinklio serveris. Gėlė taip pat suksis ten, kad stebėtų salierų užduotis (nes ji jau buvo įstumta apache/airflow:1.10.10-python3.7, bet mes neprieštaraujame)
PostgreSQL, kuriame Airflow įrašys savo paslaugų informaciją (planuotojo duomenis, vykdymo statistiką ir kt.), o Celery pažymės atliktas užduotis;
Redis, kuri veiks kaip „Selery“ užduočių tarpininkas;
Salierų darbuotojas, kuri užsiims tiesioginiu užduočių vykdymu.
Į aplanką ./dags mes pridėsime savo failus su dags aprašymu. Jie bus paimti skrendant, todėl po kiekvieno čiaudėjimo nereikia žongliruoti visa krūva.
Kai kur pavyzdžiuose esantis kodas ne iki galo parodytas (kad tekstas nebūtų perkrautas), bet kai kur jis proceso metu modifikuojamas. Išsamių darbo kodų pavyzdžių galima rasti saugykloje https://github.com/dm-logv/airflow-tutorial.
Surinkdama kompoziciją daugiausia rėmiausi gerai žinomu įvaizdžiu puckel / Docker-oro srautas - būtinai patikrink. Galbūt tau gyvenime nieko daugiau ir nereikia.
Visi oro srauto nustatymai pasiekiami ne tik per airflow.cfg, bet ir per aplinkos kintamuosius (ačiū kūrėjams), kuriais piktybiškai pasinaudojau.
Natūralu, kad jis nėra paruoštas gamybai: aš sąmoningai nedėjau širdies plakimo ant konteinerių, nesivarginau dėl saugumo. Bet aš padariau minimumą, tinkantį mūsų eksperimentuotojams.
Prisimink tai:
Aplankas dag turi būti pasiekiamas ir planuotojui, ir darbuotojams.
Tas pats pasakytina apie visas trečiųjų šalių bibliotekas – jos visos turi būti įdiegtos įrenginiuose su planuokliu ir darbuotojais.
Na, dabar viskas paprasta:
$ docker-compose up --scale worker=3
Kai viskas pakyla, galite pažvelgti į žiniatinklio sąsajas:
Jei visuose šiuose „daguose“ nieko nesupratote, čia yra trumpas žodynas:
Tvarkaraštis - svarbiausias dėdė Airflow, kontroliuojantis, kad robotai sunkiai dirbtų, o ne žmogus: stebi tvarkaraštį, atnaujina dagius, paleidžia užduotis.
Apskritai, senesnėse versijose jis turėjo problemų su atmintimi (ne, ne amnezija, o nutekėjimai), o senasis parametras net išliko konfigūracijose run_duration - jo paleidimo iš naujo intervalas. Bet dabar viskas gerai.
DAG (dar žinomas kaip "dag") - "nukreiptas aciklinis grafikas", tačiau toks apibrėžimas pasakys nedaugeliui žmonių, tačiau iš tikrųjų tai yra vienas su kitu sąveikaujančių užduočių konteineris (žr. toliau) arba SSIS paketo ir Informatikos darbo eigos analogas. .
Be dagių, dar gali būti ir subdagų, bet greičiausiai jų nepasieksime.
DAG bėgimas - inicijuotas dag, kuriam priskiriamas savas execution_date. To paties dago dagranai gali dirbti lygiagrečiai (jei savo užduotis padarėte idealias, žinoma).
operatorius yra kodo dalys, atsakingos už konkretaus veiksmo atlikimą. Yra trijų tipų operatoriai:
veiksmaskaip mūsų mėgstamiausia PythonOperator, kuris gali vykdyti bet kokį (galiojantį) Python kodą;
perkėlimas, kurie perkelia duomenis iš vienos vietos į kitą, tarkime, MsSqlToHiveTransfer;
jutiklis kita vertus, tai leis jums reaguoti arba sulėtinti tolesnį dag vykdymą, kol įvyks įvykis. HttpSensor gali patraukti nurodytą galinį tašką, o kai laukia norimas atsakymas, pradėti perkėlimą GoogleCloudStorageToS3Operator. Smalsus protas paklaus: „Kodėl? Juk pakartojimus galite daryti tiesiog operatoriuje! Ir tada, kad neužkimštų užduočių telkinio su sustabdytais operatoriais. Jutiklis įsijungia, patikrina ir užges prieš kitą bandymą.
užduotis - deklaruoti operatoriai, neatsižvelgiant į tipą, ir prijungti prie dag, pakeliami į užduoties rangą.
užduoties pavyzdys - kai generalinis planuotojas nusprendė, kad laikas pasiųsti užduotis į mūšį atlikėjams-darbininkams (iš karto, jei naudosime LocalExecutor arba į nuotolinį mazgą, jei CeleryExecutor), priskiria jiems kontekstą (t. y. kintamųjų rinkinį – vykdymo parametrus), išplečia komandų ar užklausų šablonus ir sujungia juos.
Mes generuojame užduotis
Iš pradžių apibūdinkime bendrą mūsų tešlos schemą, o tada vis labiau pasinersime į smulkmenas, nes taikome keletą nebanalių sprendimų.
Taigi paprasčiausia forma toks dag atrodys taip:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Išsiaiškinkime:
Pirma, importuojame reikiamus libs ir kažkas kito;
sql_server_ds - yra List[namedtuple[str, str]] su jungčių iš Airflow Connections pavadinimais ir duomenų bazėmis, iš kurių paimsime savo plokštelę;
dag - mūsų dienos skelbimas, kuris būtinai turi būti globals(), kitaip Airflow jo neras. Dougas taip pat turi pasakyti:
koks jo vardas orders - tada šis pavadinimas bus rodomas žiniatinklio sąsajoje,
kad dirbs nuo liepos aštuntosios vidurnakčio,
ir jis turėtų veikti maždaug kas 6 valandas (kietiems vaikinams čia vietoj timedelta() leistina cron- linija 0 0 0/6 ? * * *, mažiau šauniems - posakis kaip @daily);
workflow() atliks pagrindinį darbą, bet ne dabar. Kol kas mes tiesiog įtrauksime kontekstą į žurnalą.
O dabar paprasta užduočių kūrimo magija:
bėgame per savo šaltinius;
inicijuoti PythonOperator, kuris įvykdys mūsų manekeną workflow(). Nepamirškite nurodyti unikalaus (dagoje) užduoties pavadinimo ir susieti patį dag. Vėliava provide_context savo ruožtu į funkciją įlies papildomų argumentų, kuriuos naudodami atsargiai rinksime **context.
Kol kas tai viskas. Ką gavome:
naujas dag žiniatinklio sąsajoje,
pusantro šimto užduočių, kurios bus vykdomos lygiagrečiai (jei tai leis Airflow, Celery nustatymai ir serverio talpa).
Na, beveik supratau.
Kas įdiegs priklausomybes?
Kad visa tai būtų supaprastinta, įsukau docker-compose.yml apdorojimas requirements.txt visuose mazguose.
Dabar jo nebėra:
Pilki kvadratai yra planavimo priemonės apdorojami užduočių atvejai.
Šiek tiek palaukiame, užduotis laužo darbininkai:
Žalieji, žinoma, sėkmingai baigė savo darbą. Raudonos spalvos nėra labai sėkmingos.
Beje, mūsų gaminyje nėra aplanko ./dags, nėra sinchronizavimo tarp mašinų - visi dagiai guli git mūsų „Gitlab“, o „Gitlab CI“ platina atnaujinimus įrenginiams, kai susijungia master.
Šiek tiek apie Gėlę
Kol darbininkai tranko mūsų čiulptukus, prisiminkime dar vieną įrankį, galintį mums kažką parodyti – Gėlę.
Pats pirmasis puslapis su suvestinės informacijos apie darbuotojų mazgus:
Intensyviausias puslapis su atliktomis užduotimis:
Pats nuobodžiausias puslapis su mūsų brokerio statusu:
Ryškiausias puslapis yra su užduočių būsenos diagramomis ir jų vykdymo laiku:
Krauname per mažai pakrautą
Taigi, visos užduotys buvo įvykdytos, galite išvežti sužeistuosius.
O sužeistųjų buvo daug – dėl vienokių ar kitokių priežasčių. Teisingai naudojant „Airflow“, šie kvadratai rodo, kad duomenys tikrai nebuvo gauti.
Turite žiūrėti žurnalą ir iš naujo paleisti nukritusias užduotis.
Spustelėję bet kurį kvadratą pamatysime mums galimus veiksmus:
Galite paimti ir padaryti Išvalyti kritusius. Tai yra, pamirštame, kad ten kažkas nepavyko, ir ta pati egzemplioriaus užduotis atiteks planuokliui.
Akivaizdu, kad tai daryti su pele su visais raudonais kvadratėliais nėra labai humaniška – iš Airflow to nesitikime. Natūralu, kad turime masinio naikinimo ginklų: Browse/Task Instances
Pasirinkime viską iš karto ir iš naujo nustatykime į nulį, spustelėkite tinkamą elementą:
Po valymo mūsų taksi atrodo taip (jau laukia, kol juos suplanuos tvarkaraštis):
Jungtys, kabliukai ir kiti kintamieji
Atėjo laikas pažvelgti į kitą DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Ar visi kada nors atnaujino ataskaitą? Tai vėl ji: yra šaltinių, iš kurių gauti duomenis, sąrašas; yra sąrašas, kur įdėti; nepamirškite pagirti, kai viskas atsitiko ar sugedo (na, čia ne apie mus, ne).
Dar kartą peržiūrėkime failą ir pažvelkime į naujus neaiškius dalykus:
from commons.operators import TelegramBotSendMessage - niekas netrukdo mums pasidaryti savo operatorių, kuriais pasinaudojome sukūrę nedidelį įpakavimą žinutėms siųsti į Unblocked. (Apie šį operatorių plačiau pakalbėsime žemiau);
default_args={} - dag gali paskirstyti tuos pačius argumentus visiems savo operatoriams;
to='{{ var.value.all_the_kings_men }}' - laukas to neturėsime koduotų, o dinamiškai sugeneruotų naudojant Jinja ir kintamąjį su el. laiškų sąrašu, kurį atsargiai įdėjau Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS — operatoriaus paleidimo sąlyga. Mūsų atveju laiškas nuskris į viršininkus tik tuo atveju, jei visos priklausomybės išsispręs sėkmingai;
tg_bot_conn_id='tg_main' - argumentai conn_id priimti prisijungimo ID, kuriuos sukuriame Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - žinutės „Telegram“ nuskris tik tuo atveju, jei bus užduočių;
task_concurrency=1 - Draudžiame vienu metu paleisti kelis vienos užduoties užduočių atvejus. Priešingu atveju vienu metu bus paleista keletas VerticaOperator (žiūri į vieną stalą);
report_update >> [email, tg] - viskas VerticaOperator susilieja siunčiant laiškus ir žinutes, pavyzdžiui:
Bet kadangi pranešėjų operatoriai turi skirtingas paleidimo sąlygas, veiks tik viena. Medžio rodinyje viskas atrodo šiek tiek mažiau vizualiai:
Pasakysiu keletą žodžių apie makrokomandas ir jų draugai - kintamieji.
Makrokomandos yra Jinja vietos žymekliai, kurie gali pakeisti įvairią naudingą informaciją į operatoriaus argumentus. Pavyzdžiui, taip:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} išsiplės iki kontekstinio kintamojo turinio execution_date formatu YYYY-MM-DD: 2020-07-14. Geriausia yra tai, kad konteksto kintamieji pririšami prie konkrečios užduoties egzemplioriaus (kvadratas medžio rodinyje), o paleidus iš naujo, rezervuotos vietos išsiplės iki tų pačių verčių.
Priskirtas reikšmes galima peržiūrėti naudojant kiekvieno užduoties egzemplioriaus mygtuką Pateiktas. Štai kaip atliekama laiško siuntimo užduotis:
Ir taip atliekant užduotį su žinutės siuntimu:
Visą naujausios versijos integruotų makrokomandų sąrašą rasite čia: makrokomandų nuoroda
Be to, naudodami papildinius galime deklaruoti savo makrokomandas, bet tai jau kita istorija.
Be iš anksto nustatytų dalykų, galime pakeisti savo kintamųjų reikšmes (aš tai jau naudojau aukščiau esančiame kode). Kurkime Admin/Variables pora dalykų:
tiesiog naudokite kelią į norimą raktą: {{ var.json.bot_config.bot.token }}.
Pažodžiui pasakysiu vieną žodį ir parodysiu vieną ekrano kopiją apie jungtys. Viskas čia elementaru: puslapyje Admin/Connections sukuriame ryšį, pridedame savo prisijungimus / slaptažodžius ir konkretesnius parametrus. Kaip šitas:
Slaptažodžiai gali būti užšifruoti (labiau nei numatytasis) arba galite nenurodyti ryšio tipo (kaip aš padariau tg_main) - faktas yra tas, kad tipų sąrašas yra prijungtas prie „Airflow“ modelių ir negali būti išplėstas neįsigilinus į šaltinio kodus (jei staiga ko nors nepastebėjau „Google“, pataisykite mane), bet niekas netrukdys mums gauti kreditų. vardas.
Taip pat galite užmegzti keletą jungčių tuo pačiu pavadinimu: šiuo atveju metodas BaseHook.get_connection(), kuris suteikia mums ryšius pagal pavadinimą, duos atsitiktinis iš kelių bendravardių (logiškiau būtų padaryti Round Robin, bet palikime tai ant Airflow kūrėjų sąžinės).
Kintamieji ir jungtys tikrai yra šaunūs įrankiai, tačiau svarbu neprarasti pusiausvyros: kurias srautų dalis saugote pačiame kode, o kurias atiduodate saugoti „Airflow“. Viena vertus, per vartotojo sąsają gali būti patogu greitai pakeisti vertę, pavyzdžiui, pašto dėžutės. Kita vertus, tai vis tiek yra grįžimas prie pelės paspaudimo, nuo kurio mes (aš) norėjome atsikratyti.
Darbas su ryšiais yra viena iš užduočių kabliukai. Apskritai „Airflow“ kabliukai yra taškai, skirti prijungti jį prie trečiųjų šalių paslaugų ir bibliotekų. Pvz., JiraHook atidarys klientą, kad galėtume bendrauti su Jira (galite perkelti užduotis pirmyn ir atgal), ir padedant SambaHook galite nusiųsti vietinį failą smb-taškas.
Pasirinktinis operatorius analizuojamas
Ir mes priartėjome prie to, kad pamatytume, kaip jis pagamintas TelegramBotSendMessage
Kodas commons/operators.py su tikruoju operatoriumi:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Čia, kaip ir visa kita Airflow, viskas labai paprasta:
Paveldėjo iš BaseOperator, kuriame įgyvendinama nemažai oro srautui būdingų dalykų (pažiūrėkite į savo laisvalaikį)
Deklaruoti laukai template_fields, kuriame Jinja ieškos apdorotų makrokomandų.
Surinko tinkamus argumentus už __init__(), kur reikia, nustatykite numatytuosius nustatymus.
Nepamiršome ir protėvio inicijavimo.
Atidarė atitinkamą kabliuką TelegramBotHookiš jos gavo kliento objektą.
Nepaisytas (iš naujo apibrėžtas) metodas BaseOperator.execute(), kurį Airfow trūkčios, kai ateis laikas paleisti operatorių – jame įgyvendinsime pagrindinį veiksmą, pamiršę prisijungti. (Beje, mes prisijungiame iškart stdout и stderr - Oro srautas viską sulaikys, gražiai apvynios, suskaidys, kur reikia.)
Pažiūrėkime, ką turime commons/hooks.py. Pirmoji failo dalis su pačiu kabliuku:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Net nežinau, ką čia paaiškinti, tik atkreipsiu dėmesį į svarbius dalykus:
Mes paveldime, galvojame apie argumentus - daugeliu atvejų tai bus vienas: conn_id;
Standartinių metodų nepaisymas: aš apsiribojau get_conn(), kuriame aš gaunu ryšio parametrus pagal pavadinimą ir tiesiog gaunu skyrių extra (tai yra JSON laukas), kuriame aš (pagal savo instrukcijas!) įdėjau Telegram boto prieigos raktą: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Kuriu mūsų pavyzdį TelegramBot, suteikiant jai konkretų prieigos raktą.
Tai viskas. Galite gauti klientą iš kablio naudodami TelegramBotHook().clent arba TelegramBotHook().get_conn().
Ir antroji failo dalis, kurioje darau „Telegram REST API“ mikroįvyniojimą, kad netempčiau to paties python-telegram-bot vienam metodui sendMessage.
Teisingas būdas yra viską sudėti: TelegramBotSendMessage, TelegramBotHook, TelegramBot - įskiepyje įdėkite į viešą saugyklą ir suteikite ją atvirajam šaltiniui.
Kol visa tai studijavome, mūsų ataskaitų atnaujinimai sėkmingai nepavyko ir atsiuntė man kanale klaidos pranešimą. Einu paziureti ar negerai...
Kažkas sugedo mūsų šunelyje! Ar ne to mes tikėjomės? tiksliai!
Ar ketini pilti?
Ar jauti, kad kažką praleidau? Atrodo, kad jis pažadėjo perkelti duomenis iš SQL serverio į Vertica, o tada ėmė ir nukrypo nuo temos, niekšas!
Šis žiaurumas buvo tyčinis, aš tiesiog turėjau jums iššifruoti tam tikrą terminiją. Dabar galite eiti toliau.
Mūsų planas buvo toks:
Do dag
Generuokite užduotis
Pažiūrėkite, kaip viskas gražu
Priskirkite užpildams seansų numerius
Gaukite duomenis iš SQL serverio
Įdėkite duomenis į Vertica
Rinkti statistiką
Taigi, kad visa tai pradėtų veikti, aš padariau nedidelį mūsų papildymą docker-compose.yml:
Vertica kaip šeimininkas dwh su labiausiai numatytais nustatymais,
trys SQL serverio egzemplioriai,
Pastarosiose esančias duomenų bazes užpildome kai kuriais duomenimis (jokiu būdu nesižvalgykite mssql_init.py!)
Visą gėrį paleidžiame naudodami šiek tiek sudėtingesnę komandą nei praėjusį kartą:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Tai, ką sukūrė mūsų stebuklingas atsitiktinių imčių įrankis, galite naudoti elementą Data Profiling/Ad Hoc Query:
Svarbiausia to nerodyti analitikams
detalizuoti ETL sesijos Nedarysiu, ten viskas yra nereikšminga: padarome pagrindą, jame yra ženklas, viską apvyniojame konteksto tvarkykle, o dabar darome taip:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Atėjo laikas rinkti mūsų duomenis nuo mūsų pusantro šimto stalų. Padarykime tai naudodami labai nepretenzingus eilutes:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Kabliuko pagalba gauname iš Airflow pymssql-Prisijungti
Užklausoje pakeiskime apribojimą datos forma – šablono variklis jį įmes į funkciją.
Pateikiame mūsų prašymą pandaskas mus gaus DataFrame – tai mums pravers ateityje.
Aš naudoju pakaitalą {dt} vietoj užklausos parametro %s ne todėl, kad esu piktasis Pinokis, o todėl pandas negali susitvarkyti pymssql ir paslysta paskutinis params: Listnors jis tikrai nori tuple.
Taip pat atkreipkite dėmesį, kad kūrėjas pymssql nusprendė jo neberemti, ir laikas išsikraustyti pyodbc.
Pažiūrėkime, kuo „Airflow“ užpildė mūsų funkcijų argumentus:
Jei nėra duomenų, nėra prasmės tęsti. Tačiau taip pat keista laikyti įdarą sėkmingu. Bet tai nėra klaida. A-ah-ah, ką daryti?! Ir štai kas:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException pasakys Airflow, kad klaidų nėra, bet mes praleidžiame užduotį. Sąsaja turės ne žalią ar raudoną kvadratą, o rausvą.
- Na, - tarė pelytė, - ar ne dabar
Ar tu įsitikinęs, kad aš esu baisiausias gyvūnas miške?
Julia Donaldson, Grufalas
Manau, jei su kolegomis konkuruotume: kas greitai sukurs ir pradės ETL procesą nuo nulio: jie su savo SSIS ir pele, o aš su Airflow... Ir tada dar palygintume priežiūros paprastumą... Oho, manau, sutiksite, kad aš juos įveiksiu visuose frontuose!
Jei šiek tiek rimčiau, tai Apache Airflow - aprašydamas procesus programos kodo forma - atliko mano darbą daug patogiau ir maloniau.
Jo neribotas išplėtimas tiek papildinių, tiek polinkio į mastelį atžvilgiu suteikia galimybę naudoti „Airflow“ beveik bet kurioje srityje: net per visą duomenų rinkimo, ruošimo ir apdorojimo ciklą, net paleidžiant raketas (į Marsą, kursas).
Galutinė dalis, nuoroda ir informacija
Grėblis, kurį surinkome jums
start_date. Taip, tai jau vietinis memas. Pagrindinis Dougo argumentas start_date Visi išlaikyti. Trumpai, jei nurodysite start_date dabartinė data ir schedule_interval - vieną dieną, tada DAG prasidės rytoj ne anksčiau.
start_date = datetime(2020, 7, 7, 0, 1, 2)
Ir daugiau jokių problemų.
Su juo susijusi kita vykdymo laiko klaida: Task is missing the start_date parameter, o tai dažniausiai rodo, kad pamiršote susieti su dag operatoriumi.
Viskas vienoje mašinoje. Taip, ir bazės (pats „Airflow“ ir mūsų danga), ir žiniatinklio serveris, ir planuotojas, ir darbuotojai. Ir netgi pavyko. Tačiau laikui bėgant paslaugų užduočių skaičius augo ir kai PostgreSQL pradėjo reaguoti į indeksą per 20 s, o ne per 5 ms, mes jį paėmėme ir nunešėme.
Vietinis vykdytojas. Taip, mes vis dar sėdime ant jo ir jau priėjome prie bedugnės krašto. „LocalExecutor“ mums iki šiol pakako, bet dabar laikas plėstis bent vienu darbuotoju, o pereiti prie „CeleryExecutor“ turėsime sunkiai dirbti. Ir atsižvelgiant į tai, kad galite dirbti su juo viename įrenginyje, niekas netrukdo jums naudoti „Sellery“ net serveryje, kuris „žinoma, niekada nebus pradėtas gaminti, sąžiningai!
Nenaudojimo įmontuoti įrankiai:
Jungtys saugoti paslaugų kredencialus,
SLA Misses reaguoti į užduotis, kurios nepavyko laiku,
xcom metaduomenų mainams (sakiau metaduomenis!) tarp dag užduočių.
Pašto piktnaudžiavimas. Na, ką aš galiu pasakyti? Buvo nustatyti įspėjimai apie visus kritusių užduočių pasikartojimus. Dabar mano darbo „Gmail“ turi daugiau nei 90 100 el. laiškų iš „Airflow“, o žiniatinklio pašto snukis atsisako pasiimti ir ištrinti daugiau nei XNUMX laiškų vienu metu.
Tam, kad dar daugiau dirbtume galva, o ne rankomis, Airflow mums paruošė štai ką:
POILSIO API - jis vis dar turi Eksperimento statusą, kuris jam netrukdo dirbti. Su juo galite ne tik gauti informacijos apie dags ir užduotis, bet ir sustabdyti/paleisti dag, sukurti DAG Run ar baseiną.
CLI - Komandinėje eilutėje yra daug įrankių, kuriuos ne tik nepatogu naudoti naudojant WebUI, bet ir apskritai jų nėra. Pavyzdžiui:
backfill reikalingas norint iš naujo paleisti užduočių egzempliorius.
Pavyzdžiui, atėjo analitikai ir sako: „O tu, drauge, turi nesąmonių sausio 1–13 d. Pataisyk, pataisyk, pataisyk, pataisyk! O tu tokia kaitlentė:
Pagrindinė paslauga: initdb, resetdb, upgradedb, checkdb.
run, kuri leidžia vykdyti vieną egzemplioriaus užduotį ir netgi įvertinti visas priklausomybes. Be to, galite jį paleisti per LocalExecutor, net jei turite salierų grupę.
Veikia beveik tą patį test, tik taip pat bazėse nieko nerašo.
connections leidžia masiškai kurti ryšius iš apvalkalo.
„Python“ API - gana kietas bendravimo būdas, skirtas įskiepiams, o ne knibždantis jame mažomis rankytėmis. Bet kas mums trukdys eiti /home/airflow/dags, bėk ipython ir pradedi blaškytis? Pavyzdžiui, galite eksportuoti visus ryšius naudodami šį kodą:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Prisijungimas prie Airflow metaduomenų bazės. Nerekomenduoju į jį rašyti, tačiau gauti užduočių būsenas įvairioms konkrečioms metrikoms gali būti daug greičiau ir lengviau nei naudojant bet kurią API.
Tarkime, ne visos mūsų užduotys yra idempotentiškos, tačiau kartais jos gali nukristi, ir tai normalu. Bet keli užsikimšimai jau kelia įtarimų, ir reikėtų patikrinti.
Saugokitės SQL!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
Nuorodos
Ir, žinoma, pirmosios dešimt nuorodų iš „Google“ išleidimo yra aplanko „Airflow“ turinys iš mano žymių.
Python ir Apache oro srautas Zen - numanomas DAG persiuntimas, konteksto įvedimas į funkcijas, vėlgi apie priklausomybes, taip pat apie užduočių paleidimų praleidimą.