Apache Airflow: Usnadnění ETL

Ahoj, jmenuji se Dmitrij Logvinenko – datový inženýr oddělení analýzy skupiny společností Vezet.

Řeknu vám o úžasném nástroji pro vývoj ETL procesů - Apache Airflow. Airflow je ale tak všestranný a mnohostranný, že byste se na něj měli podívat blíže, i když se nepodílíte na datových tocích, ale potřebujete pravidelně spouštět jakékoli procesy a sledovat jejich provádění.

A ano, nejen řeknu, ale také ukážu: program má spoustu kódu, snímků obrazovky a doporučení.

Apache Airflow: Usnadnění ETL
To, co obvykle uvidíte, když zadáte do googlu slovo Airflow / Wikimedia Commons

obsah

úvod

Apache Airflow je jako Django:

  • napsaný v pythonu
  • je tam skvělý admin panel,
  • neomezeně rozšiřitelné

- jen lepší a byl vyroben pro úplně jiné účely, a to (jak je psáno před kat):

  • spouštění a sledování úloh na neomezeném počtu počítačů (jak vám mnoho Celery / Kubernetes a vaše svědomí dovolí)
  • s dynamickým generováním pracovních postupů od velmi snadno zapisovatelného a srozumitelného kódu Pythonu
  • a možnost vzájemného propojení libovolných databází a API pomocí hotových komponent i podomácku vyrobených pluginů (což je extrémně jednoduché).

Apache Airflow používáme takto:

  • shromažďujeme data z různých zdrojů (mnoho SQL Server a PostgreSQL instancí, různá API s aplikačními metrikami, dokonce i 1C) v DWH a ODS (máme Vertica a Clickhouse).
  • jak pokročilé cron, která spouští procesy konsolidace dat na ODS a zároveň sleduje jejich údržbu.

Donedávna naše potřeby pokrýval jeden malý server s 32 jádry a 50 GB RAM. V Airflow to funguje:

  • více 200 dagů (ve skutečnosti pracovní postupy, do kterých jsme nacpali úkoly),
  • v každém v průměru 70 úkolů,
  • tato dobrota začíná (také průměrně) jednou za hodinu.

A o tom, jak jsme se rozšířili, napíšu níže, ale nyní definujme über-problém, který budeme řešit:

Existují tři původní SQL Servery, každý s 50 databázemi - instancemi jednoho projektu, respektive, mají stejnou strukturu (téměř všude, mua-ha-ha), což znamená, že každý má tabulku Objednávky (naštěstí tabulka s tím jméno lze vložit do jakékoli firmy). Vezmeme data přidáním polí služeb (zdrojový server, zdrojová databáze, ID úlohy ETL) a naivně je hodíme do, řekněme, Vertica.

Pojďme!

Hlavní část, praktická (a trochu teoretická)

Proč my (a vy)

Když byly stromy velké a já byl jednoduchý SQL-schik v jednom ruském maloobchodě jsme podvedli ETL procesy alias datové toky pomocí dvou nástrojů, které máme k dispozici:

  • Informatica Power Center - extrémně se šířící systém, extrémně produktivní, s vlastním hardwarem, vlastním verzováním. Použil jsem nedej bože 1 % jeho schopností. Proč? No, za prvé, toto rozhraní, někde z roku 380, na nás psychicky tlačilo. Za druhé, toto zařízení je navrženo pro extrémně efektní procesy, zběsilé opětovné použití komponent a další velmi důležité podnikové triky. O tom, co to stojí, jako křídlo Airbusu AXNUMX / rok, nebudeme nic říkat.

    Pozor, snímek obrazovky může lidem do 30 let trochu ublížit

    Apache Airflow: Usnadnění ETL

  • SQL Server Integration Server - tohoto soudruha jsme použili v našich vnitroprojektových tocích. No, ve skutečnosti: SQL Server již používáme a bylo by nějak nerozumné nepoužívat jeho ETL nástroje. Všechno v něm je dobré: jak rozhraní je krásné, tak zprávy o pokroku ... Ale to není důvod, proč milujeme softwarové produkty, ach, ne pro tohle. Verze dtsx (což je XML s uzly zamíchanými při uložení) můžeme, ale jaký to má smysl? Co takhle vytvořit balíček úloh, který přetáhne stovky tabulek z jednoho serveru na druhý? Ano, jaká stovka, ukazováček vám odpadne z dvaceti kousků, klikáním na tlačítko myši. Ale rozhodně to vypadá módněji:

    Apache Airflow: Usnadnění ETL

