Apache Airflow: Uľahčenie ETL

Dobrý deň, volám sa Dmitrij Logvinenko – dátový inžinier oddelenia analýzy skupiny spoločností Vezet.

Poviem vám o úžasnom nástroji na vývoj ETL procesov - Apache Airflow. Airflow je však taký všestranný a mnohostranný, že by ste sa naň mali pozrieť bližšie, aj keď nie ste zapojený do dátových tokov, ale potrebujete pravidelne spúšťať akékoľvek procesy a monitorovať ich vykonávanie.

A áno, nielen poviem, ale aj ukážem: program má veľa kódu, snímok obrazovky a odporúčaní.

Apache Airflow: Uľahčenie ETL
To, čo zvyčajne vidíte, keď si vygooglite slovo Airflow / Wikimedia Commons

obsah

Úvod

Apache Airflow je ako Django:

  • napísané v pythone
  • je tu skvelý admin panel,
  • rozširujúce sa donekonečna

- len lepšie a bolo vyrobené na úplne iné účely, a to (ako je napísané pred kat):

  • spúšťanie a monitorovanie úloh na neobmedzenom počte počítačov (koľko Celery / Kubernetes a vaše svedomie vám dovolí)
  • s dynamickým generovaním pracovného toku od veľmi jednoduchého písania a pochopenia kódu Pythonu
  • a možnosť vzájomne prepojiť ľubovoľné databázy a API pomocou hotových komponentov aj doma vyrobených pluginov (čo je mimoriadne jednoduché).

Apache Airflow používame takto:

  • zbierame údaje z rôznych zdrojov (veľa inštancií SQL Server a PostgreSQL, rôzne API s aplikačnými metrikami, dokonca aj 1C) v DWH a ODS (máme Vertica a Clickhouse).
  • ako pokročilý cron, ktorá spúšťa procesy konsolidácie dát na ODS a zároveň monitoruje ich údržbu.

Donedávna naše potreby pokrýval jeden malý server s 32 jadrami a 50 GB RAM. V Airflow to funguje:

  • viac 200 dagov (v skutočnosti pracovné postupy, do ktorých sme napchali úlohy),
  • v každom v priemere 70 úloh,
  • táto dobrota začína (aj v priemere) raz za hodinu.

A o tom, ako sme sa rozšírili, napíšem nižšie, ale teraz definujme über-problém, ktorý budeme riešiť:

Existujú tri pôvodné SQL Servery, každý s 50 databázami - inštanciami jedného projektu, respektíve, majú rovnakú štruktúru (takmer všade, mua-ha-ha), čo znamená, že každý má tabuľku Objednávky (našťastie tabuľku s tým meno možno vložiť do akéhokoľvek podnikania). Dáta berieme pridaním servisných polí (zdrojový server, zdrojová databáza, ETL task ID) a naivne ich hodíme do, povedzme, Vertica.

Poďme!

Hlavná časť, praktická (a trochu teoretická)

Prečo je to pre nás (a pre vás)

Keď boli stromy veľké a ja som bol jednoduchý SQL-schik v jednom ruskom maloobchode sme podviedli ETL procesy alias dátové toky pomocou dvoch nástrojov, ktoré máme k dispozícii:

  • Informatica Power Center - extrémne sa šíriaci systém, mimoriadne produktívny, s vlastným hardvérom, vlastným verzovaním. Použil som nedajbože 1% jeho schopností. prečo? No, v prvom rade toto rozhranie, niekde z roku 380, na nás psychicky vyvíjalo tlak. Po druhé, táto mašinka je navrhnutá pre extrémne efektné procesy, zúrivé opätovné použitie komponentov a ďalšie veľmi dôležité podnikové triky. O tom, že to stojí, ako krídlo Airbusu AXNUMX / rok, nepovieme nič.

    Pozor, snímka obrazovky môže ľuďom do 30 rokov trochu ublížiť

    Apache Airflow: Uľahčenie ETL

  • SQL Server Integration Server - tohto súdruha sme použili v našich vnútroprojektových tokoch. No v skutočnosti: SQL Server už používame a bolo by nerozumné nepoužívať jeho ETL nástroje. Všetko v ňom je dobré: aj rozhranie je krásne, aj správy o pokroku... Ale to nie je dôvod, prečo milujeme softvérové ​​produkty, nie preto. Verzia dtsx (čo je XML s uzlami premiešanými pri ukladaní) môžeme, ale aký to má zmysel? Čo tak vytvoriť balík úloh, ktorý pretiahne stovky tabuliek z jedného servera na druhý? Áno, aká stovka, z dvadsiatich kúskov vám odpadne ukazovák kliknutím na tlačidlo myši. Ale rozhodne to vyzerá módnejšie:

    Apache Airflow: Uľahčenie ETL

