Dobrý deň, volám sa Dmitrij Logvinenko – dátový inžinier oddelenia analýzy skupiny spoločností Vezet.
Poviem vám o úžasnom nástroji na vývoj ETL procesov - Apache Airflow. Airflow je však taký všestranný a mnohostranný, že by ste sa naň mali pozrieť bližšie, aj keď nie ste zapojený do dátových tokov, ale potrebujete pravidelne spúšťať akékoľvek procesy a monitorovať ich vykonávanie.
A áno, nielen poviem, ale aj ukážem: program má veľa kódu, snímok obrazovky a odporúčaní.

To, čo zvyčajne vidíte, keď si vygooglite slovo Airflow / Wikimedia Commons
obsah
Úvod
Apache Airflow je ako Django:
- napísané v pythone
- je tu skvelý admin panel,
- rozširujúce sa donekonečna
- len lepšie a bolo vyrobené na úplne iné účely, a to (ako je napísané pred kat):
- spúšťanie a monitorovanie úloh na neobmedzenom počte počítačov (koľko Celery / Kubernetes a vaše svedomie vám dovolí)
- s dynamickým generovaním pracovného toku od veľmi jednoduchého písania a pochopenia kódu Pythonu
- a možnosť vzájomne prepojiť ľubovoľné databázy a API pomocou hotových komponentov aj doma vyrobených pluginov (čo je mimoriadne jednoduché).
Apache Airflow používame takto:
- zbierame údaje z rôznych zdrojov (veľa inštancií SQL Server a PostgreSQL, rôzne API s aplikačnými metrikami, dokonca aj 1C) v DWH a ODS (máme Vertica a Clickhouse).
- ako pokročilý
cron, ktorá spúšťa procesy konsolidácie dát na ODS a zároveň monitoruje ich údržbu.
Donedávna naše potreby pokrýval jeden malý server s 32 jadrami a 50 GB RAM. V Airflow to funguje:
- viac 200 dagov (v skutočnosti pracovné postupy, do ktorých sme napchali úlohy),
- v každom v priemere 70 úloh,
- táto dobrota začína (aj v priemere) raz za hodinu.
A o tom, ako sme sa rozšírili, napíšem nižšie, ale teraz definujme über-problém, ktorý budeme riešiť:
Existujú tri pôvodné SQL Servery, každý s 50 databázami - inštanciami jedného projektu, respektíve, majú rovnakú štruktúru (takmer všade, mua-ha-ha), čo znamená, že každý má tabuľku Objednávky (našťastie tabuľku s tým meno možno vložiť do akéhokoľvek podnikania). Dáta berieme pridaním servisných polí (zdrojový server, zdrojová databáza, ETL task ID) a naivne ich hodíme do, povedzme, Vertica.
Poďme!
Hlavná časť, praktická (a trochu teoretická)
Prečo je to pre nás (a pre vás)
Keď boli stromy veľké a ja som bol jednoduchý SQL-schik v jednom ruskom maloobchode sme podviedli ETL procesy alias dátové toky pomocou dvoch nástrojov, ktoré máme k dispozícii:
- Informatica Power Center - extrémne sa šíriaci systém, mimoriadne produktívny, s vlastným hardvérom, vlastným verzovaním. Použil som nedajbože 1% jeho schopností. prečo? No, v prvom rade toto rozhranie, niekde z roku 380, na nás psychicky vyvíjalo tlak. Po druhé, táto mašinka je navrhnutá pre extrémne efektné procesy, zúrivé opätovné použitie komponentov a ďalšie veľmi dôležité podnikové triky. O tom, že to stojí, ako krídlo Airbusu AXNUMX / rok, nepovieme nič.
Pozor, snímka obrazovky môže ľuďom do 30 rokov trochu ublížiť