Určitě jsme hledali východiska. Případ dokonce téměř přišel k vlastnímu generátoru balíčků SSIS ...

...a pak si mě našla nová práce. A Apache Airflow mě na něm předběhl.

Když jsem zjistil, že popisy ETL procesů jsou jednoduchý Python kód, netancoval jsem radostí. Takto byly datové toky verzovány a rozlišovány a nalévání tabulek s jedinou strukturou ze stovek databází do jednoho cíle se stalo záležitostí kódu Python na jedné a půl nebo dvou 13” obrazovkách.

Sestavení clusteru

Nezařizujeme úplně mateřskou školu a nemluvme zde o zcela samozřejmých věcech, jako je instalace Airflow, vámi zvolené databáze, Celery a dalších případů popsaných v docích.

Abychom mohli okamžitě začít experimentovat, načrtl jsem docker-compose.yml ve kterém:

  • Pojďme vlastně zvýšit Airflow: Plánovač, Webový server. Květina se tam bude také točit, aby sledovala úkoly s celerem (protože už byl zatlačen do apache/airflow:1.10.10-python3.7, ale to nám nevadí)
  • PostgreSQL, do kterého bude Airflow zapisovat své servisní informace (data plánovače, statistiky provádění atd.) a Celery bude označovat dokončené úkoly;
  • Redestilát, který bude fungovat jako zprostředkovatel úkolů pro Celery;
  • Celer dělník, která se bude zabývat přímým plněním úkolů.
  • Do složky ./dags přidáme naše soubory s popisem dags. Budou vyzvednuty za chodu, takže není třeba po každém kýchnutí žonglovat s celou hromádkou.

Na některých místech není kód v příkladech zcela ukázán (aby se nezahltil text), někde je však v průběhu upravován. Kompletní příklady pracovního kódu lze nalézt v úložišti https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Poznámky:

  • Při sestavování kompozice jsem z velké části vycházel ze známého obrazu puckel/docker-proud vzduchu - určitě se na to podívejte. Možná už k životu nic jiného nepotřebuješ.
  • Všechna nastavení proudění vzduchu jsou dostupná nejen prostřednictvím airflow.cfg, ale také prostřednictvím proměnných prostředí (díky vývojářům), čehož jsem škodolibě využil.
  • Přirozeně není připraven na výrobu: Záměrně jsem na kontejnery nedal tepy, neobtěžoval jsem se bezpečností. Ale udělal jsem minimum vhodné pro naše experimentátory.
  • Všimněte si, že:
    • Složka dag musí být přístupná plánovači i pracovníkům.
    • Totéž platí pro všechny knihovny třetích stran – všechny musí být nainstalovány na strojích s plánovačem a pracovníky.

No, teď je to jednoduché:

$ docker-compose up --scale worker=3

Až vše stoupne, můžete se podívat na webová rozhraní:

Základní pojmy

Pokud jste ze všech těchto „dagů“ ničemu nerozuměli, zde je krátký slovník:

  • Plánovač - nejdůležitější strýc v Airflow, který kontroluje, že roboti tvrdě pracují, a ne člověk: sleduje plán, aktualizuje dagy, spouští úkoly.

    Obecně ve starších verzích měl problémy s pamětí (ne, ne amnézie, ale úniky) a parametr legacy dokonce zůstal v konfiguracích run_duration — jeho interval restartu. Ale teď je vše v pořádku.

  • DAG (aka "dag") - "směrovaný acyklický graf", ale taková definice řekne málokomu, ale ve skutečnosti je to kontejner pro úkoly, které se vzájemně ovlivňují (viz níže) nebo obdoba Package v SSIS a Workflow v Informatica .

    Kromě dagů mohou ještě existovat poddagy, ale k těm se s největší pravděpodobností nedostaneme.

  • DAG Run - inicializovaný dag, kterému je přiřazen vlastní execution_date. Dagrany stejného dagu mohou fungovat paralelně (pokud jste své úkoly udělali idempotentní, samozřejmě).
  • Operátor jsou části kódu zodpovědné za provedení konkrétní akce. Existují tři typy operátorů:
    • akcejako náš oblíbený PythonOperator, který může spustit jakýkoli (platný) kód Pythonu;
    • transfer, které přenášejí data z místa na místo, řekněme, MsSqlToHiveTransfer;
    • senzor na druhou stranu vám to umožní reagovat nebo zpomalit další provádění dagu, dokud nenastane událost. HttpSensor může vytáhnout zadaný koncový bod, a když požadovaná odpověď čeká, spustit přenos GoogleCloudStorageToS3Operator. Zvídavá mysl se zeptá: „Proč? Vždyť opakování můžete dělat přímo v operátoru!“ A pak, aby nedošlo k ucpání fondu úkolů pozastavenými operátory. Senzor se spustí, zkontroluje a před dalším pokusem zemře.
  • Úkol - deklarované operátory bez ohledu na typ a připojené k dag jsou povýšeny do hodnosti úkolu.
  • instance úkolu - když se generální plánovač rozhodl, že je čas poslat úkoly do bitvy na performerech-pracovnících (přímo na místě, pokud použijeme LocalExecutor nebo do vzdáleného uzlu v případě CeleryExecutor), přiřadí jim kontext (tj. sadu proměnných – parametry provádění), rozšíří šablony příkazů nebo dotazů a sdruží je.