Určite sme hľadali východiská. Prípad dokonca takmer prišiel k samostatne napísanému generátoru balíčkov SSIS ...

...a potom si ma našla nová práca. A Apache Airflow ma na ňom predbehol.

Keď som zistil, že popisy ETL procesov sú jednoduchý Python kód, netancoval som od radosti. Takto sa verzovali a porovnávali dátové toky a nalievanie tabuliek s jedinou štruktúrou zo stoviek databáz do jedného cieľa sa stalo záležitosťou kódu Python na jeden a pol alebo dve 13” obrazovky.

Zostavenie klastra

Nezariaďujme úplne materskú školu a nehovorme tu o úplne samozrejmých veciach, ako je inštalácia Airflow, vami vybranej databázy, zeleru a ďalších prípadov popísaných v dokoch.

Aby sme mohli okamžite začať experimentovať, načrtol som docker-compose.yml v ktorom:

  • Poďme vlastne zvýšiť Airflow: Plánovač, Webový server. Kvet sa tam bude točiť aj kvôli monitorovaniu úloh zeleru (pretože už bol zatlačený apache/airflow:1.10.10-python3.7, ale nám to nevadí)
  • PostgreSQL, do ktorého Airflow zapíše svoje servisné informácie (údaje plánovača, štatistiky vykonávania atď.) a Celery označí dokončené úlohy;
  • Redis, ktorý bude fungovať ako sprostredkovateľ úloh pre Zeler;
  • Zeler robotník, ktorá sa bude podieľať na priamom plnení úloh.
  • Do priečinka ./dags pridáme naše súbory s popisom dags. Zoberú sa za chodu, takže po každom kýchnutí nie je potrebné žonglovať s celým zásobníkom.

Na niektorých miestach nie je kód v príkladoch úplne zobrazený (aby sa neprehadzoval text), niekde sa však v procese upravuje. Kompletné príklady pracovného kódu nájdete v úložisku https://github.com/dm-logv/airflow-tutorial.

prístavný robotník-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Poznámky:

  • Pri zostavovaní kompozície som sa vo veľkej miere spoliehal na známy obraz puckel/docker-prúdenie vzduchu - určite si to pozrite. Možno už nič viac k životu nepotrebuješ.
  • Všetky nastavenia Airflow sú dostupné nielen cez airflow.cfg, ale aj cez premenné prostredia (vďaka vývojárom), čo som zlomyseľne využil.
  • Prirodzene, nie je pripravený na výrobu: Zámerne som na kontajnery nedal tepy, neobťažoval som sa bezpečnosťou. Ale urobil som minimum vhodné pre našich experimentátorov.
  • Poznač si to:
    • Priečinok dag musí byť prístupný plánovačovi aj pracovníkom.
    • To isté platí pre všetky knižnice tretích strán – všetky musia byť nainštalované na strojoch s plánovačom a pracovníkmi.

No, teraz je to jednoduché:

$ docker-compose up --scale worker=3

Keď sa všetko zdvihne, môžete sa pozrieť na webové rozhrania:

Základné pojmy

