Dobrý deň, volám sa Dmitrij Logvinenko – dátový inžinier oddelenia analýzy skupiny spoločností Vezet.
Poviem vám o úžasnom nástroji na vývoj ETL procesov - Apache Airflow. Airflow je však taký všestranný a mnohostranný, že by ste sa naň mali pozrieť bližšie, aj keď nie ste zapojený do dátových tokov, ale potrebujete pravidelne spúšťať akékoľvek procesy a monitorovať ich vykonávanie.
A áno, nielen poviem, ale aj ukážem: program má veľa kódu, snímok obrazovky a odporúčaní.
To, čo zvyčajne vidíte, keď si vygooglite slovo Airflow / Wikimedia Commons
- len lepšie a bolo vyrobené na úplne iné účely, a to (ako je napísané pred kat):
spúšťanie a monitorovanie úloh na neobmedzenom počte počítačov (koľko Celery / Kubernetes a vaše svedomie vám dovolí)
s dynamickým generovaním pracovného toku od veľmi jednoduchého písania a pochopenia kódu Pythonu
a možnosť vzájomne prepojiť ľubovoľné databázy a API pomocou hotových komponentov aj doma vyrobených pluginov (čo je mimoriadne jednoduché).
Apache Airflow používame takto:
zbierame údaje z rôznych zdrojov (veľa inštancií SQL Server a PostgreSQL, rôzne API s aplikačnými metrikami, dokonca aj 1C) v DWH a ODS (máme Vertica a Clickhouse).
ako pokročilý cron, ktorá spúšťa procesy konsolidácie dát na ODS a zároveň monitoruje ich údržbu.
Donedávna naše potreby pokrýval jeden malý server s 32 jadrami a 50 GB RAM. V Airflow to funguje:
viac 200 dagov (v skutočnosti pracovné postupy, do ktorých sme napchali úlohy),
v každom v priemere 70 úloh,
táto dobrota začína (aj v priemere) raz za hodinu.
A o tom, ako sme sa rozšírili, napíšem nižšie, ale teraz definujme über-problém, ktorý budeme riešiť:
Existujú tri pôvodné SQL Servery, každý s 50 databázami - inštanciami jedného projektu, respektíve, majú rovnakú štruktúru (takmer všade, mua-ha-ha), čo znamená, že každý má tabuľku Objednávky (našťastie tabuľku s tým meno možno vložiť do akéhokoľvek podnikania). Dáta berieme pridaním servisných polí (zdrojový server, zdrojová databáza, ETL task ID) a naivne ich hodíme do, povedzme, Vertica.
Poďme!
Hlavná časť, praktická (a trochu teoretická)
Prečo je to pre nás (a pre vás)
Keď boli stromy veľké a ja som bol jednoduchý SQL-schik v jednom ruskom maloobchode sme podviedli ETL procesy alias dátové toky pomocou dvoch nástrojov, ktoré máme k dispozícii:
Informatica Power Center - extrémne sa šíriaci systém, mimoriadne produktívny, s vlastným hardvérom, vlastným verzovaním. Použil som nedajbože 1% jeho schopností. prečo? No, v prvom rade toto rozhranie, niekde z roku 380, na nás psychicky vyvíjalo tlak. Po druhé, táto mašinka je navrhnutá pre extrémne efektné procesy, zúrivé opätovné použitie komponentov a ďalšie veľmi dôležité podnikové triky. O tom, že to stojí, ako krídlo Airbusu AXNUMX / rok, nepovieme nič.
Pozor, snímka obrazovky môže ľuďom do 30 rokov trochu ublížiť
SQL Server Integration Server - tohto súdruha sme použili v našich vnútroprojektových tokoch. No v skutočnosti: SQL Server už používame a bolo by nerozumné nepoužívať jeho ETL nástroje. Všetko v ňom je dobré: aj rozhranie je krásne, aj správy o pokroku... Ale to nie je dôvod, prečo milujeme softvérové produkty, nie preto. Verzia dtsx (čo je XML s uzlami premiešanými pri ukladaní) môžeme, ale aký to má zmysel? Čo tak vytvoriť balík úloh, ktorý pretiahne stovky tabuliek z jedného servera na druhý? Áno, aká stovka, z dvadsiatich kúskov vám odpadne ukazovák kliknutím na tlačidlo myši. Ale rozhodne to vyzerá módnejšie:
Určite sme hľadali východiská. Prípad dokonca takmer prišiel k samostatne napísanému generátoru balíčkov SSIS ...
...a potom si ma našla nová práca. A Apache Airflow ma na ňom predbehol.
Keď som zistil, že popisy ETL procesov sú jednoduchý Python kód, netancoval som od radosti. Takto sa verzovali a porovnávali dátové toky a nalievanie tabuliek s jedinou štruktúrou zo stoviek databáz do jedného cieľa sa stalo záležitosťou kódu Python na jeden a pol alebo dve 13” obrazovky.
Zostavenie klastra
Nezariaďujme úplne materskú školu a nehovorme tu o úplne samozrejmých veciach, ako je inštalácia Airflow, vami vybranej databázy, zeleru a ďalších prípadov popísaných v dokoch.
Aby sme mohli okamžite začať experimentovať, načrtol som docker-compose.yml v ktorom:
Poďme vlastne zvýšiť Airflow: Plánovač, Webový server. Kvet sa tam bude točiť aj kvôli monitorovaniu úloh zeleru (pretože už bol zatlačený apache/airflow:1.10.10-python3.7, ale nám to nevadí)
PostgreSQL, do ktorého Airflow zapíše svoje servisné informácie (údaje plánovača, štatistiky vykonávania atď.) a Celery označí dokončené úlohy;
Redis, ktorý bude fungovať ako sprostredkovateľ úloh pre Zeler;
Zeler robotník, ktorá sa bude podieľať na priamom plnení úloh.
Do priečinka ./dags pridáme naše súbory s popisom dags. Zoberú sa za chodu, takže po každom kýchnutí nie je potrebné žonglovať s celým zásobníkom.
Na niektorých miestach nie je kód v príkladoch úplne zobrazený (aby sa neprehadzoval text), niekde sa však v procese upravuje. Kompletné príklady pracovného kódu nájdete v úložisku https://github.com/dm-logv/airflow-tutorial.
Pri zostavovaní kompozície som sa vo veľkej miere spoliehal na známy obraz puckel/docker-prúdenie vzduchu - určite si to pozrite. Možno už nič viac k životu nepotrebuješ.
Všetky nastavenia Airflow sú dostupné nielen cez airflow.cfg, ale aj cez premenné prostredia (vďaka vývojárom), čo som zlomyseľne využil.
Prirodzene, nie je pripravený na výrobu: Zámerne som na kontajnery nedal tepy, neobťažoval som sa bezpečnosťou. Ale urobil som minimum vhodné pre našich experimentátorov.
Poznač si to:
Priečinok dag musí byť prístupný plánovačovi aj pracovníkom.
To isté platí pre všetky knižnice tretích strán – všetky musia byť nainštalované na strojoch s plánovačom a pracovníkmi.
No, teraz je to jednoduché:
$ docker-compose up --scale worker=3
Keď sa všetko zdvihne, môžete sa pozrieť na webové rozhrania:
Ak ste zo všetkých týchto „dagov“ ničomu nerozumeli, tu je krátky slovník:
Scheduler - najdôležitejší strýko v Airflow, ktorý kontroluje, či roboty tvrdo pracujú, a nie človek: monitoruje plán, aktualizuje dags, spúšťa úlohy.
Vo všeobecnosti mal v starších verziách problémy s pamäťou (nie, nie amnézia, ale úniky) a parameter legacy dokonca zostal v konfiguráciách run_duration — jeho interval reštartu. Ale teraz je už všetko v poriadku.
DAG (aka "dag") - "riadený acyklický graf", ale takáto definícia povie málokomu, ale v skutočnosti je to kontajner na úlohy, ktoré sa navzájom ovplyvňujú (pozri nižšie) alebo analóg Package v SSIS a Workflow v Informatica .
Okrem dagov môžu ešte existovať poddagy, ale k tým sa s najväčšou pravdepodobnosťou nedostaneme.
DAG Run - inicializovaný dag, ktorému je priradený vlastný execution_date. Dagrany toho istého dagu môžu fungovať paralelne (samozrejme, ak ste svoje úlohy urobili idempotentnými).
operátor sú časti kódu zodpovedné za vykonanie konkrétnej akcie. Existujú tri typy operátorov:
akčnáako náš obľúbený PythonOperator, ktorý dokáže spustiť akýkoľvek (platný) kód Pythonu;
prevod, ktoré prenášajú dáta z miesta na miesto, povedzme, MsSqlToHiveTransfer;
senzor na druhej strane vám umožní zareagovať alebo spomaliť ďalšie vykonávanie dag, kým nedôjde k udalosti. HttpSensor môže stiahnuť zadaný koncový bod a keď počká požadovaná odpoveď, spustiť prenos GoogleCloudStorageToS3Operator. Zvedavá myseľ sa bude pýtať: „Prečo? Veď opakovania môžete robiť priamo v operátorovi!“ A potom, aby nedošlo k upchatiu fondu úloh pozastavenými operátormi. Senzor sa spustí, skontroluje a pred ďalším pokusom sa vypne.
úloha - deklarovaní operátori bez ohľadu na typ a pripojení k dag sú povýšení do hodnosti úlohy.
inštancia úlohy - keď sa generálny plánovač rozhodol, že je čas poslať úlohy do boja na výkonných robotníkoch (priamo na mieste, ak použijeme LocalExecutor alebo do vzdialeného uzla v prípade CeleryExecutor), priradí im kontext (t. j. množinu premenných – parametre vykonávania), rozšíri šablóny príkazov alebo dotazov a združí ich.
Vytvárame úlohy
Najprv si načrtneme všeobecnú schému nášho douga a potom sa budeme stále viac a viac ponoriť do detailov, pretože aplikujeme niektoré netriviálne riešenia.
Takže vo svojej najjednoduchšej podobe bude takýto dag vyzerať takto:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Poďme na to:
Najprv importujeme potrebné knižnice a niečo iné;
sql_server_ds - je List[namedtuple[str, str]] s názvami spojení z Airflow Connections a databázami, z ktorých si vezmeme náš tanier;
dag - oznámenie nášho dagu, ktoré musí byť nevyhnutne in globals(), inak to Airflow nenájde. Doug tiež musí povedať:
aké je jeho meno orders - tento názov sa následne zobrazí vo webovom rozhraní,
že bude pracovať od polnoci ôsmeho júla,
a mal by bežať približne každých 6 hodín (pre tvrdých chlapov tu namiesto timedelta() prípustné cron-linka 0 0 0/6 ? * * *, pre menej cool - výraz ako @daily);
workflow() bude robiť hlavnú prácu, ale nie teraz. Zatiaľ len vyklopíme náš kontext do denníka.
A teraz jednoduché kúzlo vytvárania úloh:
prechádzame cez naše zdroje;
inicializovať PythonOperator, ktorá popraví našu figurínu workflow(). Nezabudnite špecifikovať jedinečný (v rámci dag) názov úlohy a priviazať samotný dag. Vlajka provide_context na oplátku naleje do funkcie ďalšie argumenty, ktoré opatrne zozbierame pomocou **context.
Zatiaľ je to všetko. Čo sme dostali:
nový dag vo webovom rozhraní,
jeden a pol stovky úloh, ktoré sa budú vykonávať paralelne (ak to dovoľuje Airflow, Celery nastavenia a kapacita servera).
No, skoro som to pochopil.
Kto nainštaluje závislosti?
Aby som to celé zjednodušil, posral som sa docker-compose.yml spracovanie requirements.txt na všetkých uzloch.
Teraz je to preč:
Sivé štvorce sú inštancie úloh spracované plánovačom.
Chvíľu počkáme, úlohy vybavia pracovníci:
Zelení, samozrejme, svoje dielo úspešne dokončili. Červeným sa veľmi nedarí.
Mimochodom, na našom prod ./dags, neexistuje synchronizácia medzi strojmi - všetky dags ležia v git na našom Gitlabe a Gitlab CI distribuuje aktualizácie do počítačov pri zlúčení master.
Trochu o Kvete
Kým nám robotníci mlátia cumlíky, spomeňme si na ďalšiu pomôcku, ktorá nám môže niečo ukázať – Kvet.
Úplne prvá stránka so súhrnnými informáciami o pracovných uzloch:
Najintenzívnejšia stránka s úlohami, ktoré šli do práce:
Najnudnejšia stránka so statusom nášho brokera:
Najjasnejšia stránka je s grafmi stavu úloh a časom ich vykonania:
Podťažené zaťažíme
Takže všetky úlohy sa splnili, môžete odniesť zranených.
A bolo veľa zranených - z jedného alebo druhého dôvodu. V prípade správneho použitia Airflow práve tieto štvorčeky naznačujú, že dáta rozhodne nedorazili.
Musíte sledovať denník a reštartovať spadnuté inštancie úloh.
Kliknutím na ktorýkoľvek štvorec sa nám zobrazia akcie, ktoré máme k dispozícii:
Môžete vziať a urobiť Clear padlých. To znamená, že zabudneme, že tam niečo zlyhalo a rovnaká úloha inštancie prejde do plánovača.
Je jasné, že robiť to s myšou so všetkými červenými štvorčekmi nie je veľmi humánne – toto od Airflow neočakávame. Prirodzene, máme zbrane hromadného ničenia: Browse/Task Instances
Vyberme všetko naraz a resetujeme na nulu, klikneme na správnu položku:
Naše taxíky po vyčistení vyzerajú takto (už čakajú, kým ich plánovač naplánuje):
Spojenia, háčiky a iné premenné
Je čas pozrieť sa na ďalší DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Urobil niekedy každý aktualizáciu správy? Toto je opäť ona: je tu zoznam zdrojov, odkiaľ možno získať údaje; existuje zoznam, kam umiestniť; nezabudnite zatrúbiť, keď sa všetko stalo alebo zlomilo (no, toto nie je o nás, nie).
Poďme si znova prejsť súbor a pozrieť sa na nové nejasné veci:
from commons.operators import TelegramBotSendMessage - nič nám nebráni vo výrobe vlastných operátorov, čo sme využili tým, že sme urobili malý obal na posielanie správ na Unblocked. (Viac o tomto operátorovi si povieme nižšie);
default_args={} - dag môže distribuovať rovnaké argumenty všetkým svojim operátorom;
to='{{ var.value.all_the_kings_men }}' - lúka to nebudeme mať napevno, ale dynamicky generované pomocou Jinja a premennej so zoznamom emailov, ktoré som opatrne vložil Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS — podmienka pre spustenie operátora. V našom prípade list poletí šéfom iba vtedy, ak budú fungovať všetky závislosti úspešne;
tg_bot_conn_id='tg_main' - argumenty conn_id akceptovať ID pripojenia, ktoré vytvoríme Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - správy v telegrame odletia iba vtedy, ak sú tam padnuté úlohy;
task_concurrency=1 - zakazujeme súčasné spustenie niekoľkých inštancií úloh jednej úlohy. V opačnom prípade získame súčasné spustenie niekoľkých VerticaOperator (pri pohľade na jeden stôl);
report_update >> [email, tg] - všetky VerticaOperator konvergovať v odosielaní listov a správ, ako je toto:
Ale keďže operátori oznamujúcich majú rôzne podmienky spustenia, bude fungovať iba jeden. V stromovom zobrazení všetko vyzerá trochu menej vizuálne:
Poviem pár slov o makrá a ich priatelia - premenné.
Makrá sú zástupné symboly Jinja, ktoré môžu nahradiť rôzne užitočné informácie do argumentov operátorov. Napríklad takto:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} sa rozšíri na obsah kontextovej premennej execution_date vo formáte YYYY-MM-DD: 2020-07-14. Najlepšie na tom je, že kontextové premenné sú pribité ku konkrétnej inštancii úlohy (štvorec v stromovom zobrazení) a po reštartovaní sa zástupné symboly rozšíria na rovnaké hodnoty.
Priradené hodnoty je možné zobraziť pomocou tlačidla Rendered na každej inštancii úlohy. Takto vyzerá úloha s odoslaním listu:
A tak pri úlohe s odoslaním správy:
Kompletný zoznam vstavaných makier pre najnovšiu dostupnú verziu je k dispozícii tu: referencia makier
Navyše pomocou pluginov môžeme deklarovať vlastné makrá, ale to je už iný príbeh.
Okrem preddefinovaných vecí môžeme nahradiť hodnoty našich premenných (už som to použil v kóde vyššie). Poďme tvoriť Admin/Variables par veci:
stačí použiť cestu k požadovanému kľúču: {{ var.json.bot_config.bot.token }}.
Doslova poviem jedno slovo a ukážem jednu snímku obrazovky prípojka. Všetko je tu elementárne: na stránke Admin/Connections vytvoríme spojenie, pridáme tam naše prihlasovacie mená / heslá a konkrétnejšie parametre. Páči sa ti to:
Heslá môžu byť šifrované (dôkladnejšie ako predvolené) alebo môžete vynechať typ pripojenia (ako som to urobil v prípade tg_main) - faktom je, že zoznam typov je v modeloch Airflow pevne zapojený a nedá sa rozšíriť bez toho, aby som sa dostal do zdrojových kódov (ak som zrazu niečo nevygooglil, opravte ma), ale nič nám nezabráni získať kredity len tak názov.
Môžete tiež vytvoriť niekoľko spojení s rovnakým názvom: v tomto prípade metóda BaseHook.get_connection(), ktorý nám dostane spojenia podľa názvu, dá náhodný od viacerých menovcov (logickejšie by bolo spraviť Round Robin, ale nechajme to na svedomí vývojárov Airflow).
Premenné a pripojenia sú určite skvelé nástroje, ale je dôležité nestratiť rovnováhu: ktoré časti vašich tokov ukladáte v samotnom kóde a ktoré časti dáte Airflow na uloženie. Na jednej strane môže byť rýchla zmena hodnoty, napríklad poštovej schránky, pohodlná prostredníctvom používateľského rozhrania. Na druhej strane je to stále návrat ku klikaniu myšou, ktorého sme sa (ja) chceli zbaviť.
Práca s prepojeniami je jednou z úloh háčiky. Vo všeobecnosti sú háčiky Airflow body na pripojenie k službám a knižniciam tretích strán. napr. JiraHook otvorí nám klienta na interakciu s Jirou (úlohy môžete presúvať tam a späť) a pomocou SambaHook môžete odoslať lokálny súbor smb-bod.
Analýza vlastného operátora
A priblížili sme sa k tomu, ako sa to vyrába TelegramBotSendMessage
kód commons/operators.py so skutočným operátorom:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Tu, rovnako ako všetko ostatné v Airflow, je všetko veľmi jednoduché:
Zdedené od BaseOperator, ktorá implementuje pomerne veľa vecí špecifických pre prúdenie vzduchu (pozrite sa na svoj voľný čas)
Deklarované polia template_fields, v ktorom bude Jinja hľadať makrá na spracovanie.
Usporiadal správne argumenty pre __init__(), v prípade potreby nastavte predvolené hodnoty.
Nezabudli sme ani na inicializáciu predka.
Otvorili príslušný hák TelegramBotHookdostala od nej klientský predmet.
Prepísaná (predefinovaná) metóda BaseOperator.execute(), ktorým Airfow škubne, keď príde čas na spustenie operátora - v ňom implementujeme hlavnú akciu, zabudnutie na prihlásenie. (Mimochodom, prihlasujeme sa priamo stdout и stderr - Prúd vzduchu všetko zachytí, krásne zabalí, rozloží tam, kde je to potrebné.)
Pozrime sa, čo máme commons/hooks.py. Prvá časť súboru so samotným háčikom:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Ani neviem, čo tu mám vysvetliť, len si všimnem dôležité body:
Zdedíme, premýšľajte o argumentoch - vo väčšine prípadov to bude jeden: conn_id;
Prevažujúce štandardné metódy: Obmedzil som sa get_conn(), v ktorom dostanem parametre pripojenia podľa názvu a dostanem len sekciu extra (toto je pole JSON), do ktorého som (podľa vlastných pokynov!) vložil token telegramového bota: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Vytváram inštanciu nášho TelegramBot, čím jej poskytnete konkrétny token.
To je všetko. Môžete získať klienta z háku pomocou TelegramBotHook().clent alebo TelegramBotHook().get_conn().
A druhá časť súboru, v ktorej robím microwrapper pre Telegram REST API, aby som neťahal to isté python-telegram-bot pre jednu metódu sendMessage.
Správny spôsob je sčítať všetko: TelegramBotSendMessage, TelegramBotHook, TelegramBot - v doplnku vložte do verejného úložiska a dajte ho Open Source.
Kým sme to všetko študovali, naše aktualizácie prehľadov úspešne zlyhali a poslali mi chybové hlásenie do kanála. idem to skontrolovat, ci to nie je zle...
Niečo sa zlomilo v našom doge! Nie je to to, čo sme očakávali? presne tak!
Ideš naliať?
Máš pocit, že mi niečo uniklo? Zdá sa, že sľúbil, že prenesie údaje zo servera SQL Server do Vertica, a potom to vzal a odstúpil od témy, ten darebák!
Toto zverstvo bolo zámerné, jednoducho som vám musel rozlúštiť nejakú terminológiu. Teraz môžete ísť ďalej.
Náš plán bol takýto:
Do dag
Generovať úlohy
Pozrite sa, aké je všetko krásne
Priraďte čísla relácií výplniam
Získajte údaje zo servera SQL Server
Vložte údaje do Vertica
Zbierajte štatistiky
Aby sme to všetko rozbehli, urobil som malý doplnok k nášmu docker-compose.yml:
Vertica ako hostiteľ dwh s najviac predvolenými nastaveniami,
tri inštancie SQL Server,
do databáz v poslednom uvedenom naplníme nejaké údaje (v žiadnom prípade sa nedívajte do mssql_init.py!)
Všetko dobré spúšťame pomocou trochu komplikovanejšieho príkazu ako naposledy:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
To, čo vygeneroval náš zázračný randomizér, môžete použiť Data Profiling/Ad Hoc Query:
Hlavná vec je neukázať to analytikom
rozviesť ETL relácie Nebudem, všetko je tam triviálne: vytvoríme základňu, je v nej znak, všetko zabalíme do kontextového manažéra a teraz urobíme toto:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Čas nadišiel zbierať naše údaje z našich jeden a pol sto stolov. Urobme to pomocou veľmi nenáročných riadkov:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Pomocou háku dostaneme z Airflow pymssql-spojiť
Nahraďte do požiadavky obmedzenie v podobe dátumu - do funkcie ho hodí engine šablón.
Kŕmenie našej žiadosti pandaskto nás dostane DataFrame - bude nám to užitočné v budúcnosti.
Používam substitúciu {dt} namiesto parametra požiadavky %s nie preto, že som zlý Pinocchio, ale preto pandas nezvládne pymssql a pošmykne posledný params: Listhoci veľmi chce tuple.
Upozorňujeme tiež, že vývojár pymssql rozhodol, že ho už nebude podporovať, a je čas sa odsťahovať pyodbc.
Pozrime sa, čím Airflow naplnil argumenty našich funkcií:
Ak nie sú žiadne údaje, nemá zmysel pokračovať. Ale je tiež zvláštne považovať výplň za vydarenú. Ale to nie je chyba. A-ah-ah, čo robiť?! A tu je čo:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException povie Airflow, že neexistujú žiadne chyby, ale úlohu preskočíme. Rozhranie nebude mať zelený alebo červený štvorec, ale ružový.
ID našej záplavovej relácie (bude iné pre každú úlohu),
Hash zo zdroja a ID objednávky - aby sme vo finálnej databáze (kde je všetko naliate do jednej tabuľky) mali jedinečné ID objednávky.
Zostáva predposledný krok: nalejte všetko do Verticy. A napodiv, jeden z najpozoruhodnejších a najefektívnejších spôsobov, ako to dosiahnuť, je prostredníctvom CSV!
- Dobre, - povedala myška, - nie, teraz
Si presvedčený, že som najstrašnejšie zviera v lese?
Julia Donaldson, The Gruffalo
Myslím, že keby sme s kolegami súťažili: kto rýchlo vytvorí a spustí proces ETL od nuly: oni so svojím SSIS a myšou a ja s Airflow ... A potom by sme porovnali aj jednoduchosť údržby ... Wow, myslím, že budete súhlasiť, že ich porazím na všetkých frontoch!
Ak trochu vážnejšie, tak Apache Airflow - popisom procesov vo forme programového kódu - urobil moju prácu viac pohodlnejšie a príjemnejšie.
Jeho neobmedzená rozšíriteľnosť, čo sa týka zásuvných modulov a predispozície k škálovateľnosti, vám dáva možnosť využiť Airflow takmer v akejkoľvek oblasti: dokonca aj v celom cykle zberu, prípravy a spracovania dát, dokonca aj pri štarte rakiet (na Mars, resp. kurz).
Časť záverečná, referencie a informácie
Hrable, ktoré sme pre vás nazbierali
start_date. Áno, toto je už lokálny meme. Via Dougov hlavný argument start_date všetci prejdú. Stručne povedané, ak uvediete v start_date aktuálny dátum a schedule_interval - jeden deň, potom DAG začne zajtra nie skôr.
start_date = datetime(2020, 7, 7, 0, 1, 2)
A už žiadne problémy.
Je s tým spojená ďalšia runtime chyba: Task is missing the start_date parameter, čo najčastejšie naznačuje, že ste sa zabudli naviazať na operátora dag.
Všetko na jednom stroji. Áno, a základne (samotné prúdenie vzduchu a náš náter) a webový server, plánovač a pracovníci. A dokonca to fungovalo. Postupom času však počet úloh pre služby rástol a keď PostgreSQL začal reagovať na index za 20 s namiesto 5 ms, zobrali sme to a odniesli.
LocalExecutor. Áno, stále na ňom sedíme a už sme prišli na okraj priepasti. LocalExecutor nám zatiaľ stačil, no teraz nastal čas na rozšírenie aspoň o jedného pracovníka a na prechod na CeleryExecutor budeme musieť tvrdo pracovať. A vzhľadom na to, že s ním môžete pracovať na jednom stroji, nič vám nebráni používať zeler ani na serveri, ktorý „samozrejme, nikdy nepôjde do výroby, úprimne!“
Nepoužitie vstavané nástroje:
pripojenie na ukladanie servisných poverení,
Miss SLA reagovať na úlohy, ktoré nefungovali včas,
xcom na výmenu metadát (povedal som metadát!) medzi dag úlohami.
Zneužívanie pošty. No, čo môžem povedať? Na všetky opakovania padlých úloh boli nastavené upozornenia. Teraz má môj pracovný Gmail viac ako 90 100 e-mailov od Airflow a náhubok webovej pošty odmieta vyzdvihnúť a odstrániť viac ako XNUMX naraz.
Aby sme mohli ešte viac pracovať hlavou a nie rukami, Airflow si pre nás pripravil toto:
REST API - stále má status Experimental, čo mu však nebráni v práci. S ním môžete nielen získať informácie o dagoch a úlohách, ale aj zastaviť/spustiť dag, vytvoriť DAG Run alebo pool.
CLI - cez príkazový riadok je dostupných veľa nástrojov, ktoré nie sú len nepohodlné na používanie cez WebUI, ale vo všeobecnosti chýbajú. Napríklad:
backfill potrebné na reštartovanie inštancií úloh.
Napríklad analytici prišli a povedali: „A ty, súdruh, máš nezmysly v údajoch od 1. do 13. januára! Opravte to, opravte to, opravte to, opravte to!" A ty si taká varná doska:
Základná služba: initdb, resetdb, upgradedb, checkdb.
run, ktorá vám umožňuje spustiť jednu úlohu inštancie a dokonca skóre na všetkých závislostiach. Navyše ho môžete spustiť cez LocalExecutor, aj keď máte zeler klaster.
Robí skoro to isté test, len take v zakladoch nepise nic.
connections umožňuje hromadné vytváranie spojov zo škrupiny.
python api - pomerne tvrdý spôsob interakcie, ktorý je určený pre pluginy, a nie s malými rukami. Ale kto nám zabráni ísť /home/airflow/dags, beh ipython a začať makať? Môžete napríklad exportovať všetky pripojenia pomocou nasledujúceho kódu:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Pripája sa k metadatabáze Airflow. Neodporúčam do nej písať, ale získanie stavov úloh pre rôzne špecifické metriky môže byť oveľa rýchlejšie a jednoduchšie ako cez ktorékoľvek z API.
Povedzme, že nie všetky naše úlohy sú idempotentné, ale niekedy môžu padnúť, a to je normálne. Ale pár upchávok je už podozrivých a bolo by potrebné skontrolovať.
Pozor na SQL!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
referencie
A samozrejme, prvých desať odkazov z vydania Google je obsah priečinka Airflow z mojich záložiek.