Vytváříme úkoly

Nejprve si nastíníme obecné schéma našeho douga a pak se budeme stále více ponořit do detailů, protože aplikujeme některá netriviální řešení.

Takže ve své nejjednodušší podobě bude takový dag vypadat takto:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Pojďme pochopit:

  • Nejprve importujeme potřebné knihovny a něco jiného;
  • sql_server_ds - Je List[namedtuple[str, str]] se jmény spojení z Airflow Connections a databázemi, ze kterých si vezmeme náš talíř;
  • dag - oznámení našeho dagu, které musí být nutně in globals(), jinak to Airflow nenajde. Doug také musí říct:
    • jak se jmenuje orders - toto jméno se následně objeví ve webovém rozhraní,
    • že bude pracovat od půlnoci osmého července,
    • a mělo by to běžet přibližně každých 6 hodin (pro tvrdé chlapy zde místo timedelta() přípustný cron-čára 0 0 0/6 ? * * *, pro méně cool - výraz jako @daily);
  • workflow() bude dělat hlavní práci, ale ne teď. Prozatím jen vyklopíme náš kontext do protokolu.
  • A nyní jednoduché kouzlo vytváření úkolů:
    • procházíme našimi zdroji;
    • inicializovat PythonOperator, který popraví naši figurínu workflow(). Nezapomeňte uvést jedinečný (v rámci dag) název úkolu a svázat samotnou dag. Vlajka provide_context na oplátku nalije do funkce další argumenty, které pečlivě shromáždíme pomocí **context.

To je prozatím vše. Co jsme dostali:

  • nový dag ve webovém rozhraní,
  • jeden a půl sta úloh, které budou prováděny paralelně (pokud to Airflow, nastavení Celery a kapacita serveru dovolí).

No, skoro jsem to pochopil.

Apache Airflow: Usnadnění ETL
Kdo bude instalovat závislosti?

Abych to celé zjednodušil, podělal jsem to docker-compose.yml zpracovává se requirements.txt na všech uzlech.

Teď je to pryč:

Apache Airflow: Usnadnění ETL

Šedé čtverečky jsou instance úloh zpracované plánovačem.

Chvíli čekáme, úkoly chystají pracovníci:

Apache Airflow: Usnadnění ETL

Zelení samozřejmě své dílo úspěšně dokončili. Červeným se moc nedaří.

Mimochodem, na našem prod ./dags, neexistuje žádná synchronizace mezi stroji - všechny dags leží v git na našem Gitlabu a Gitlab CI distribuuje aktualizace do počítačů při sloučení master.

Něco málo o Květině

Zatímco nám dělnice mlátí dudlíky, vzpomeňme na další nástroj, který nám může něco ukázat – Květ.

Úplně první stránka se souhrnnými informacemi o pracovních uzlech:

Apache Airflow: Usnadnění ETL

Nejintenzivnější stránka s úkoly, které šly do práce:

Apache Airflow: Usnadnění ETL

Nejnudnější stránka se statusem našeho brokera:

Apache Airflow: Usnadnění ETL

Nejjasnější stránka je s grafy stavu úkolů a dobou jejich provedení:

Apache Airflow: Usnadnění ETL

Zatížíme nedostatečně vytížené

Takže všechny úkoly vyšly, můžete odnést zraněné.

Apache Airflow: Usnadnění ETL