Ak ste zo všetkých týchto „dagov“ ničomu nerozumeli, tu je krátky slovník:

  • Scheduler - najdôležitejší strýko v Airflow, ktorý kontroluje, či roboty tvrdo pracujú, a nie človek: monitoruje plán, aktualizuje dags, spúšťa úlohy.

    Vo všeobecnosti mal v starších verziách problémy s pamäťou (nie, nie amnézia, ale úniky) a parameter legacy dokonca zostal v konfiguráciách run_duration — jeho interval reštartu. Ale teraz je už všetko v poriadku.

  • DAG (aka "dag") - "riadený acyklický graf", ale takáto definícia povie málokomu, ale v skutočnosti je to kontajner na úlohy, ktoré sa navzájom ovplyvňujú (pozri nižšie) alebo analóg Package v SSIS a Workflow v Informatica .

    Okrem dagov môžu ešte existovať poddagy, ale k tým sa s najväčšou pravdepodobnosťou nedostaneme.

  • DAG Run - inicializovaný dag, ktorému je priradený vlastný execution_date. Dagrany toho istého dagu môžu fungovať paralelne (samozrejme, ak ste svoje úlohy urobili idempotentnými).
  • operátor sú časti kódu zodpovedné za vykonanie konkrétnej akcie. Existujú tri typy operátorov:
    • akčnáako náš obľúbený PythonOperator, ktorý dokáže spustiť akýkoľvek (platný) kód Pythonu;
    • prevod, ktoré prenášajú dáta z miesta na miesto, povedzme, MsSqlToHiveTransfer;
    • senzor na druhej strane vám umožní zareagovať alebo spomaliť ďalšie vykonávanie dag, kým nedôjde k udalosti. HttpSensor môže stiahnuť zadaný koncový bod a keď počká požadovaná odpoveď, spustiť prenos GoogleCloudStorageToS3Operator. Zvedavá myseľ sa bude pýtať: „Prečo? Veď opakovania môžete robiť priamo v operátorovi!“ A potom, aby nedošlo k upchatiu fondu úloh pozastavenými operátormi. Senzor sa spustí, skontroluje a pred ďalším pokusom sa vypne.
  • úloha - deklarovaní operátori bez ohľadu na typ a pripojení k dag sú povýšení do hodnosti úlohy.
  • inštancia úlohy - keď sa generálny plánovač rozhodol, že je čas poslať úlohy do boja na výkonných robotníkoch (priamo na mieste, ak použijeme LocalExecutor alebo do vzdialeného uzla v prípade CeleryExecutor), priradí im kontext (t. j. množinu premenných – parametre vykonávania), rozšíri šablóny príkazov alebo dotazov a združí ich.

Vytvárame úlohy

Najprv si načrtneme všeobecnú schému nášho douga a potom sa budeme stále viac a viac ponoriť do detailov, pretože aplikujeme niektoré netriviálne riešenia.

Takže vo svojej najjednoduchšej podobe bude takýto dag vyzerať takto:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Poďme na to:

  • Najprv importujeme potrebné knižnice a niečo iné;
  • sql_server_ds - je List[namedtuple[str, str]] s názvami spojení z Airflow Connections a databázami, z ktorých si vezmeme náš tanier;
  • dag - oznámenie nášho dagu, ktoré musí byť nevyhnutne in globals(), inak to Airflow nenájde. Doug tiež musí povedať:
    • aké je jeho meno orders - tento názov sa následne zobrazí vo webovom rozhraní,
    • že bude pracovať od polnoci ôsmeho júla,
    • a mal by bežať približne každých 6 hodín (pre tvrdých chlapov tu namiesto timedelta() prípustné cron-linka 0 0 0/6 ? * * *, pre menej cool - výraz ako @daily);
  • workflow() bude robiť hlavnú prácu, ale nie teraz. Zatiaľ len vyklopíme náš kontext do denníka.
  • A teraz jednoduché kúzlo vytvárania úloh:
    • prechádzame cez naše zdroje;
    • inicializovať PythonOperator, ktorá popraví našu figurínu workflow(). Nezabudnite špecifikovať jedinečný (v rámci dag) názov úlohy a priviazať samotný dag. Vlajka provide_context na oplátku naleje do funkcie ďalšie argumenty, ktoré opatrne zozbierame pomocou **context.

Zatiaľ je to všetko. Čo sme dostali:

  • nový dag vo webovom rozhraní,
  • jeden a pol stovky úloh, ktoré sa budú vykonávať paralelne (ak to dovoľuje Airflow, Celery nastavenia a kapacita servera).

