Ahoj, jmenuji se Dmitrij Logvinenko – datový inženýr oddělení analýzy skupiny společností Vezet.
Řeknu vám o úžasném nástroji pro vývoj ETL procesů - Apache Airflow. Airflow je ale tak všestranný a mnohostranný, že byste se na něj měli podívat blíže, i když se nepodílíte na datových tocích, ale potřebujete pravidelně spouštět jakékoli procesy a sledovat jejich provádění.
A ano, nejen řeknu, ale také ukážu: program má spoustu kódu, snímků obrazovky a doporučení.
To, co obvykle uvidíte, když zadáte do googlu slovo Airflow / Wikimedia Commons
- jen lepší a byl vyroben pro úplně jiné účely, a to (jak je psáno před kat):
spouštění a sledování úloh na neomezeném počtu počítačů (jak vám mnoho Celery / Kubernetes a vaše svědomí dovolí)
s dynamickým generováním pracovních postupů od velmi snadno zapisovatelného a srozumitelného kódu Pythonu
a možnost vzájemného propojení libovolných databází a API pomocí hotových komponent i podomácku vyrobených pluginů (což je extrémně jednoduché).
Apache Airflow používáme takto:
shromažďujeme data z různých zdrojů (mnoho SQL Server a PostgreSQL instancí, různá API s aplikačními metrikami, dokonce i 1C) v DWH a ODS (máme Vertica a Clickhouse).
jak pokročilé cron, která spouští procesy konsolidace dat na ODS a zároveň sleduje jejich údržbu.
Donedávna naše potřeby pokrýval jeden malý server s 32 jádry a 50 GB RAM. V Airflow to funguje:
více 200 dagů (ve skutečnosti pracovní postupy, do kterých jsme nacpali úkoly),
v každém v průměru 70 úkolů,
tato dobrota začíná (také průměrně) jednou za hodinu.
A o tom, jak jsme se rozšířili, napíšu níže, ale nyní definujme über-problém, který budeme řešit:
Existují tři původní SQL Servery, každý s 50 databázemi - instancemi jednoho projektu, respektive, mají stejnou strukturu (téměř všude, mua-ha-ha), což znamená, že každý má tabulku Objednávky (naštěstí tabulka s tím jméno lze vložit do jakékoli firmy). Vezmeme data přidáním polí služeb (zdrojový server, zdrojová databáze, ID úlohy ETL) a naivně je hodíme do, řekněme, Vertica.
Pojďme!
Hlavní část, praktická (a trochu teoretická)
Proč my (a vy)
Když byly stromy velké a já byl jednoduchý SQL-schik v jednom ruském maloobchodě jsme podvedli ETL procesy alias datové toky pomocí dvou nástrojů, které máme k dispozici:
Informatica Power Center - extrémně se šířící systém, extrémně produktivní, s vlastním hardwarem, vlastním verzováním. Použil jsem nedej bože 1 % jeho schopností. Proč? No, za prvé, toto rozhraní, někde z roku 380, na nás psychicky tlačilo. Za druhé, toto zařízení je navrženo pro extrémně efektní procesy, zběsilé opětovné použití komponent a další velmi důležité podnikové triky. O tom, co to stojí, jako křídlo Airbusu AXNUMX / rok, nebudeme nic říkat.
Pozor, snímek obrazovky může lidem do 30 let trochu ublížit
SQL Server Integration Server - tohoto soudruha jsme použili v našich vnitroprojektových tocích. No, ve skutečnosti: SQL Server již používáme a bylo by nějak nerozumné nepoužívat jeho ETL nástroje. Všechno v něm je dobré: jak rozhraní je krásné, tak zprávy o pokroku ... Ale to není důvod, proč milujeme softwarové produkty, ach, ne pro tohle. Verze dtsx (což je XML s uzly zamíchanými při uložení) můžeme, ale jaký to má smysl? Co takhle vytvořit balíček úloh, který přetáhne stovky tabulek z jednoho serveru na druhý? Ano, jaká stovka, ukazováček vám odpadne z dvaceti kousků, klikáním na tlačítko myši. Ale rozhodně to vypadá módněji:
Určitě jsme hledali východiska. Případ dokonce téměř přišel k vlastnímu generátoru balíčků SSIS ...
...a pak si mě našla nová práce. A Apache Airflow mě na něm předběhl.
Když jsem zjistil, že popisy ETL procesů jsou jednoduchý Python kód, netancoval jsem radostí. Takto byly datové toky verzovány a rozlišovány a nalévání tabulek s jedinou strukturou ze stovek databází do jednoho cíle se stalo záležitostí kódu Python na jedné a půl nebo dvou 13” obrazovkách.
Sestavení clusteru
Nezařizujeme úplně mateřskou školu a nemluvme zde o zcela samozřejmých věcech, jako je instalace Airflow, vámi zvolené databáze, Celery a dalších případů popsaných v docích.
Abychom mohli okamžitě začít experimentovat, načrtl jsem docker-compose.yml ve kterém:
Pojďme vlastně zvýšit Airflow: Plánovač, Webový server. Květina se tam bude také točit, aby sledovala úkoly s celerem (protože už byl zatlačen do apache/airflow:1.10.10-python3.7, ale to nám nevadí)
PostgreSQL, do kterého bude Airflow zapisovat své servisní informace (data plánovače, statistiky provádění atd.) a Celery bude označovat dokončené úkoly;
Redestilát, který bude fungovat jako zprostředkovatel úkolů pro Celery;
Celer dělník, která se bude zabývat přímým plněním úkolů.
Do složky ./dags přidáme naše soubory s popisem dags. Budou vyzvednuty za chodu, takže není třeba po každém kýchnutí žonglovat s celou hromádkou.
Na některých místech není kód v příkladech zcela ukázán (aby se nezahltil text), někde je však v průběhu upravován. Kompletní příklady pracovního kódu lze nalézt v úložišti https://github.com/dm-logv/airflow-tutorial.
Při sestavování kompozice jsem z velké části vycházel ze známého obrazu puckel/docker-proud vzduchu - určitě se na to podívejte. Možná už k životu nic jiného nepotřebuješ.
Všechna nastavení proudění vzduchu jsou dostupná nejen prostřednictvím airflow.cfg, ale také prostřednictvím proměnných prostředí (díky vývojářům), čehož jsem škodolibě využil.
Přirozeně není připraven na výrobu: Záměrně jsem na kontejnery nedal tepy, neobtěžoval jsem se bezpečností. Ale udělal jsem minimum vhodné pro naše experimentátory.
Všimněte si, že:
Složka dag musí být přístupná plánovači i pracovníkům.
Totéž platí pro všechny knihovny třetích stran – všechny musí být nainstalovány na strojích s plánovačem a pracovníky.
No, teď je to jednoduché:
$ docker-compose up --scale worker=3
Až vše stoupne, můžete se podívat na webová rozhraní:
Pokud jste ze všech těchto „dagů“ ničemu nerozuměli, zde je krátký slovník:
Plánovač - nejdůležitější strýc v Airflow, který kontroluje, že roboti tvrdě pracují, a ne člověk: sleduje plán, aktualizuje dagy, spouští úkoly.
Obecně ve starších verzích měl problémy s pamětí (ne, ne amnézie, ale úniky) a parametr legacy dokonce zůstal v konfiguracích run_duration — jeho interval restartu. Ale teď je vše v pořádku.
DAG (aka "dag") - "směrovaný acyklický graf", ale taková definice řekne málokomu, ale ve skutečnosti je to kontejner pro úkoly, které se vzájemně ovlivňují (viz níže) nebo obdoba Package v SSIS a Workflow v Informatica .
Kromě dagů mohou ještě existovat poddagy, ale k těm se s největší pravděpodobností nedostaneme.
DAG Run - inicializovaný dag, kterému je přiřazen vlastní execution_date. Dagrany stejného dagu mohou fungovat paralelně (pokud jste své úkoly udělali idempotentní, samozřejmě).
Operátor jsou části kódu zodpovědné za provedení konkrétní akce. Existují tři typy operátorů:
akcejako náš oblíbený PythonOperator, který může spustit jakýkoli (platný) kód Pythonu;
transfer, které přenášejí data z místa na místo, řekněme, MsSqlToHiveTransfer;
senzor na druhou stranu vám to umožní reagovat nebo zpomalit další provádění dagu, dokud nenastane událost. HttpSensor může vytáhnout zadaný koncový bod, a když požadovaná odpověď čeká, spustit přenos GoogleCloudStorageToS3Operator. Zvídavá mysl se zeptá: „Proč? Vždyť opakování můžete dělat přímo v operátoru!“ A pak, aby nedošlo k ucpání fondu úkolů pozastavenými operátory. Senzor se spustí, zkontroluje a před dalším pokusem zemře.
Úkol - deklarované operátory bez ohledu na typ a připojené k dag jsou povýšeny do hodnosti úkolu.
instance úkolu - když se generální plánovač rozhodl, že je čas poslat úkoly do bitvy na performerech-pracovnících (přímo na místě, pokud použijeme LocalExecutor nebo do vzdáleného uzlu v případě CeleryExecutor), přiřadí jim kontext (tj. sadu proměnných – parametry provádění), rozšíří šablony příkazů nebo dotazů a sdruží je.
Vytváříme úkoly
Nejprve si nastíníme obecné schéma našeho douga a pak se budeme stále více ponořit do detailů, protože aplikujeme některá netriviální řešení.
Takže ve své nejjednodušší podobě bude takový dag vypadat takto:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Pojďme pochopit:
Nejprve importujeme potřebné knihovny a něco jiného;
sql_server_ds - Je List[namedtuple[str, str]] se jmény spojení z Airflow Connections a databázemi, ze kterých si vezmeme náš talíř;
dag - oznámení našeho dagu, které musí být nutně in globals(), jinak to Airflow nenajde. Doug také musí říct:
jak se jmenuje orders - toto jméno se následně objeví ve webovém rozhraní,
že bude pracovat od půlnoci osmého července,
a mělo by to běžet přibližně každých 6 hodin (pro tvrdé chlapy zde místo timedelta() přípustný cron-čára 0 0 0/6 ? * * *, pro méně cool - výraz jako @daily);
workflow() bude dělat hlavní práci, ale ne teď. Prozatím jen vyklopíme náš kontext do protokolu.
A nyní jednoduché kouzlo vytváření úkolů:
procházíme našimi zdroji;
inicializovat PythonOperator, který popraví naši figurínu workflow(). Nezapomeňte uvést jedinečný (v rámci dag) název úkolu a svázat samotnou dag. Vlajka provide_context na oplátku nalije do funkce další argumenty, které pečlivě shromáždíme pomocí **context.
To je prozatím vše. Co jsme dostali:
nový dag ve webovém rozhraní,
jeden a půl sta úloh, které budou prováděny paralelně (pokud to Airflow, nastavení Celery a kapacita serveru dovolí).
No, skoro jsem to pochopil.
Kdo bude instalovat závislosti?
Abych to celé zjednodušil, podělal jsem to docker-compose.yml zpracovává se requirements.txt na všech uzlech.
Teď je to pryč:
Šedé čtverečky jsou instance úloh zpracované plánovačem.
Chvíli čekáme, úkoly chystají pracovníci:
Zelení samozřejmě své dílo úspěšně dokončili. Červeným se moc nedaří.
Mimochodem, na našem prod ./dags, neexistuje žádná synchronizace mezi stroji - všechny dags leží v git na našem Gitlabu a Gitlab CI distribuuje aktualizace do počítačů při sloučení master.
Něco málo o Květině
Zatímco nám dělnice mlátí dudlíky, vzpomeňme na další nástroj, který nám může něco ukázat – Květ.
Úplně první stránka se souhrnnými informacemi o pracovních uzlech:
Nejintenzivnější stránka s úkoly, které šly do práce:
Nejnudnější stránka se statusem našeho brokera:
Nejjasnější stránka je s grafy stavu úkolů a dobou jejich provedení:
Zatížíme nedostatečně vytížené
Takže všechny úkoly vyšly, můžete odnést zraněné.
A bylo mnoho raněných – z toho či onoho důvodu. V případě správného použití Airflow právě tyto čtverečky naznačují, že data rozhodně nedorazila.
Musíte sledovat protokol a restartovat spadlé instance úloh.
Kliknutím na libovolný čtverec zobrazíme akce, které máme k dispozici:
Můžete vzít a vyčistit padlé. To znamená, že zapomeneme, že tam něco selhalo a stejná úloha instance půjde do plánovače.
Je jasné, že dělat to s myší se všemi červenými čtverečky není moc humánní – tohle od Airflow neočekáváme. Přirozeně máme zbraně hromadného ničení: Browse/Task Instances
Pojďme vybrat vše najednou a resetovat na nulu, kliknout na správnou položku:
Po úklidu vypadají naše taxíky takto (už čekají, až je plánovač naplánuje):
Spojení, háčky a další proměnné
Je čas podívat se na další DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Udělal někdy každý aktualizaci zprávy? Tohle je zase ona: je tam seznam zdrojů, odkud data získat; existuje seznam, kam umístit; nezapomeňte troubit, když se všechno stalo nebo prasklo (no, to není o nás, ne).
Pojďme si soubor znovu projít a podívat se na nové nejasné věci:
from commons.operators import TelegramBotSendMessage - nic nám nebrání ve výrobě vlastních operátorů, čehož jsme využili tím, že jsme vyrobili malý obal pro odesílání zpráv na Unblocked. (O tomto operátorovi si povíme více níže);
default_args={} - dag může distribuovat stejné argumenty všem svým operátorům;
to='{{ var.value.all_the_kings_men }}' - pole to nebudeme mít napevno, ale dynamicky generované pomocí Jinja a proměnné se seznamem e-mailů, které jsem pečlivě vložil Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS — podmínka pro spuštění obsluhy. V našem případě dopis poletí k šéfům pouze v případě, že všechny závislosti fungují úspěšně;
tg_bot_conn_id='tg_main' - argumenty conn_id přijímat ID připojení, která vytvoříme Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - zprávy v telegramu odletí, pouze pokud jsou padlé úkoly;
task_concurrency=1 - zakazujeme současné spuštění několika instancí úkolu jednoho úkolu. V opačném případě dosáhneme současného spuštění několika VerticaOperator (při pohledu na jeden stůl);
report_update >> [email, tg] - Všechno VerticaOperator konvergovat v odesílání dopisů a zpráv, jako je toto:
Ale protože operátoři oznamovatelů mají různé podmínky spuštění, bude fungovat pouze jeden. Ve stromovém zobrazení vše vypadá trochu méně vizuálně:
Řeknu pár slov o makra a jejich přátelé - proměnné.
Makra jsou zástupné symboly Jinja, které mohou nahradit různé užitečné informace do argumentů operátorů. Například takto:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} se rozšíří na obsah kontextové proměnné execution_date ve formátu YYYY-MM-DD: 2020-07-14. Nejlepší na tom je, že kontextové proměnné jsou přibity ke konkrétní instanci úkolu (čtverec ve stromovém zobrazení) a po restartu se zástupné symboly rozšíří na stejné hodnoty.
Přiřazené hodnoty lze zobrazit pomocí tlačítka Rendered u každé instance úlohy. Takto probíhá úkol s odesláním dopisu:
A tak u úkolu s odesláním zprávy:
Kompletní seznam vestavěných maker pro nejnovější dostupnou verzi je k dispozici zde: odkaz na makra
Navíc pomocí pluginů můžeme deklarovat vlastní makra, ale to je jiný příběh.
Kromě předdefinovaných věcí můžeme dosadit hodnoty našich proměnných (už jsem to použil v kódu výše). Pojďme tvořit v Admin/Variables pár věcí:
stačí použít cestu k požadovanému klíči: {{ var.json.bot_config.bot.token }}.
Řeknu doslova jedno slovo a ukážu jeden snímek obrazovky připojení. Vše je základní zde: na stránce Admin/Connections vytvoříme připojení, přidáme tam naše přihlašovací jména / hesla a konkrétnější parametry. Takhle:
Hesla mohou být zašifrována (důkladněji než ve výchozím nastavení), nebo můžete vynechat typ připojení (jako jsem to udělal já tg_main) - faktem je, že seznam typů je v modelech Airflow napevno a nelze jej rozšířit bez vstupu do zdrojových kódů (pokud jsem najednou něco nevygoogloval, opravte mě), ale nic nám nebrání získat kredity jen tak název.
Můžete také vytvořit několik připojení se stejným názvem: v tomto případě metoda BaseHook.get_connection(), který nám spojení podle jména dá, dá náhodný od více jmenovců (logičtější by bylo udělat Round Robin, ale nechme to na svědomí vývojářů Airflow).
Proměnné a připojení jsou jistě skvělé nástroje, ale je důležité neztratit rovnováhu: které části vašich toků ukládáte do samotného kódu a které části dáte Airflow k uložení. Na jedné straně může být pohodlné rychle změnit hodnotu, například poštovní schránku, prostřednictvím uživatelského rozhraní. Na druhou stranu se stále jedná o návrat ke klikání myší, kterého jsme se (já) chtěli zbavit.
Práce s připojeními je jedním z úkolů háčky. Obecně jsou háčky Airflow body pro připojení ke službám a knihovnám třetích stran. Např, JiraHook otevře klienta, abychom mohli komunikovat s Jirou (úkoly můžete přesouvat tam a zpět) a pomocí SambaHook můžete odeslat místní soubor smb-směřovat.
Analýza vlastního operátora
A dostali jsme se blízko k tomu, abychom se podívali, jak se to vyrábí TelegramBotSendMessage
Kód commons/operators.py se skutečným operátorem:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Zde, stejně jako všechno ostatní v Airflow, je vše velmi jednoduché:
Zděděno od BaseOperator, která implementuje několik věcí specifických pro proudění vzduchu (podívejte se na svůj volný čas)
Deklarovaná pole template_fields, ve kterém bude Jinja hledat makra ke zpracování.
Uspořádal správné argumenty pro __init__(), v případě potřeby nastavte výchozí hodnoty.
Nezapomněli jsme ani na inicializaci předka.
Otevřel odpovídající háček TelegramBotHookod něj obdržel klientský objekt.
Přepsaná (předefinovaná) metoda BaseOperator.execute(), kterým Airfow cukne, až přijde čas spuštění operátora - v něm implementujeme hlavní akci, zapomenutí se přihlásit. (Mimochodem, hned se přihlásíme stdout и stderr - Proud vzduchu vše zachytí, krásně zabalí, rozloží tam, kde je to nutné.)
Podívejme se, co máme commons/hooks.py. První část souboru se samotným háčkem:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Ani nevím, co bych zde měl vysvětlovat, jen upozorním na důležité body:
Dědíme, přemýšlejte o argumentech - ve většině případů to bude jeden: conn_id;
Nadřazené standardní metody: Omezil jsem se get_conn(), ve kterém získám parametry připojení podle názvu a získám pouze sekci extra (toto je pole JSON), do kterého jsem (podle vlastních pokynů!) vložil token robota Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Vytvořím instanci našeho TelegramBot, čímž mu poskytnete konkrétní token.
To je vše. Můžete získat klienta z háku pomocí TelegramBotHook().clent nebo TelegramBotHook().get_conn().
A druhá část souboru, ve které dělám microwrapper pro Telegram REST API, abych nepřetahoval to samé python-telegram-bot pro jednu metodu sendMessage.
Správný způsob je sečíst vše: TelegramBotSendMessage, TelegramBotHook, TelegramBot - v pluginu vložte do veřejného úložiště a dejte jej Open Source.
Zatímco jsme to všechno studovali, naše aktualizace přehledů úspěšně selhaly a poslaly mi chybovou zprávu do kanálu. Jdu zkontrolovat, jestli to není špatně...
Něco se zlomilo v našem doge! Není to to, co jsme čekali? Přesně tak!
Chystáte se nalít?
Máš pocit, že mi něco uniklo? Zdá se, že slíbil, že přenese data z SQL Serveru do Vertica, a pak to vzal a odešel od tématu, ten darebák!
Toto zvěrstvo bylo záměrné, prostě jsem vám musel rozluštit nějakou terminologii. Nyní můžete jít dále.
Náš plán byl tento:
Do dag
Generovat úkoly
Podívejte se, jak je všechno krásné
Přiřaďte výplním čísla relací
Získejte data ze serveru SQL Server
Vložte data do Vertica
Sbírejte statistiky
Abychom to všechno zprovoznili, udělal jsem malý doplněk k našemu docker-compose.yml:
Vertica jako hostitel dwh s nejvíce výchozím nastavením,
tři instance SQL Server,
databáze v posledně jmenovaném naplníme nějakými údaji (v žádném případě se do nich nedívejte mssql_init.py!)
Všechno dobré spouštíme pomocí trochu složitějšího příkazu než minule:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Co náš zázračný randomizér vygeneroval, můžete předmět použít Data Profiling/Ad Hoc Query:
Hlavní věc je neukazovat to analytikům
upřesnit ETL relace Nebudu, všechno je tam triviální: vytvoříme základnu, je v ní znak, vše zabalíme do kontextového manažera a teď uděláme toto:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Nadešel čas shromažďovat naše údaje z našich jeden a půl sta stolů. Udělejme to pomocí velmi nenáročných řádků:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Pomocí háčku získáme z Airflow pymssql-připojit
Dosadíme do požadavku omezení v podobě data - do funkce ho hodí engine šablony.
Krmení naší žádosti pandaskdo nás dostane DataFrame - bude se nám hodit v budoucnu.
Používám substituci {dt} místo parametru požadavku %s ne proto, že jsem zlý Pinocchio, ale protože pandas nezvládá pymssql a proklouzne poslední params: Listi když opravdu chce tuple.
Všimněte si také, že vývojář pymssql rozhodl, že už ho nebude podporovat, a je čas se odstěhovat pyodbc.
Podívejme se, čím Airflow nacpal argumenty našich funkcí:
Pokud nejsou data, nemá smysl pokračovat. Je ale také zvláštní považovat výplň za zdařilou. Ale to není chyba. A-ah-ah, co dělat?! A tady je co:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException říká Airflow, že nejsou žádné chyby, ale úkol přeskočíme. Rozhraní nebude mít zelený nebo červený čtverec, ale růžový.
- No, - řekla malá myška, - není to tak
Jste přesvědčeni, že jsem to nejstrašnější zvíře v lese?
Julia Donaldson, The Gruffalo
Myslím, že kdybychom měli s kolegy soutěž: kdo rychle vytvoří a spustí proces ETL od nuly: oni se svým SSIS a myší a já s Airflow ... A pak bychom také porovnali snadnost údržby ... Páni, myslím, že budete souhlasit, že je porazím na všech frontách!
Pokud trochu vážněji, tak Apache Airflow - popisem procesů ve formě programového kódu - udělal mou práci moc pohodlnější a příjemnější.
Jeho neomezená rozšiřitelnost, a to jak z hlediska zásuvných modulů, tak z hlediska predispozice ke škálovatelnosti, vám dává možnost využít Airflow téměř v jakékoli oblasti: dokonce i v celém cyklu sběru, přípravy a zpracování dat, dokonce i při vypouštění raket (na Mars, chod).
Část finále, odkaz a informace
Hrablo, které jsme pro vás nasbírali
start_date. Ano, toto je již místní meme. Přes Dougův hlavní argument start_date všechny projít. Stručně řečeno, pokud uvedete v start_date aktuální datum a schedule_interval - jeden den, pak DAG začne zítra ne dříve.
start_date = datetime(2020, 7, 7, 0, 1, 2)
A žádné další problémy.
Je s tím spojena další runtime chyba: Task is missing the start_date parameter, což nejčastěji naznačuje, že jste se zapomněli svázat s operátorem dag.
Vše na jednom stroji. Ano, a základny (samotný proud vzduchu a náš nátěr) a webový server, plánovač a pracovníci. A dokonce to fungovalo. Postupem času ale počet úloh pro služby rostl, a když PostgreSQL začal reagovat na index za 20 s místo 5 ms, vzali jsme to a odnesli.
LocalExecutor. Ano, stále na něm sedíme a už jsme došli na okraj propasti. LocalExecutor nám zatím stačil, ale nyní nastal čas rozšířit se alespoň o jednoho pracovníka a přechod na CeleryExecutor budeme muset hodně zapracovat. A vzhledem k tomu, že s ním můžete pracovat na jednom stroji, nic vám nebrání používat Celery i na serveru, který „samozřejmě nikdy nepůjde do výroby, upřímně!“
Nevyužití vestavěné nástroje:
Připojení pro uložení servisních přihlašovacích údajů,
SLA Misses reagovat na úkoly, které se nezdařily včas,
xcom pro výměnu metadat (řekl jsem metadata!) mezi úkoly dag.
Zneužívání pošty. No, co na to říct? Pro všechna opakování padlých úkolů byla nastavena upozornění. Můj pracovní Gmail má nyní více než 90 tisíc e-mailů od Airflow a náhubek webové pošty odmítá vybrat a odstranit více než 100 najednou.
Abychom mohli ještě více pracovat hlavou a ne rukama, Airflow pro nás připravil toto:
REST API - stále má status Experimentální, což mu nebrání v práci. S ním můžete nejen získat informace o dagech a úkolech, ale také zastavit/spustit dag, vytvořit DAG Run nebo pool.
CLI - Prostřednictvím příkazového řádku je k dispozici mnoho nástrojů, které nejsou jen nepohodlné pro použití prostřednictvím WebUI, ale obecně chybí. Například:
backfill potřebné k restartování instancí úloh.
Analytici například přišli a řekli: „A ty máš, soudruhu, nesmysly v datech od 1. do 13. ledna! Opravte to, opravte to, opravte to, opravte to!" A ty jsi takový holub:
Základní servis: initdb, resetdb, upgradedb, checkdb.
run, což vám umožní spustit jednu úlohu instance a dokonce skóre na všech závislostech. Navíc to můžete spustit přes LocalExecutor, i když máte celer cluster.
Dělá skoro to samé test, jen také v základech nic nepíše.
connections umožňuje hromadné vytváření spojů z pláště.
python api - spíše hardcore způsob interakce, který je určen pro pluginy, a ne hemžení se v tom s malými rukama. Ale kdo nám zabrání jít /home/airflow/dagsběh ipython a začít makat? Všechna připojení můžete například exportovat pomocí následujícího kódu:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Připojování k metadatabázi Airflow. Nedoporučuji do něj psát, ale získávání stavů úkolů pro různé konkrétní metriky může být mnohem rychlejší a jednodušší než použití kteréhokoli z API.
Řekněme, že ne všechny naše úkoly jsou idempotentní, ale někdy mohou padat, a to je normální. Pár ucpání už je ale podezřelých a bylo by potřeba to prověřit.
Pozor na SQL!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
reference
A samozřejmě prvních deset odkazů z vydání Google je obsah složky Airflow z mých záložek.