A bylo mnoho raněných – z toho či onoho důvodu. V případě správného použití Airflow právě tyto čtverečky naznačují, že data rozhodně nedorazila.

Musíte sledovat protokol a restartovat spadlé instance úloh.

Kliknutím na libovolný čtverec zobrazíme akce, které máme k dispozici:

Apache Airflow: Usnadnění ETL

Můžete vzít a vyčistit padlé. To znamená, že zapomeneme, že tam něco selhalo a stejná úloha instance půjde do plánovače.

Apache Airflow: Usnadnění ETL

Je jasné, že dělat to s myší se všemi červenými čtverečky není moc humánní – tohle od Airflow neočekáváme. Přirozeně máme zbraně hromadného ničení: Browse/Task Instances

Apache Airflow: Usnadnění ETL

Pojďme vybrat vše najednou a resetovat na nulu, kliknout na správnou položku:

Apache Airflow: Usnadnění ETL

Po úklidu vypadají naše taxíky takto (už čekají, až je plánovač naplánuje):

Apache Airflow: Usnadnění ETL

Spojení, háčky a další proměnné

Je čas podívat se na další DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Udělal někdy každý aktualizaci zprávy? Tohle je zase ona: je tam seznam zdrojů, odkud data získat; existuje seznam, kam umístit; nezapomeňte troubit, když se všechno stalo nebo prasklo (no, to není o nás, ne).

Pojďme si soubor znovu projít a podívat se na nové nejasné věci:

  • from commons.operators import TelegramBotSendMessage - nic nám nebrání ve výrobě vlastních operátorů, čehož jsme využili tím, že jsme vyrobili malý obal pro odesílání zpráv na Unblocked. (O tomto operátorovi si povíme více níže);
  • default_args={} - dag může distribuovat stejné argumenty všem svým operátorům;
  • to='{{ var.value.all_the_kings_men }}' - pole to nebudeme mít napevno, ale dynamicky generované pomocí Jinja a proměnné se seznamem e-mailů, které jsem pečlivě vložil Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — podmínka pro spuštění obsluhy. V našem případě dopis poletí k šéfům pouze v případě, že všechny závislosti fungují úspěšně;
  • tg_bot_conn_id='tg_main' - argumenty conn_id přijímat ID připojení, která vytvoříme Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - zprávy v telegramu odletí, pouze pokud jsou padlé úkoly;
  • task_concurrency=1 - zakazujeme současné spuštění několika instancí úkolu jednoho úkolu. V opačném případě dosáhneme současného spuštění několika VerticaOperator (při pohledu na jeden stůl);
  • report_update >> [email, tg] - Všechno VerticaOperator konvergovat v odesílání dopisů a zpráv, jako je toto:
    Apache Airflow: Usnadnění ETL

    Ale protože operátoři oznamovatelů mají různé podmínky spuštění, bude fungovat pouze jeden. Ve stromovém zobrazení vše vypadá trochu méně vizuálně:
    Apache Airflow: Usnadnění ETL

Řeknu pár slov o makra a jejich přátelé - proměnné.

Makra jsou zástupné symboly Jinja, které mohou nahradit různé užitečné informace do argumentů operátorů. Například takto:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} se rozšíří na obsah kontextové proměnné execution_date ve formátu YYYY-MM-DD: 2020-07-14. Nejlepší na tom je, že kontextové proměnné jsou přibity ke konkrétní instanci úkolu (čtverec ve stromovém zobrazení) a po restartu se zástupné symboly rozšíří na stejné hodnoty.

Přiřazené hodnoty lze zobrazit pomocí tlačítka Rendered u každé instance úlohy. Takto probíhá úkol s odesláním dopisu:

Apache Airflow: Usnadnění ETL

A tak u úkolu s odesláním zprávy:

Apache Airflow: Usnadnění ETL

Kompletní seznam vestavěných maker pro nejnovější dostupnou verzi je k dispozici zde: odkaz na makra

Navíc pomocí pluginů můžeme deklarovat vlastní makra, ale to je jiný příběh.

Kromě předdefinovaných věcí můžeme dosadit hodnoty našich proměnných (už jsem to použil v kódu výše). Pojďme tvořit v Admin/Variables pár věcí:

Apache Airflow: Usnadnění ETL

Vše, co můžete použít:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Hodnota může být skalární nebo to může být také JSON. V případě JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

stačí použít cestu k požadovanému klíči: {{ var.json.bot_config.bot.token }}.