No, skoro som to pochopil.

Apache Airflow: Uľahčenie ETL
Kto nainštaluje závislosti?

Aby som to celé zjednodušil, posral som sa docker-compose.yml spracovanie requirements.txt na všetkých uzloch.

Teraz je to preč:

Apache Airflow: Uľahčenie ETL

Sivé štvorce sú inštancie úloh spracované plánovačom.

Chvíľu počkáme, úlohy vybavia pracovníci:

Apache Airflow: Uľahčenie ETL

Zelení, samozrejme, svoje dielo úspešne dokončili. Červeným sa veľmi nedarí.

Mimochodom, na našom prod ./dags, neexistuje synchronizácia medzi strojmi - všetky dags ležia v git na našom Gitlabe a Gitlab CI distribuuje aktualizácie do počítačov pri zlúčení master.

Trochu o Kvete

Kým nám robotníci mlátia cumlíky, spomeňme si na ďalšiu pomôcku, ktorá nám môže niečo ukázať – Kvet.

Úplne prvá stránka so súhrnnými informáciami o pracovných uzloch:

Apache Airflow: Uľahčenie ETL

Najintenzívnejšia stránka s úlohami, ktoré šli do práce:

Apache Airflow: Uľahčenie ETL

Najnudnejšia stránka so statusom nášho brokera:

Apache Airflow: Uľahčenie ETL

Najjasnejšia stránka je s grafmi stavu úloh a časom ich vykonania:

Apache Airflow: Uľahčenie ETL

Podťažené zaťažíme

Takže všetky úlohy sa splnili, môžete odniesť zranených.

Apache Airflow: Uľahčenie ETL

A bolo veľa zranených - z jedného alebo druhého dôvodu. V prípade správneho použitia Airflow práve tieto štvorčeky naznačujú, že dáta rozhodne nedorazili.

Musíte sledovať denník a reštartovať spadnuté inštancie úloh.

Kliknutím na ktorýkoľvek štvorec sa nám zobrazia akcie, ktoré máme k dispozícii:

Apache Airflow: Uľahčenie ETL

Môžete vziať a urobiť Clear padlých. To znamená, že zabudneme, že tam niečo zlyhalo a rovnaká úloha inštancie prejde do plánovača.

Apache Airflow: Uľahčenie ETL

Je jasné, že robiť to s myšou so všetkými červenými štvorčekmi nie je veľmi humánne – toto od Airflow neočakávame. Prirodzene, máme zbrane hromadného ničenia: Browse/Task Instances

Apache Airflow: Uľahčenie ETL

Vyberme všetko naraz a resetujeme na nulu, klikneme na správnu položku:

Apache Airflow: Uľahčenie ETL

Naše taxíky po vyčistení vyzerajú takto (už čakajú, kým ich plánovač naplánuje):

Apache Airflow: Uľahčenie ETL

Spojenia, háčiky a iné premenné

Je čas pozrieť sa na ďalší DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Urobil niekedy každý aktualizáciu správy? Toto je opäť ona: je tu zoznam zdrojov, odkiaľ možno získať údaje; existuje zoznam, kam umiestniť; nezabudnite zatrúbiť, keď sa všetko stalo alebo zlomilo (no, toto nie je o nás, nie).