- SQL Server Integration Server - tohto súdruha sme použili v našich vnútroprojektových tokoch. No v skutočnosti: SQL Server už používame a bolo by nerozumné nepoužívať jeho ETL nástroje. Všetko v ňom je dobré: aj rozhranie je krásne, aj správy o pokroku... Ale to nie je dôvod, prečo milujeme softvérové produkty, nie preto. Verzia
dtsx(čo je XML s uzlami premiešanými pri ukladaní) môžeme, ale aký to má zmysel? Čo tak vytvoriť balík úloh, ktorý pretiahne stovky tabuliek z jedného servera na druhý? Áno, aká stovka, z dvadsiatich kúskov vám odpadne ukazovák kliknutím na tlačidlo myši. Ale rozhodne to vyzerá módnejšie:
Určite sme hľadali východiská. Prípad dokonca takmer prišiel k samostatne napísanému generátoru balíčkov SSIS ...
...a potom si ma našla nová práca. A Apache Airflow ma na ňom predbehol.
Keď som zistil, že popisy ETL procesov sú jednoduchý Python kód, netancoval som od radosti. Takto sa verzovali a porovnávali dátové toky a nalievanie tabuliek s jedinou štruktúrou zo stoviek databáz do jedného cieľa sa stalo záležitosťou kódu Python na jeden a pol alebo dve 13” obrazovky.
Zostavenie klastra
Nezariaďujme úplne materskú školu a nehovorme tu o úplne samozrejmých veciach, ako je inštalácia Airflow, vami vybranej databázy, zeleru a ďalších prípadov popísaných v dokoch.
Aby sme mohli okamžite začať experimentovať, načrtol som docker-compose.yml v ktorom:
- Poďme vlastne zvýšiť Airflow: Plánovač, Webový server. Kvet sa tam bude točiť aj kvôli monitorovaniu úloh zeleru (pretože už bol zatlačený
apache/airflow:1.10.10-python3.7, ale nám to nevadí) - PostgreSQL, do ktorého Airflow zapíše svoje servisné informácie (údaje plánovača, štatistiky vykonávania atď.) a Celery označí dokončené úlohy;
- Redis, ktorý bude fungovať ako sprostredkovateľ úloh pre Zeler;
- Zeler robotník, ktorá sa bude podieľať na priamom plnení úloh.
- Do priečinka
./dagspridáme naše súbory s popisom dags. Zoberú sa za chodu, takže po každom kýchnutí nie je potrebné žonglovať s celým zásobníkom.
Na niektorých miestach nie je kód v príkladoch úplne zobrazený (aby sa neprehadzoval text), niekde sa však v procese upravuje. Kompletné príklady pracovného kódu nájdete v úložisku .
prístavný robotník-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerPoznámky:
- Pri zostavovaní kompozície som sa vo veľkej miere spoliehal na známy obraz - určite si to pozrite. Možno už nič viac k životu nepotrebuješ.
- Všetky nastavenia Airflow sú dostupné nielen cez
airflow.cfg, ale aj cez premenné prostredia (vďaka vývojárom), čo som zlomyseľne využil. - Prirodzene, nie je pripravený na výrobu: Zámerne som na kontajnery nedal tepy, neobťažoval som sa bezpečnosťou. Ale urobil som minimum vhodné pre našich experimentátorov.
- Poznač si to:
- Priečinok dag musí byť prístupný plánovačovi aj pracovníkom.
- To isté platí pre všetky knižnice tretích strán – všetky musia byť nainštalované na strojoch s plánovačom a pracovníkmi.
No, teraz je to jednoduché:
$ docker-compose up --scale worker=3Keď sa všetko zdvihne, môžete sa pozrieť na webové rozhrania:
- Prúd vzduchu:
- Kvetina:
Základné pojmy
Ak ste zo všetkých týchto „dagov“ ničomu nerozumeli, tu je krátky slovník:
- Scheduler - najdôležitejší strýko v Airflow, ktorý kontroluje, či roboty tvrdo pracujú, a nie človek: monitoruje plán, aktualizuje dags, spúšťa úlohy.
Vo všeobecnosti mal v starších verziách problémy s pamäťou (nie, nie amnézia, ale úniky) a parameter legacy dokonca zostal v konfiguráciách
run_duration— jeho interval reštartu. Ale teraz je už všetko v poriadku. - DAG (aka "dag") - "riadený acyklický graf", ale takáto definícia povie málokomu, ale v skutočnosti je to kontajner na úlohy, ktoré sa navzájom ovplyvňujú (pozri nižšie) alebo analóg Package v SSIS a Workflow v Informatica .
Okrem dagov môžu ešte existovať poddagy, ale k tým sa s najväčšou pravdepodobnosťou nedostaneme.
- DAG Run - inicializovaný dag, ktorému je priradený vlastný
execution_date. Dagrany toho istého dagu môžu fungovať paralelne (samozrejme, ak ste svoje úlohy urobili idempotentnými). - operátor sú časti kódu zodpovedné za vykonanie konkrétnej akcie. Existujú tri typy operátorov:
- akčnáako náš obľúbený
PythonOperator, ktorý dokáže spustiť akýkoľvek (platný) kód Pythonu; - prevod, ktoré prenášajú dáta z miesta na miesto, povedzme,
MsSqlToHiveTransfer; - senzor na druhej strane vám umožní zareagovať alebo spomaliť ďalšie vykonávanie dag, kým nedôjde k udalosti.
HttpSensormôže stiahnuť zadaný koncový bod a keď počká požadovaná odpoveď, spustiť prenosGoogleCloudStorageToS3Operator. Zvedavá myseľ sa bude pýtať: „Prečo? Veď opakovania môžete robiť priamo v operátorovi!“ A potom, aby nedošlo k upchatiu fondu úloh pozastavenými operátormi. Senzor sa spustí, skontroluje a pred ďalším pokusom sa vypne.
- akčnáako náš obľúbený
- úloha - deklarovaní operátori bez ohľadu na typ a pripojení k dag sú povýšení do hodnosti úlohy.
- inštancia úlohy - keď sa generálny plánovač rozhodol, že je čas poslať úlohy do boja na výkonných robotníkoch (priamo na mieste, ak použijeme
LocalExecutoralebo do vzdialeného uzla v prípadeCeleryExecutor), priradí im kontext (t. j. množinu premenných – parametre vykonávania), rozšíri šablóny príkazov alebo dotazov a združí ich.
Vytvárame úlohy
Najprv si načrtneme všeobecnú schému nášho douga a potom sa budeme stále viac a viac ponoriť do detailov, pretože aplikujeme niektoré netriviálne riešenia.
Takže vo svojej najjednoduchšej podobe bude takýto dag vyzerať takto:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Poďme na to:
- Najprv importujeme potrebné knižnice a niečo iné;
sql_server_ds- jeList[namedtuple[str, str]]s názvami spojení z Airflow Connections a databázami, z ktorých si vezmeme náš tanier;dag- oznámenie nášho dagu, ktoré musí byť nevyhnutne inglobals(), inak to Airflow nenájde. Doug tiež musí povedať:- aké je jeho meno
orders- tento názov sa následne zobrazí vo webovom rozhraní, - že bude pracovať od polnoci ôsmeho júla,
- a mal by bežať približne každých 6 hodín (pre tvrdých chlapov tu namiesto
timedelta()prípustnécron-linka0 0 0/6 ? * * *, pre menej cool - výraz ako@daily);
- aké je jeho meno
workflow()bude robiť hlavnú prácu, ale nie teraz. Zatiaľ len vyklopíme náš kontext do denníka.- A teraz jednoduché kúzlo vytvárania úloh:
- prechádzame cez naše zdroje;
- inicializovať
PythonOperator, ktorá popraví našu figurínuworkflow(). Nezabudnite špecifikovať jedinečný (v rámci dag) názov úlohy a priviazať samotný dag. Vlajkaprovide_contextna oplátku naleje do funkcie ďalšie argumenty, ktoré opatrne zozbierame pomocou**context.
Zatiaľ je to všetko. Čo sme dostali:
- nový dag vo webovom rozhraní,
- jeden a pol stovky úloh, ktoré sa budú vykonávať paralelne (ak to dovoľuje Airflow, Celery nastavenia a kapacita servera).
No, skoro som to pochopil.