Řeknu doslova jedno slovo a ukážu jeden snímek obrazovky připojení. Vše je základní zde: na stránce Admin/Connections vytvoříme připojení, přidáme tam naše přihlašovací jména / hesla a konkrétnější parametry. Takhle:

Apache Airflow: Usnadnění ETL

Hesla mohou být zašifrována (důkladněji než ve výchozím nastavení), nebo můžete vynechat typ připojení (jako jsem to udělal já tg_main) - faktem je, že seznam typů je v modelech Airflow napevno a nelze jej rozšířit bez vstupu do zdrojových kódů (pokud jsem najednou něco nevygoogloval, opravte mě), ale nic nám nebrání získat kredity jen tak název.

Můžete také vytvořit několik připojení se stejným názvem: v tomto případě metoda BaseHook.get_connection(), který nám spojení podle jména dá, dá náhodný od více jmenovců (logičtější by bylo udělat Round Robin, ale nechme to na svědomí vývojářů Airflow).

Proměnné a připojení jsou jistě skvělé nástroje, ale je důležité neztratit rovnováhu: které části vašich toků ukládáte do samotného kódu a které části dáte Airflow k uložení. Na jedné straně může být pohodlné rychle změnit hodnotu, například poštovní schránku, prostřednictvím uživatelského rozhraní. Na druhou stranu se stále jedná o návrat ke klikání myší, kterého jsme se (já) chtěli zbavit.

Práce s připojeními je jedním z úkolů háčky. Obecně jsou háčky Airflow body pro připojení ke službám a knihovnám třetích stran. Např, JiraHook otevře klienta, abychom mohli komunikovat s Jirou (úkoly můžete přesouvat tam a zpět) a pomocí SambaHook můžete odeslat místní soubor smb-směřovat.

Analýza vlastního operátora

A dostali jsme se blízko k tomu, abychom se podívali, jak se to vyrábí TelegramBotSendMessage

Kód commons/operators.py se skutečným operátorem:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Zde, stejně jako všechno ostatní v Airflow, je vše velmi jednoduché:

  • Zděděno od BaseOperator, která implementuje několik věcí specifických pro proudění vzduchu (podívejte se na svůj volný čas)
  • Deklarovaná pole template_fields, ve kterém bude Jinja hledat makra ke zpracování.
  • Uspořádal správné argumenty pro __init__(), v případě potřeby nastavte výchozí hodnoty.
  • Nezapomněli jsme ani na inicializaci předka.
  • Otevřel odpovídající háček TelegramBotHookod něj obdržel klientský objekt.
  • Přepsaná (předefinovaná) metoda BaseOperator.execute(), kterým Airfow cukne, až přijde čas spuštění operátora - v něm implementujeme hlavní akci, zapomenutí se přihlásit. (Mimochodem, hned se přihlásíme stdout и stderr - Proud vzduchu vše zachytí, krásně zabalí, rozloží tam, kde je to nutné.)

Podívejme se, co máme commons/hooks.py. První část souboru se samotným háčkem:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Ani nevím, co bych zde měl vysvětlovat, jen upozorním na důležité body:

  • Dědíme, přemýšlejte o argumentech - ve většině případů to bude jeden: conn_id;
  • Nadřazené standardní metody: Omezil jsem se get_conn(), ve kterém získám parametry připojení podle názvu a získám pouze sekci extra (toto je pole JSON), do kterého jsem (podle vlastních pokynů!) vložil token robota Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Vytvořím instanci našeho TelegramBot, čímž mu poskytnete konkrétní token.

To je vše. Můžete získat klienta z háku pomocí TelegramBotHook().clent nebo TelegramBotHook().get_conn().

A druhá část souboru, ve které dělám microwrapper pro Telegram REST API, abych nepřetahoval to samé python-telegram-bot pro jednu metodu sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Správný způsob je sečíst vše: TelegramBotSendMessage, TelegramBotHook, TelegramBot - v pluginu vložte do veřejného úložiště a dejte jej Open Source.

Zatímco jsme to všechno studovali, naše aktualizace přehledů úspěšně selhaly a poslaly mi chybovou zprávu do kanálu. Jdu zkontrolovat, jestli to není špatně...

Apache Airflow: Usnadnění ETL
Něco se zlomilo v našem doge! Není to to, co jsme čekali? Přesně tak!

Chystáte se nalít?