Poďme si znova prejsť súbor a pozrieť sa na nové nejasné veci:

  • from commons.operators import TelegramBotSendMessage - nič nám nebráni vo výrobe vlastných operátorov, čo sme využili tým, že sme urobili malý obal na posielanie správ na Unblocked. (Viac o tomto operátorovi si povieme nižšie);
  • default_args={} - dag môže distribuovať rovnaké argumenty všetkým svojim operátorom;
  • to='{{ var.value.all_the_kings_men }}' - lúka to nebudeme mať napevno, ale dynamicky generované pomocou Jinja a premennej so zoznamom emailov, ktoré som opatrne vložil Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — podmienka pre spustenie operátora. V našom prípade list poletí šéfom iba vtedy, ak budú fungovať všetky závislosti úspešne;
  • tg_bot_conn_id='tg_main' - argumenty conn_id akceptovať ID pripojenia, ktoré vytvoríme Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - správy v telegrame odletia iba vtedy, ak sú tam padnuté úlohy;
  • task_concurrency=1 - zakazujeme súčasné spustenie niekoľkých inštancií úloh jednej úlohy. V opačnom prípade získame súčasné spustenie niekoľkých VerticaOperator (pri pohľade na jeden stôl);
  • report_update >> [email, tg] - všetky VerticaOperator konvergovať v odosielaní listov a správ, ako je toto:
    Apache Airflow: Uľahčenie ETL

    Ale keďže operátori oznamujúcich majú rôzne podmienky spustenia, bude fungovať iba jeden. V stromovom zobrazení všetko vyzerá trochu menej vizuálne:
    Apache Airflow: Uľahčenie ETL

Poviem pár slov o makrá a ich priatelia - premenné.

Makrá sú zástupné symboly Jinja, ktoré môžu nahradiť rôzne užitočné informácie do argumentov operátorov. Napríklad takto:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} sa rozšíri na obsah kontextovej premennej execution_date vo formáte YYYY-MM-DD: 2020-07-14. Najlepšie na tom je, že kontextové premenné sú pribité ku konkrétnej inštancii úlohy (štvorec v stromovom zobrazení) a po reštartovaní sa zástupné symboly rozšíria na rovnaké hodnoty.

Priradené hodnoty je možné zobraziť pomocou tlačidla Rendered na každej inštancii úlohy. Takto vyzerá úloha s odoslaním listu:

Apache Airflow: Uľahčenie ETL

A tak pri úlohe s odoslaním správy:

Apache Airflow: Uľahčenie ETL

Kompletný zoznam vstavaných makier pre najnovšiu dostupnú verziu je k dispozícii tu: referencia makier

Navyše pomocou pluginov môžeme deklarovať vlastné makrá, ale to je už iný príbeh.

Okrem preddefinovaných vecí môžeme nahradiť hodnoty našich premenných (už som to použil v kóde vyššie). Poďme tvoriť Admin/Variables par veci:

Apache Airflow: Uľahčenie ETL

Všetko, čo môžete použiť:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Hodnota môže byť skalárna alebo to môže byť aj JSON. V prípade JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

stačí použiť cestu k požadovanému kľúču: {{ var.json.bot_config.bot.token }}.

Doslova poviem jedno slovo a ukážem jednu snímku obrazovky prípojka. Všetko je tu elementárne: na stránke Admin/Connections vytvoríme spojenie, pridáme tam naše prihlasovacie mená / heslá a konkrétnejšie parametre. Páči sa ti to:

Apache Airflow: Uľahčenie ETL

Heslá môžu byť šifrované (dôkladnejšie ako predvolené) alebo môžete vynechať typ pripojenia (ako som to urobil v prípade tg_main) - faktom je, že zoznam typov je v modeloch Airflow pevne zapojený a nedá sa rozšíriť bez toho, aby som sa dostal do zdrojových kódov (ak som zrazu niečo nevygooglil, opravte ma), ale nič nám nezabráni získať kredity len tak názov.

Môžete tiež vytvoriť niekoľko spojení s rovnakým názvom: v tomto prípade metóda BaseHook.get_connection(), ktorý nám dostane spojenia podľa názvu, dá náhodný od viacerých menovcov (logickejšie by bolo spraviť Round Robin, ale nechajme to na svedomí vývojárov Airflow).

Premenné a pripojenia sú určite skvelé nástroje, ale je dôležité nestratiť rovnováhu: ktoré časti vašich tokov ukladáte v samotnom kóde a ktoré časti dáte Airflow na uloženie. Na jednej strane môže byť rýchla zmena hodnoty, napríklad poštovej schránky, pohodlná prostredníctvom používateľského rozhrania. Na druhej strane je to stále návrat ku klikaniu myšou, ktorého sme sa (ja) chceli zbaviť.