Kto nainštaluje závislosti?
Aby som to celé zjednodušil, posral som sa docker-compose.yml spracovanie requirements.txt na všetkých uzloch.
Teraz je to preč:

Sivé štvorce sú inštancie úloh spracované plánovačom.
Chvíľu počkáme, úlohy vybavia pracovníci:

Zelení, samozrejme, svoje dielo úspešne dokončili. Červeným sa veľmi nedarí.
Mimochodom, na našom prod
./dags, neexistuje synchronizácia medzi strojmi - všetky dags ležia vgitna našom Gitlabe a Gitlab CI distribuuje aktualizácie do počítačov pri zlúčenímaster.
Trochu o Kvete
Kým nám robotníci mlátia cumlíky, spomeňme si na ďalšiu pomôcku, ktorá nám môže niečo ukázať – Kvet.
Úplne prvá stránka so súhrnnými informáciami o pracovných uzloch:

Najintenzívnejšia stránka s úlohami, ktoré šli do práce:

Najnudnejšia stránka so statusom nášho brokera:

Najjasnejšia stránka je s grafmi stavu úloh a časom ich vykonania:

Podťažené zaťažíme
Takže všetky úlohy sa splnili, môžete odniesť zranených.

A bolo veľa zranených - z jedného alebo druhého dôvodu. V prípade správneho použitia Airflow práve tieto štvorčeky naznačujú, že dáta rozhodne nedorazili.
Musíte sledovať denník a reštartovať spadnuté inštancie úloh.
Kliknutím na ktorýkoľvek štvorec sa nám zobrazia akcie, ktoré máme k dispozícii:

Môžete vziať a urobiť Clear padlých. To znamená, že zabudneme, že tam niečo zlyhalo a rovnaká úloha inštancie prejde do plánovača.

Je jasné, že robiť to s myšou so všetkými červenými štvorčekmi nie je veľmi humánne – toto od Airflow neočakávame. Prirodzene, máme zbrane hromadného ničenia: Browse/Task Instances

Vyberme všetko naraz a resetujeme na nulu, klikneme na správnu položku:

Naše taxíky po vyčistení vyzerajú takto (už čakajú, kým ich plánovač naplánuje):

Spojenia, háčiky a iné premenné
Je čas pozrieť sa na ďalší DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Urobil niekedy každý aktualizáciu správy? Toto je opäť ona: je tu zoznam zdrojov, odkiaľ možno získať údaje; existuje zoznam, kam umiestniť; nezabudnite zatrúbiť, keď sa všetko stalo alebo zlomilo (no, toto nie je o nás, nie).
Poďme si znova prejsť súbor a pozrieť sa na nové nejasné veci:
from commons.operators import TelegramBotSendMessage- nič nám nebráni vo výrobe vlastných operátorov, čo sme využili tým, že sme urobili malý obal na posielanie správ na Unblocked. (Viac o tomto operátorovi si povieme nižšie);default_args={}- dag môže distribuovať rovnaké argumenty všetkým svojim operátorom;to='{{ var.value.all_the_kings_men }}'- lúkatonebudeme mať napevno, ale dynamicky generované pomocou Jinja a premennej so zoznamom emailov, ktoré som opatrne vložilAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— podmienka pre spustenie operátora. V našom prípade list poletí šéfom iba vtedy, ak budú fungovať všetky závislosti úspešne;tg_bot_conn_id='tg_main'- argumentyconn_idakceptovať ID pripojenia, ktoré vytvorímeAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- správy v telegrame odletia iba vtedy, ak sú tam padnuté úlohy;task_concurrency=1- zakazujeme súčasné spustenie niekoľkých inštancií úloh jednej úlohy. V opačnom prípade získame súčasné spustenie niekoľkýchVerticaOperator(pri pohľade na jeden stôl);report_update >> [email, tg]- všetkyVerticaOperatorkonvergovať v odosielaní listov a správ, ako je toto:

Ale keďže operátori oznamujúcich majú rôzne podmienky spustenia, bude fungovať iba jeden. V stromovom zobrazení všetko vyzerá trochu menej vizuálne:

Poviem pár slov o makrá a ich priatelia - premenné.
Makrá sú zástupné symboly Jinja, ktoré môžu nahradiť rôzne užitočné informácie do argumentov operátorov. Napríklad takto:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} sa rozšíri na obsah kontextovej premennej execution_date vo formáte YYYY-MM-DD: 2020-07-14. Najlepšie na tom je, že kontextové premenné sú pribité ku konkrétnej inštancii úlohy (štvorec v stromovom zobrazení) a po reštartovaní sa zástupné symboly rozšíria na rovnaké hodnoty.
Priradené hodnoty je možné zobraziť pomocou tlačidla Rendered na každej inštancii úlohy. Takto vyzerá úloha s odoslaním listu:

A tak pri úlohe s odoslaním správy:

Kompletný zoznam vstavaných makier pre najnovšiu dostupnú verziu je k dispozícii tu:
Navyše pomocou pluginov môžeme deklarovať vlastné makrá, ale to je už iný príbeh.
Okrem preddefinovaných vecí môžeme nahradiť hodnoty našich premenných (už som to použil v kóde vyššie). Poďme tvoriť Admin/Variables par veci:

Všetko, čo môžete použiť:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Hodnota môže byť skalárna alebo to môže byť aj JSON. V prípade JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}stačí použiť cestu k požadovanému kľúču: {{ var.json.bot_config.bot.token }}.
Doslova poviem jedno slovo a ukážem jednu snímku obrazovky prípojka. Všetko je tu elementárne: na stránke Admin/Connections vytvoríme spojenie, pridáme tam naše prihlasovacie mená / heslá a konkrétnejšie parametre. Páči sa ti to:

Heslá môžu byť šifrované (dôkladnejšie ako predvolené) alebo môžete vynechať typ pripojenia (ako som to urobil v prípade tg_main) - faktom je, že zoznam typov je v modeloch Airflow pevne zapojený a nedá sa rozšíriť bez toho, aby som sa dostal do zdrojových kódov (ak som zrazu niečo nevygooglil, opravte ma), ale nič nám nezabráni získať kredity len tak názov.
Môžete tiež vytvoriť niekoľko spojení s rovnakým názvom: v tomto prípade metóda BaseHook.get_connection(), ktorý nám dostane spojenia podľa názvu, dá náhodný od viacerých menovcov (logickejšie by bolo spraviť Round Robin, ale nechajme to na svedomí vývojárov Airflow).
Premenné a pripojenia sú určite skvelé nástroje, ale je dôležité nestratiť rovnováhu: ktoré časti vašich tokov ukladáte v samotnom kóde a ktoré časti dáte Airflow na uloženie. Na jednej strane môže byť rýchla zmena hodnoty, napríklad poštovej schránky, pohodlná prostredníctvom používateľského rozhrania. Na druhej strane je to stále návrat ku klikaniu myšou, ktorého sme sa (ja) chceli zbaviť.
Práca s prepojeniami je jednou z úloh háčiky. Vo všeobecnosti sú háčiky Airflow body na pripojenie k službám a knižniciam tretích strán. napr. JiraHook otvorí nám klienta na interakciu s Jirou (úlohy môžete presúvať tam a späť) a pomocou SambaHook môžete odoslať lokálny súbor smb-bod.
Analýza vlastného operátora
A priblížili sme sa k tomu, ako sa to vyrába TelegramBotSendMessage
kód commons/operators.py so skutočným operátorom:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Tu, rovnako ako všetko ostatné v Airflow, je všetko veľmi jednoduché:
- Zdedené od
BaseOperator, ktorá implementuje pomerne veľa vecí špecifických pre prúdenie vzduchu (pozrite sa na svoj voľný čas) - Deklarované polia
template_fields, v ktorom bude Jinja hľadať makrá na spracovanie. - Usporiadal správne argumenty pre
__init__(), v prípade potreby nastavte predvolené hodnoty. - Nezabudli sme ani na inicializáciu predka.
- Otvorili príslušný hák
TelegramBotHookdostala od nej klientský predmet. - Prepísaná (predefinovaná) metóda
BaseOperator.execute(), ktorým Airfow škubne, keď príde čas na spustenie operátora - v ňom implementujeme hlavnú akciu, zabudnutie na prihlásenie. (Mimochodom, prihlasujeme sa priamostdoutиstderr- Prúd vzduchu všetko zachytí, krásne zabalí, rozloží tam, kde je to potrebné.)
Pozrime sa, čo máme commons/hooks.py. Prvá časť súboru so samotným háčikom:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientAni neviem, čo tu mám vysvetliť, len si všimnem dôležité body:
- Zdedíme, premýšľajte o argumentoch - vo väčšine prípadov to bude jeden:
conn_id; - Prevažujúce štandardné metódy: Obmedzil som sa
get_conn(), v ktorom dostanem parametre pripojenia podľa názvu a dostanem len sekciuextra(toto je pole JSON), do ktorého som (podľa vlastných pokynov!) vložil token telegramového bota:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Vytváram inštanciu nášho
TelegramBot, čím jej poskytnete konkrétny token.
To je všetko. Môžete získať klienta z háku pomocou TelegramBotHook().clent alebo TelegramBotHook().get_conn().
A druhá časť súboru, v ktorej robím microwrapper pre Telegram REST API, aby som neťahal to isté pre jednu metódu sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Správny spôsob je sčítať všetko:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- v doplnku vložte do verejného úložiska a dajte ho Open Source.
Kým sme to všetko študovali, naše aktualizácie prehľadov úspešne zlyhali a poslali mi chybové hlásenie do kanála. idem to skontrolovat, ci to nie je zle...