Máš pocit, že mi něco uniklo? Zdá se, že slíbil, že přenese data z SQL Serveru do Vertica, a pak to vzal a odešel od tématu, ten darebák!

Toto zvěrstvo bylo záměrné, prostě jsem vám musel rozluštit nějakou terminologii. Nyní můžete jít dále.

Náš plán byl tento:

  1. Do dag
  2. Generovat úkoly
  3. Podívejte se, jak je všechno krásné
  4. Přiřaďte výplním čísla relací
  5. Získejte data ze serveru SQL Server
  6. Vložte data do Vertica
  7. Sbírejte statistiky

Abychom to všechno zprovoznili, udělal jsem malý doplněk k našemu docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Tam vychováváme:

  • Vertica jako hostitel dwh s nejvíce výchozím nastavením,
  • tři instance SQL Server,
  • databáze v posledně jmenovaném naplníme nějakými údaji (v žádném případě se do nich nedívejte mssql_init.py!)

Všechno dobré spouštíme pomocí trochu složitějšího příkazu než minule:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Co náš zázračný randomizér vygeneroval, můžete předmět použít Data Profiling/Ad Hoc Query:

Apache Airflow: Usnadnění ETL
Hlavní věc je neukazovat to analytikům

upřesnit ETL relace Nebudu, všechno je tam triviální: vytvoříme základnu, je v ní znak, vše zabalíme do kontextového manažera a teď uděláme toto:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Nadešel čas shromažďovat naše údaje z našich jeden a půl sta stolů. Udělejme to pomocí velmi nenáročných řádků:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Pomocí háčku získáme z Airflow pymssql-připojit
  2. Dosadíme do požadavku omezení v podobě data - do funkce ho hodí engine šablony.
  3. Krmení naší žádosti pandaskdo nás dostane DataFrame - bude se nám hodit v budoucnu.

Používám substituci {dt} místo parametru požadavku %s ne proto, že jsem zlý Pinocchio, ale protože pandas nezvládá pymssql a proklouzne poslední params: Listi když opravdu chce tuple.
Všimněte si také, že vývojář pymssql rozhodl, že už ho nebude podporovat, a je čas se odstěhovat pyodbc.

Podívejme se, čím Airflow nacpal argumenty našich funkcí:

Apache Airflow: Usnadnění ETL

Pokud nejsou data, nemá smysl pokračovat. Je ale také zvláštní považovat výplň za zdařilou. Ale to není chyba. A-ah-ah, co dělat?! A tady je co:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException říká Airflow, že nejsou žádné chyby, ale úkol přeskočíme. Rozhraní nebude mít zelený nebo červený čtverec, ale růžový.

Zahodíme svá data více sloupců:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

A to

  • Databáze, ze které jsme převzali objednávky,
  • ID naší záplavové relace (bude jiné pro každý úkol),
  • Hash ze zdroje a ID objednávky – abychom ve finální databázi (kde je vše nasypáno do jedné tabulky) měli jedinečné ID objednávky.

Zbývá předposlední krok: vše nalít do Verticy. A kupodivu jeden z nejúžasnějších a nejúčinnějších způsobů, jak toho dosáhnout, je prostřednictvím CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Vyrábíme speciální přijímač StringIO.
  2. pandas laskavě položí naše DataFrame ve formě CSV-čáry.
  3. Otevřeme spojení s naší oblíbenou Verticou háčkem.
  4. A teď s pomocí copy() pošlete naše data přímo Vertice!

Vezmeme od řidiče, kolik řádků bylo zaplněno, a řekneme manažerovi relace, že je vše v pořádku:

session.loaded_rows = cursor.rowcount
session.successful = True

To je všechno.

Při prodeji vytváříme cílovou desku ručně. Zde jsem si dovolil malý stroj:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

používám VerticaOperator() Vytvořím databázové schéma a tabulku (pokud již samozřejmě neexistují). Hlavní věc je správně uspořádat závislosti:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Sčítání

- No, - řekla malá myška, - není to tak
Jste přesvědčeni, že jsem to nejstrašnější zvíře v lese?

Julia Donaldson, The Gruffalo

Myslím, že kdybychom měli s kolegy soutěž: kdo rychle vytvoří a spustí proces ETL od nuly: oni se svým SSIS a myší a já s Airflow ... A pak bychom také porovnali snadnost údržby ... Páni, myslím, že budete souhlasit, že je porazím na všech frontách!

Pokud trochu vážněji, tak Apache Airflow - popisem procesů ve formě programového kódu - udělal mou práci moc pohodlnější a příjemnější.