Práca s prepojeniami je jednou z úloh háčiky. Vo všeobecnosti sú háčiky Airflow body na pripojenie k službám a knižniciam tretích strán. napr. JiraHook otvorí nám klienta na interakciu s Jirou (úlohy môžete presúvať tam a späť) a pomocou SambaHook môžete odoslať lokálny súbor smb-bod.

Analýza vlastného operátora

A priblížili sme sa k tomu, ako sa to vyrába TelegramBotSendMessage

kód commons/operators.py so skutočným operátorom:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Tu, rovnako ako všetko ostatné v Airflow, je všetko veľmi jednoduché:

  • Zdedené od BaseOperator, ktorá implementuje pomerne veľa vecí špecifických pre prúdenie vzduchu (pozrite sa na svoj voľný čas)
  • Deklarované polia template_fields, v ktorom bude Jinja hľadať makrá na spracovanie.
  • Usporiadal správne argumenty pre __init__(), v prípade potreby nastavte predvolené hodnoty.
  • Nezabudli sme ani na inicializáciu predka.
  • Otvorili príslušný hák TelegramBotHookdostala od nej klientský predmet.
  • Prepísaná (predefinovaná) metóda BaseOperator.execute(), ktorým Airfow škubne, keď príde čas na spustenie operátora - v ňom implementujeme hlavnú akciu, zabudnutie na prihlásenie. (Mimochodom, prihlasujeme sa priamo stdout и stderr - Prúd vzduchu všetko zachytí, krásne zabalí, rozloží tam, kde je to potrebné.)

Pozrime sa, čo máme commons/hooks.py. Prvá časť súboru so samotným háčikom:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Ani neviem, čo tu mám vysvetliť, len si všimnem dôležité body:

  • Zdedíme, premýšľajte o argumentoch - vo väčšine prípadov to bude jeden: conn_id;
  • Prevažujúce štandardné metódy: Obmedzil som sa get_conn(), v ktorom dostanem parametre pripojenia podľa názvu a dostanem len sekciu extra (toto je pole JSON), do ktorého som (podľa vlastných pokynov!) vložil token telegramového bota: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Vytváram inštanciu nášho TelegramBot, čím jej poskytnete konkrétny token.

To je všetko. Môžete získať klienta z háku pomocou TelegramBotHook().clent alebo TelegramBotHook().get_conn().

A druhá časť súboru, v ktorej robím microwrapper pre Telegram REST API, aby som neťahal to isté python-telegram-bot pre jednu metódu sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Správny spôsob je sčítať všetko: TelegramBotSendMessage, TelegramBotHook, TelegramBot - v doplnku vložte do verejného úložiska a dajte ho Open Source.

Kým sme to všetko študovali, naše aktualizácie prehľadov úspešne zlyhali a poslali mi chybové hlásenie do kanála. idem to skontrolovat, ci to nie je zle...

Apache Airflow: Uľahčenie ETL
Niečo sa zlomilo v našom doge! Nie je to to, čo sme očakávali? presne tak!

Ideš naliať?

Máš pocit, že mi niečo uniklo? Zdá sa, že sľúbil, že prenesie údaje zo servera SQL Server do Vertica, a potom to vzal a odstúpil od témy, ten darebák!

Toto zverstvo bolo zámerné, jednoducho som vám musel rozlúštiť nejakú terminológiu. Teraz môžete ísť ďalej.

Náš plán bol takýto:

  1. Do dag
  2. Generovať úlohy
  3. Pozrite sa, aké je všetko krásne
  4. Priraďte čísla relácií výplniam
  5. Získajte údaje zo servera SQL Server
  6. Vložte údaje do Vertica
  7. Zbierajte štatistiky

Aby sme to všetko rozbehli, urobil som malý doplnok k nášmu docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Tam vychováme:

  • Vertica ako hostiteľ dwh s najviac predvolenými nastaveniami,
  • tri inštancie SQL Server,
  • do databáz v poslednom uvedenom naplníme nejaké údaje (v žiadnom prípade sa nedívajte do mssql_init.py!)

Všetko dobré spúšťame pomocou trochu komplikovanejšieho príkazu ako naposledy:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

To, čo vygeneroval náš zázračný randomizér, môžete použiť Data Profiling/Ad Hoc Query:

Apache Airflow: Uľahčenie ETL
Hlavná vec je neukázať to analytikom

rozviesť ETL relácie Nebudem, všetko je tam triviálne: vytvoríme základňu, je v nej znak, všetko zabalíme do kontextového manažéra a teraz urobíme toto:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Čas nadišiel zbierať naše údaje z našich jeden a pol sto stolov. Urobme to pomocou veľmi nenáročných riadkov:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Pomocou háku dostaneme z Airflow pymssql-spojiť
  2. Nahraďte do požiadavky obmedzenie v podobe dátumu - do funkcie ho hodí engine šablón.
  3. Kŕmenie našej žiadosti pandaskto nás dostane DataFrame - bude nám to užitočné v budúcnosti.

Používam substitúciu {dt} namiesto parametra požiadavky %s nie preto, že som zlý Pinocchio, ale preto pandas nezvládne pymssql a pošmykne posledný params: Listhoci veľmi chce tuple.
Upozorňujeme tiež, že vývojár pymssql rozhodol, že ho už nebude podporovať, a je čas sa odsťahovať pyodbc.

Pozrime sa, čím Airflow naplnil argumenty našich funkcií:

Apache Airflow: Uľahčenie ETL

Ak nie sú žiadne údaje, nemá zmysel pokračovať. Ale je tiež zvláštne považovať výplň za vydarenú. Ale to nie je chyba. A-ah-ah, čo robiť?! A tu je čo:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException povie Airflow, že neexistujú žiadne chyby, ale úlohu preskočíme. Rozhranie nebude mať zelený alebo červený štvorec, ale ružový.

Zahoďme svoje údaje viac stĺpcov:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

A to

  • Databáza, z ktorej sme prevzali objednávky,
  • ID našej záplavovej relácie (bude iné pre každú úlohu),
  • Hash zo zdroja a ID objednávky - aby sme vo finálnej databáze (kde je všetko naliate do jednej tabuľky) mali jedinečné ID objednávky.

Zostáva predposledný krok: nalejte všetko do Verticy. A napodiv, jeden z najpozoruhodnejších a najefektívnejších spôsobov, ako to dosiahnuť, je prostredníctvom CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Vyrábame špeciálny prijímač StringIO.
  2. pandas láskavo položí náš DataFrame vo forme CSV- čiary.
  3. Otvorme háčikom spojenie s našou obľúbenou Verticou.
  4. A teraz s pomocou copy() pošlite naše údaje priamo Vertike!

Zoberieme od vodiča, koľko riadkov bolo vyplnených, a povieme manažérovi relácie, že je všetko v poriadku:

session.loaded_rows = cursor.rowcount
session.successful = True

To je všetko.

Pri predaji vytvárame cieľovú platňu ručne. Tu som si dovolil malý stroj:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

používam VerticaOperator() Vytvorím databázovú schému a tabuľku (ak ešte neexistujú, samozrejme). Hlavná vec je správne usporiadať závislosti:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Sčítanie

- Dobre, - povedala myška, - nie, teraz
Si presvedčený, že som najstrašnejšie zviera v lese?

Julia Donaldson, The Gruffalo

Myslím, že keby sme s kolegami súťažili: kto rýchlo vytvorí a spustí proces ETL od nuly: oni so svojím SSIS a myšou a ja s Airflow ... A potom by sme porovnali aj jednoduchosť údržby ... Wow, myslím, že budete súhlasiť, že ich porazím na všetkých frontoch!

Ak trochu vážnejšie, tak Apache Airflow - popisom procesov vo forme programového kódu - urobil moju prácu viac pohodlnejšie a príjemnejšie.

Jeho neobmedzená rozšíriteľnosť, čo sa týka zásuvných modulov a predispozície k škálovateľnosti, vám dáva možnosť využiť Airflow takmer v akejkoľvek oblasti: dokonca aj v celom cykle zberu, prípravy a spracovania dát, dokonca aj pri štarte rakiet (na Mars, resp. kurz).

Časť záverečná, referencie a informácie