Niečo sa zlomilo v našom doge! Nie je to to, čo sme očakávali? presne tak!
Ideš naliať?
Máš pocit, že mi niečo uniklo? Zdá sa, že sľúbil, že prenesie údaje zo servera SQL Server do Vertica, a potom to vzal a odstúpil od témy, ten darebák!
Toto zverstvo bolo zámerné, jednoducho som vám musel rozlúštiť nejakú terminológiu. Teraz môžete ísť ďalej.
Náš plán bol takýto:
- Do dag
- Generovať úlohy
- Pozrite sa, aké je všetko krásne
- Priraďte čísla relácií výplniam
- Získajte údaje zo servera SQL Server
- Vložte údaje do Vertica
- Zbierajte štatistiky
Aby sme to všetko rozbehli, urobil som malý doplnok k nášmu docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyTam vychováme:
- Vertica ako hostiteľ
dwhs najviac predvolenými nastaveniami, - tri inštancie SQL Server,
- do databáz v poslednom uvedenom naplníme nejaké údaje (v žiadnom prípade sa nedívajte do
mssql_init.py!)
Všetko dobré spúšťame pomocou trochu komplikovanejšieho príkazu ako naposledy:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3To, čo vygeneroval náš zázračný randomizér, môžete použiť Data Profiling/Ad Hoc Query:

Hlavná vec je neukázať to analytikom
rozviesť ETL relácie Nebudem, všetko je tam triviálne: vytvoríme základňu, je v nej znak, všetko zabalíme do kontextového manažéra a teraz urobíme toto:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passČas nadišiel zbierať naše údaje z našich jeden a pol sto stolov. Urobme to pomocou veľmi nenáročných riadkov:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Pomocou háku dostaneme z Airflow
pymssql-spojiť - Nahraďte do požiadavky obmedzenie v podobe dátumu - do funkcie ho hodí engine šablón.
- Kŕmenie našej žiadosti
pandaskto nás dostaneDataFrame- bude nám to užitočné v budúcnosti.
Používam substitúciu
{dt}namiesto parametra požiadavky%snie preto, že som zlý Pinocchio, ale pretopandasnezvládnepymssqla pošmykne poslednýparams: Listhoci veľmi chcetuple.
Upozorňujeme tiež, že vývojárpymssqlrozhodol, že ho už nebude podporovať, a je čas sa odsťahovaťpyodbc.
Pozrime sa, čím Airflow naplnil argumenty našich funkcií:

Ak nie sú žiadne údaje, nemá zmysel pokračovať. Ale je tiež zvláštne považovať výplň za vydarenú. Ale to nie je chyba. A-ah-ah, čo robiť?! A tu je čo:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException povie Airflow, že neexistujú žiadne chyby, ale úlohu preskočíme. Rozhranie nebude mať zelený alebo červený štvorec, ale ružový.
Zahoďme svoje údaje viac stĺpcov:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])A to
- Databáza, z ktorej sme prevzali objednávky,
- ID našej záplavovej relácie (bude iné pre každú úlohu),
- Hash zo zdroja a ID objednávky - aby sme vo finálnej databáze (kde je všetko naliate do jednej tabuľky) mali jedinečné ID objednávky.
Zostáva predposledný krok: nalejte všetko do Verticy. A napodiv, jeden z najpozoruhodnejších a najefektívnejších spôsobov, ako to dosiahnuť, je prostredníctvom CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Vyrábame špeciálny prijímač
StringIO. pandasláskavo položí nášDataFramevo formeCSV- čiary.- Otvorme háčikom spojenie s našou obľúbenou Verticou.
- A teraz s pomocou
copy()pošlite naše údaje priamo Vertike!
Zoberieme od vodiča, koľko riadkov bolo vyplnených, a povieme manažérovi relácie, že je všetko v poriadku:
session.loaded_rows = cursor.rowcount
session.successful = TrueTo je všetko.
Pri predaji vytvárame cieľovú platňu ručne. Tu som si dovolil malý stroj:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)používam
VerticaOperator()Vytvorím databázovú schému a tabuľku (ak ešte neexistujú, samozrejme). Hlavná vec je správne usporiadať závislosti:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadSčítanie
- Dobre, - povedala myška, - nie, teraz
Si presvedčený, že som najstrašnejšie zviera v lese?
Julia Donaldson, The Gruffalo
Myslím, že keby sme s kolegami súťažili: kto rýchlo vytvorí a spustí proces ETL od nuly: oni so svojím SSIS a myšou a ja s Airflow ... A potom by sme porovnali aj jednoduchosť údržby ... Wow, myslím, že budete súhlasiť, že ich porazím na všetkých frontoch!
Ak trochu vážnejšie, tak Apache Airflow - popisom procesov vo forme programového kódu - urobil moju prácu viac pohodlnejšie a príjemnejšie.
Jeho neobmedzená rozšíriteľnosť, čo sa týka zásuvných modulov a predispozície k škálovateľnosti, vám dáva možnosť využiť Airflow takmer v akejkoľvek oblasti: dokonca aj v celom cykle zberu, prípravy a spracovania dát, dokonca aj pri štarte rakiet (na Mars, resp. kurz).
Časť záverečná, referencie a informácie
Hrable, ktoré sme pre vás nazbierali
start_date. Áno, toto je už lokálny meme. Via Dougov hlavný argumentstart_datevšetci prejdú. Stručne povedané, ak uvediete vstart_dateaktuálny dátum aschedule_interval- jeden deň, potom DAG začne zajtra nie skôr.start_date = datetime(2020, 7, 7, 0, 1, 2)A už žiadne problémy.
Je s tým spojená ďalšia runtime chyba:
Task is missing the start_date parameter, čo najčastejšie naznačuje, že ste sa zabudli naviazať na operátora dag.- Všetko na jednom stroji. Áno, a základne (samotné prúdenie vzduchu a náš náter) a webový server, plánovač a pracovníci. A dokonca to fungovalo. Postupom času však počet úloh pre služby rástol a keď PostgreSQL začal reagovať na index za 20 s namiesto 5 ms, zobrali sme to a odniesli.
- LocalExecutor. Áno, stále na ňom sedíme a už sme prišli na okraj priepasti. LocalExecutor nám zatiaľ stačil, no teraz nastal čas na rozšírenie aspoň o jedného pracovníka a na prechod na CeleryExecutor budeme musieť tvrdo pracovať. A vzhľadom na to, že s ním môžete pracovať na jednom stroji, nič vám nebráni používať zeler ani na serveri, ktorý „samozrejme, nikdy nepôjde do výroby, úprimne!“
- Nepoužitie vstavané nástroje:
- pripojenie na ukladanie servisných poverení,
- Miss SLA reagovať na úlohy, ktoré nefungovali včas,
- xcom na výmenu metadát (povedal som metadát!) medzi dag úlohami.
- Zneužívanie pošty. No, čo môžem povedať? Na všetky opakovania padlých úloh boli nastavené upozornenia. Teraz má môj pracovný Gmail viac ako 90 100 e-mailov od Airflow a náhubok webovej pošty odmieta vyzdvihnúť a odstrániť viac ako XNUMX naraz.
Ďalšie úskalia:
Viac nástrojov na automatizáciu
Aby sme mohli ešte viac pracovať hlavou a nie rukami, Airflow si pre nás pripravil toto:
- - stále má status Experimental, čo mu však nebráni v práci. S ním môžete nielen získať informácie o dagoch a úlohách, ale aj zastaviť/spustiť dag, vytvoriť DAG Run alebo pool.
- - cez príkazový riadok je dostupných veľa nástrojov, ktoré nie sú len nepohodlné na používanie cez WebUI, ale vo všeobecnosti chýbajú. Napríklad:
backfillpotrebné na reštartovanie inštancií úloh.
Napríklad analytici prišli a povedali: „A ty, súdruh, máš nezmysly v údajoch od 1. do 13. januára! Opravte to, opravte to, opravte to, opravte to!" A ty si taká varná doska:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Základná služba:
initdb,resetdb,upgradedb,checkdb. run, ktorá vám umožňuje spustiť jednu úlohu inštancie a dokonca skóre na všetkých závislostiach. Navyše ho môžete spustiť cezLocalExecutor, aj keď máte zeler klaster.- Robí skoro to isté
test, len take v zakladoch nepise nic. connectionsumožňuje hromadné vytváranie spojov zo škrupiny.
- - pomerne tvrdý spôsob interakcie, ktorý je určený pre pluginy, a nie s malými rukami. Ale kto nám zabráni ísť
/home/airflow/dags, behipythona začať makať? Môžete napríklad exportovať všetky pripojenia pomocou nasledujúceho kódu:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Pripája sa k metadatabáze Airflow. Neodporúčam do nej písať, ale získanie stavov úloh pre rôzne špecifické metriky môže byť oveľa rýchlejšie a jednoduchšie ako cez ktorékoľvek z API.
Povedzme, že nie všetky naše úlohy sú idempotentné, ale niekedy môžu padnúť, a to je normálne. Ale pár upchávok je už podozrivých a bolo by potrebné skontrolovať.
Pozor na SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
referencie
A samozrejme, prvých desať odkazov z vydania Google je obsah priečinka Airflow z mojich záložiek.
- - Samozrejme, musíme začať kanceláriou. dokumentáciu, ale kto číta návod?
- - No, prečítajte si aspoň odporúčania od tvorcov.
- - úplný začiatok: používateľské rozhranie v obrázkoch
- - základné pojmy sú dobre popísané, ak ste (náhle!) niečomu odo mňa nerozumeli.
- - krátky návod na nastavenie klastra Airflow.
- - takmer rovnaký zaujímavý článok, možno až na viac formalizmu a menej príkladov.
- — o spolupráci so zelerom.
- - o idempotencii úloh, načítavaní podľa ID namiesto dátumu, transformácii, štruktúre súborov a iných zaujímavostiach.
- - závislosti úloh a Trigger Rule, ktoré som spomenul len okrajovo.
- - ako prekonať niektoré "funguje podľa plánu" v plánovači, načítať stratené dáta a uprednostniť úlohy.
- — užitočné SQL dotazy na metadáta Airflow.
- - je tu užitočná sekcia o vytvorení vlastného snímača.
- — zaujímavá krátka poznámka o budovaní infraštruktúry na AWS pre Data Science.
- - časté chyby (keď si stále niekto neprečíta návod).
- - Usmejte sa, ako ľudia berlú ukladajú heslá, hoci môžete použiť len Connections.
- - implicitné preposielanie DAG, kontextové hádzanie funkcií, opäť o závislostiach a tiež o preskakovaní spúšťania úloh.
- - o použití
default argumentsиparamsv šablónach, ako aj Premenné a Spojenia. - - príbeh o tom, ako sa plánovač pripravuje na Airflow 2.0.
- - trochu zastaraný článok o nasadení nášho klastra v
docker-compose. - - dynamické úlohy využívajúce šablóny a kontextové preposielanie.
- — štandardné a vlastné oznámenia poštou a Slack.
- - Úlohy vetvenia, makrá a XCom.
A odkazy použité v článku:
- - zástupné symboly dostupné na použitie v šablónach.
- — Časté chyby pri tvorbe dagov.
- -
docker-composena experimentovanie, ladenie a ďalšie. - — Python wrapper pre Telegram REST API.
Zdroj: hab.com