Jeho neomezená rozšiřitelnost, a to jak z hlediska zásuvných modulů, tak z hlediska predispozice ke škálovatelnosti, vám dává možnost využít Airflow téměř v jakékoli oblasti: dokonce i v celém cyklu sběru, přípravy a zpracování dat, dokonce i při vypouštění raket (na Mars, chod).

Část finále, odkaz a informace

Hrablo, které jsme pro vás nasbírali

  • start_date. Ano, toto je již místní meme. Přes Dougův hlavní argument start_date všechny projít. Stručně řečeno, pokud uvedete v start_date aktuální datum a schedule_interval - jeden den, pak DAG začne zítra ne dříve.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    A žádné další problémy.

    Je s tím spojena další runtime chyba: Task is missing the start_date parameter, což nejčastěji naznačuje, že jste se zapomněli svázat s operátorem dag.

  • Vše na jednom stroji. Ano, a základny (samotný proud vzduchu a náš nátěr) a webový server, plánovač a pracovníci. A dokonce to fungovalo. Postupem času ale počet úloh pro služby rostl, a když PostgreSQL začal reagovat na index za 20 s místo 5 ms, vzali jsme to a odnesli.
  • LocalExecutor. Ano, stále na něm sedíme a už jsme došli na okraj propasti. LocalExecutor nám zatím stačil, ale nyní nastal čas rozšířit se alespoň o jednoho pracovníka a přechod na CeleryExecutor budeme muset hodně zapracovat. A vzhledem k tomu, že s ním můžete pracovat na jednom stroji, nic vám nebrání používat Celery i na serveru, který „samozřejmě nikdy nepůjde do výroby, upřímně!“
  • Nevyužití vestavěné nástroje:
    • Připojení pro uložení servisních přihlašovacích údajů,
    • SLA Misses reagovat na úkoly, které se nezdařily včas,
    • xcom pro výměnu metadat (řekl jsem metadata!) mezi úkoly dag.
  • Zneužívání pošty. No, co na to říct? Pro všechna opakování padlých úkolů byla nastavena upozornění. Můj pracovní Gmail má nyní více než 90 tisíc e-mailů od Airflow a náhubek webové pošty odmítá vybrat a odstranit více než 100 najednou.

Další úskalí: Apache Airflow Pitfails

Více nástrojů pro automatizaci

Abychom mohli ještě více pracovat hlavou a ne rukama, Airflow pro nás připravil toto:

  • REST API - stále má status Experimentální, což mu nebrání v práci. S ním můžete nejen získat informace o dagech a úkolech, ale také zastavit/spustit dag, vytvořit DAG Run nebo pool.
  • CLI - Prostřednictvím příkazového řádku je k dispozici mnoho nástrojů, které nejsou jen nepohodlné pro použití prostřednictvím WebUI, ale obecně chybí. Například:
    • backfill potřebné k restartování instancí úloh.
      Analytici například přišli a řekli: „A ty máš, soudruhu, nesmysly v datech od 1. do 13. ledna! Opravte to, opravte to, opravte to, opravte to!" A ty jsi takový holub:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Základní servis: initdb, resetdb, upgradedb, checkdb.
    • run, což vám umožní spustit jednu úlohu instance a dokonce skóre na všech závislostech. Navíc to můžete spustit přes LocalExecutor, i když máte celer cluster.
    • Dělá skoro to samé test, jen také v základech nic nepíše.
    • connections umožňuje hromadné vytváření spojů z pláště.
  • python api - spíše hardcore způsob interakce, který je určen pro pluginy, a ne hemžení se v tom s malými rukama. Ale kdo nám zabrání jít /home/airflow/dagsběh ipython a začít makat? Všechna připojení můžete například exportovat pomocí následujícího kódu:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Připojování k metadatabázi Airflow. Nedoporučuji do něj psát, ale získávání stavů úkolů pro různé konkrétní metriky může být mnohem rychlejší a jednodušší než použití kteréhokoli z API.

    Řekněme, že ne všechny naše úkoly jsou idempotentní, ale někdy mohou padat, a to je normální. Pár ucpání už je ale podezřelých a bylo by potřeba to prověřit.

    Pozor na SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

reference

A samozřejmě prvních deset odkazů z vydání Google je obsah složky Airflow z mých záložek.

A odkazy použité v článku:

Zdroj: www.habr.com