Hrable, ktoré sme pre vás nazbierali

  • start_date. Áno, toto je už lokálny meme. Via Dougov hlavný argument start_date všetci prejdú. Stručne povedané, ak uvediete v start_date aktuálny dátum a schedule_interval - jeden deň, potom DAG začne zajtra nie skôr.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    A už žiadne problémy.

    Je s tým spojená ďalšia runtime chyba: Task is missing the start_date parameter, čo najčastejšie naznačuje, že ste sa zabudli naviazať na operátora dag.

  • Všetko na jednom stroji. Áno, a základne (samotné prúdenie vzduchu a náš náter) a webový server, plánovač a pracovníci. A dokonca to fungovalo. Postupom času však počet úloh pre služby rástol a keď PostgreSQL začal reagovať na index za 20 s namiesto 5 ms, zobrali sme to a odniesli.
  • LocalExecutor. Áno, stále na ňom sedíme a už sme prišli na okraj priepasti. LocalExecutor nám zatiaľ stačil, no teraz nastal čas na rozšírenie aspoň o jedného pracovníka a na prechod na CeleryExecutor budeme musieť tvrdo pracovať. A vzhľadom na to, že s ním môžete pracovať na jednom stroji, nič vám nebráni používať zeler ani na serveri, ktorý „samozrejme, nikdy nepôjde do výroby, úprimne!“
  • Nepoužitie vstavané nástroje:
    • pripojenie na ukladanie servisných poverení,
    • Miss SLA reagovať na úlohy, ktoré nefungovali včas,
    • xcom na výmenu metadát (povedal som metadát!) medzi dag úlohami.
  • Zneužívanie pošty. No, čo môžem povedať? Na všetky opakovania padlých úloh boli nastavené upozornenia. Teraz má môj pracovný Gmail viac ako 90 100 e-mailov od Airflow a náhubok webovej pošty odmieta vyzdvihnúť a odstrániť viac ako XNUMX naraz.

Ďalšie úskalia: Apache Airflow Pitfails

Viac nástrojov na automatizáciu

Aby sme mohli ešte viac pracovať hlavou a nie rukami, Airflow si pre nás pripravil toto:

  • REST API - stále má status Experimental, čo mu však nebráni v práci. S ním môžete nielen získať informácie o dagoch ​​a úlohách, ale aj zastaviť/spustiť dag, vytvoriť DAG Run alebo pool.
  • CLI - cez príkazový riadok je dostupných veľa nástrojov, ktoré nie sú len nepohodlné na používanie cez WebUI, ale vo všeobecnosti chýbajú. Napríklad:
    • backfill potrebné na reštartovanie inštancií úloh.
      Napríklad analytici prišli a povedali: „A ty, súdruh, máš nezmysly v údajoch od 1. do 13. januára! Opravte to, opravte to, opravte to, opravte to!" A ty si taká varná doska:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Základná služba: initdb, resetdb, upgradedb, checkdb.
    • run, ktorá vám umožňuje spustiť jednu úlohu inštancie a dokonca skóre na všetkých závislostiach. Navyše ho môžete spustiť cez LocalExecutor, aj keď máte zeler klaster.
    • Robí skoro to isté test, len take v zakladoch nepise nic.
    • connections umožňuje hromadné vytváranie spojov zo škrupiny.
  • python api - pomerne tvrdý spôsob interakcie, ktorý je určený pre pluginy, a nie s malými rukami. Ale kto nám zabráni ísť /home/airflow/dags, beh ipython a začať makať? Môžete napríklad exportovať všetky pripojenia pomocou nasledujúceho kódu:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Pripája sa k metadatabáze Airflow. Neodporúčam do nej písať, ale získanie stavov úloh pre rôzne špecifické metriky môže byť oveľa rýchlejšie a jednoduchšie ako cez ktorékoľvek z API.

    Povedzme, že nie všetky naše úlohy sú idempotentné, ale niekedy môžu padnúť, a to je normálne. Ale pár upchávok je už podozrivých a bolo by potrebné skontrolovať.

    Pozor na SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

referencie

A samozrejme, prvých desať odkazov z vydania Google je obsah priečinka Airflow z mojich záložiek.

A odkazy použité v článku:

Zdroj: hab.com