Apache Airflow: ETL mai ușor

Bună, sunt Dmitry Logvinenko - inginer de date al departamentului de analiză al grupului de companii Vezet.

Vă voi spune despre un instrument excelent pentru dezvoltarea proceselor ETL - Apache Airflow. Dar Airflow este atât de versatil și de multiple fațete încât ar trebui să îl priviți mai atent chiar dacă nu sunteți implicat în fluxurile de date, dar aveți nevoie să lansați periodic orice procese și să monitorizați execuția acestora.

Și da, nu doar vă spun, ci vă voi arăta și: programul conține o mulțime de coduri, capturi de ecran și recomandări.

Apache Airflow: ETL mai ușor
Ce vedeți de obicei când căutați pe Google cuvântul Airflow / Wikimedia Commons

Cuprins

Introducere

Apache Airflow este la fel ca Django:

  • scris in Python,
  • există un administrator excelent,
  • extensibil nelimitat

- doar mai bine și făcut în scopuri complet diferite, și anume (așa cum este scris înainte de kata):

  • lansarea și monitorizarea sarcinilor pe un număr nelimitat de mașini (atât de multe țelină/Kubernet-uri și conștiința îți vor permite)
  • cu generare dinamică a fluxului de lucru din cod Python foarte ușor de scris și de înțeles
  • și capacitatea de a conecta orice baze de date și API-uri între ele folosind atât componente gata făcute, cât și pluginuri de casă (ceea ce se face extrem de simplu).

Folosim Apache Airflow astfel:

  • colectăm date din diverse surse (multe instanțe SQL Server și PostgreSQL, diverse API-uri cu metrici de aplicație, chiar 1C) în DWH și ODS (pentru noi este Vertica și Clickhouse).
  • ca avansat cron, care rulează procese de consolidare a datelor pe ODS și, de asemenea, monitorizează întreținerea acestora.

Până de curând, nevoile noastre erau acoperite de un server mic cu 32 de nuclee și 50 GB de RAM. În Airflow, aceasta funcționează:

  • mai mult 200 de zile (de fapt, fluxuri de lucru în care am completat sarcinile),
  • în fiecare în medie 70 de sarcini,
  • chestia asta începe (de asemenea, în medie) o dată pe oră.

Voi scrie mai jos despre cum ne-am extins, dar acum să definim sarcina ulterioară pe care o vom rezolva:

Există trei servere SQL sursă, fiecare cu 50 de baze de date - instanțele unui proiect, respectiv, structura lor este aceeași (aproape peste tot, muah-ha-ha), ceea ce înseamnă că fiecare are un tabel Comenzi (din fericire, puteți avea un tabel cu acest nume împinge în orice afacere). Luăm datele adăugând câmpuri de serviciu (server sursă, bază de date sursă, identificator de activitate ETL) și le aruncăm naiv în, de exemplu, Vertica.

Să mergem!

Partea este de bază, practică (și puțin teoretică)

De ce avem nevoie de ea (și de tine)

Când copacii erau mari și eu eram simplu SQLÎn calitate de manager într-un retail rusesc, am scalat procesele ETL, numite fluxuri de date, folosind două instrumente disponibile pentru noi:

  • Informatica Power Center - un sistem extrem de versatil, extrem de productiv, cu hardware propriu, versiune proprie. Dacă vrea Dumnezeu, am folosit 1% din capacitățile sale. De ce? Ei bine, în primul rând, această interfață a pus presiune psihologică asupra noastră undeva în anii 380. În al doilea rând, acest lucru este conceput pentru procese extrem de sofisticate, reutilizarea furioasă a componentelor și alte caracteristici foarte importante pentru întreprindere. Nu vom spune nimic despre faptul că costă la fel de mult ca o aripă Airbus AXNUMX pe an.

    Atenție, captura de ecran poate răni puțin persoanele sub 30 de ani

    Apache Airflow: ETL mai ușor

  • Server de integrare SQL Server — l-am folosit pe acest tip în fluxurile noastre interne de proiect. Ei bine, de fapt: folosim deja SQL Server și ar fi cumva nerezonabil să nu folosim instrumentele sale ETL. Totul este bun: interfața este frumoasă, iar rapoartele de progres... Dar nu de aceea ne plac produsele software, oh, nu pentru asta. Versiune dtsx (care este XML cu noduri care sunt amestecate atunci când sunt salvate) putem, dar care este rostul? Ce zici de a face un pachet de sarcini care să tragă o sută de tabele de la un server la altul? De ce, o sută, douăzeci dintre ele vor face să îți cadă degetul arătător în timp ce faci clic pe butonul mouse-ului. Dar cu siguranță arată mai la modă:

    Apache Airflow: ETL mai ușor

Cu siguranță căutăm căi de ieșire. Este chiar aproape a ajuns la un generator de pachete SSIS auto-scris...

... și apoi m-a găsit un nou loc de muncă. Și pe el Apache Airflow m-a depășit.

Când am aflat că descrierile proceselor ETL sunt doar cod Python simplu, nu aș fi putut să dansez de bucurie. Acesta este modul în care fluxurile de date au fost supuse versiunilor și diferențierii, iar turnarea tabelelor cu o singură structură din sute de baze de date într-o singură țintă a devenit o chestiune de cod Python în unul și jumătate până la două ecrane de 13 inchi.

Asamblarea unui cluster

Să nu facem din aceasta o grădiniță completă și să nu vorbim aici despre lucruri complet evidente, cum ar fi instalarea Airflow, baza de date aleasă, țelină și alte lucruri descrise în documente.

Ca să putem începe să experimentăm imediat, am schițat docker-compose.yml in care:

  • Să ridicăm realul Debit de aer: Programator, Webserver. Flower va rula acolo și pentru monitorizarea sarcinilor de țelină (pentru că a fost deja împins în apache/airflow:1.10.10-python3.7, și nu ne deranjează);
  • PostgreSQL, în care Airflow își va scrie informațiile de serviciu (date planificatorului, statistici de execuție etc.), iar Țelina va marca sarcinile finalizate;
  • Redis, care va acționa ca broker de sarcini pentru țelină;
  • Muncitor la telina, care va îndeplini direct sarcinile.
  • În dosar ./dags Vom aduna fișierele noastre cu descrieri ale dag-urilor. Vor fi ridicați din mers, așa că nu este nevoie să mutați întregul teanc după fiecare strănut.

În unele locuri codul din exemple nu este dat în întregime (pentru a nu aglomera textul), iar în unele locuri este modificat în proces. Exemple complete de cod de lucru pot fi găsite în depozit https://github.com/dm-logv/airflow-tutorial.

Docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Note:

  • În asamblarea compoziției, m-am bazat în mare măsură pe imaginea binecunoscută pukel/docker-flux de aer - asigurați-vă că îl verificați. Poate nu vei avea nevoie de nimic altceva în viață.
  • Toate setările pentru fluxul de aer sunt disponibile nu numai prin airflow.cfg, dar și prin variabile de mediu (slavă dezvoltatorilor), de care am profitat cu răutate.
  • Desigur, nu este gata de producție: nu am pus în mod deliberat bătăi de inimă pe containere și nu m-am deranjat cu securitatea. Dar am făcut minimul care era potrivit pentru experimentatorii noștri.
  • Rețineți că:
    • Dosarul cu dags ar trebui să fie accesibil atât pentru planificator, cât și pentru lucrători.
    • Același lucru se aplică tuturor bibliotecilor terțe - toate trebuie instalate pe mașini cu un planificator și lucrători.

Ei bine, acum este simplu:

$ docker-compose up --scale worker=3

După ce totul s-a terminat, vă puteți uita la interfețele web:

Concepte de bază

Dacă nu înțelegeți nimic din toate aceste „dags”, atunci iată un scurt glosar:

  • Scheduler - cel mai important tip din Airflow, care se asigură că roboții muncesc din greu, și nu oamenii: monitorizează programul, actualizează datele, execută sarcini.

    În general, în versiunile mai vechi, avea probleme cu memoria (nu, nu amnezie, ci scurgeri) și a existat chiar și un parametru moștenit în configurații run_duration — intervalul său de repornire. Dar acum totul este bine.

  • DAG (alias „dag”) este un „graf aciclic direcționat”, dar o astfel de definiție va însemna puțin pentru oricine, dar în esență este un container pentru sarcini care interacționează între ele (a se vedea mai jos) sau un analog al pachetului în SSIS și fluxul de lucru in Informatica.

    Pe lângă dag-uri, pot fi și sabdag-uri, dar cel mai probabil nu vom ajunge la ele.

  • DAG Run — un dag inițializat, căruia îi este atribuit propriul său execution_date. Dagranurile unui dag pot funcționa în paralel (dacă, desigur, ți-ai făcut sarcinile idempotente).
  • Operator - acestea sunt bucăți de cod responsabile pentru efectuarea unei anumite acțiuni. Există trei tipuri de operatori:
    • acțiune, ca iubitul nostru PythonOperator, care poate executa orice cod Python (valid);
    • transfer, care transportă date din loc în loc, să zicem MsSqlToHiveTransfer;
    • senzor De asemenea, vă va permite să reacționați sau să încetiniți execuția ulterioară a dag-ului înainte de apariția oricărui eveniment. HttpSensor poate trage punctul final specificat și, când este primit răspunsul dorit, începe transferul GoogleCloudStorageToS3Operator. O minte curioasă se va întreba: „De ce? La urma urmei, poți face repetări chiar în operator!” Și apoi, pentru a nu înfunda grupul de activități cu operatori blocați. Senzorul pornește, testează și moare până la următoarea încercare.
  • Sarcină — operatorii declarați, indiferent de tip, și atașați unui dag sunt promovați la rangul de sarcină.
  • Instanță de sarcină — când planificatorul general a hotărât că este timpul să trimitem sarcinile în luptă împotriva lucrătorilor executanți (chiar la fața locului, dacă folosim LocalExecutor sau la un nod la distanță în cazul CeleryExecutor), le atribuie un context (adică un set de variabile - parametri de execuție), extinde șabloanele de comandă sau cerere și le pune într-un pool.

Generarea sarcinilor

Mai întâi, să schițăm schema generală a dag-ului nostru, iar apoi ne vom scufunda din ce în ce mai mult în detalii, pentru că folosim câteva soluții non-triviale.

Deci, în forma sa cea mai simplă, un astfel de dag va arăta astfel:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Să ne dăm seama:

  • Mai întâi, importați bibliotecile necesare și altceva;
  • sql_server_ds - E List[namedtuple[str, str]] cu numele conexiunilor din Airflow Connections și bazele de date din care ne vom lua placa;
  • dag - un anunț de la dag-ul nostru, care trebuie să fie în globals(), altfel Airflow nu îl va găsi. Doug mai trebuie să spună:
    • Cum îl cheamă orders - acest nume va apărea apoi în interfața web,
    • că va funcționa începând cu miezul nopții pe 8 iulie,
    • și ar trebui să se lanseze aproximativ la fiecare 6 ore (pentru băieții cool, aici în schimb timedelta() admisibile cron-linia 0 0 0/6 ? * * *, pentru cei mai puțin cool - o expresie ca @daily);
  • workflow() va face treaba principală, dar nu acum. Acum ne vom arunca pur și simplu contextul în jurnal.
  • Și acum magia simplă a creării sarcinilor:
    • Să trecem prin sursele noastre;
    • inițializați PythonOperator, care va realiza manechinul nostru workflow(). Nu uitați să specificați un nume unic (în cadrul dag) al sarcinii și să atașați dag-ul în sine. Steag provide_context la rândul său, va turna argumente suplimentare în funcție, pe care le vom colecta cu atenție folosind **context.

Asta este tot pentru acum. Ce avem:

  • nou dag în interfața web,
  • o sută și jumătate de sarcini care vor fi executate în paralel (dacă setările pentru fluxul de aer, țelina și puterea serverului o permit).

Ei bine, aproape ne-am înțeles.

Apache Airflow: ETL mai ușor
Cine va instala dependențele?

Pentru a simplifica toată această chestiune, am introdus-o docker-compose.yml prelucrare requirements.txt pe toate nodurile.

Acum iată-ne:

Apache Airflow: ETL mai ușor

Pătratele gri sunt instanțe de activitate procesate de planificator.

Așteptăm puțin, sarcinile sunt preluate de muncitori:

Apache Airflow: ETL mai ușor

Cei verzi, desigur, au funcționat cu succes. Roșii nu prea au succes.

Apropo, nu există niciun folder pe produsul nostru ./dags, nu există sincronizare între mașini - toate datele sunt în git pe Gitlab, iar Gitlab CI distribuie actualizări pe mașini atunci când se îmbină master.

Un pic despre Floare

În timp ce muncitorii ne macină manechinele, să ne amintim despre un alt instrument care ne poate arăta ceva - Floarea.

Prima pagină cu informații rezumate despre nodurile de lucru:

Apache Airflow: ETL mai ușor

Cea mai saturată pagină cu sarcini trimise la serviciu:

Apache Airflow: ETL mai ușor

Cea mai plictisitoare pagină cu statutul brokerului nostru:

Apache Airflow: ETL mai ușor

Cea mai frapantă pagină este cu grafice ale stării sarcinilor și al timpului lor de execuție:

Apache Airflow: ETL mai ușor

Reîncărcăm cele subîncărcate

Deci, toate sarcinile au fost îndeplinite, răniții pot fi duși.

Apache Airflow: ETL mai ușor

Și au fost destul de mulți răniți - dintr-un motiv sau altul. Dacă Airflow este utilizat corect, aceleași pătrate indică faptul că datele nu au ajuns cu siguranță.

Trebuie să vă uitați la jurnal și să reporniți instanțe de sarcină căzute.

Făcând clic pe orice pătrat, vom vedea acțiunile disponibile pentru noi:

Apache Airflow: ETL mai ușor

Poți să-l iei și să faci clar pentru cel căzut. Adică, uităm că ceva a căzut acolo și aceeași instanță de sarcină va merge la programator.

Apache Airflow: ETL mai ușor

Este clar că a face asta cu mouse-ul cu toate pătratele roșii nu este foarte uman - nu este ceea ce ne așteptăm de la Airflow. Desigur, avem arme de distrugere în masă: Browse/Task Instances

Apache Airflow: ETL mai ușor

Să selectăm totul deodată și să-l resetam la zero, faceți clic pe elementul corect:

Apache Airflow: ETL mai ușor

După curățare, taxiurile noastre arată astfel (abia așteaptă ca programatorul să le programeze):

Apache Airflow: ETL mai ușor

Conexiuni, cârlige și alte variabile

Este timpul să ne uităm la următorul DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Toată lumea și-a actualizat vreodată rapoartele, nu? Iată din nou: există o listă de surse din care să obțineți date; există o listă cu unde să-l pun; nu uitați să claxonați când totul se întâmplă sau se strică (ei bine, nu este vorba despre noi, nu).

Să parcurgem din nou fișierul și să ne uităm la noile lucruri ciudate:

  • from commons.operators import TelegramBotSendMessage — nimic nu ne împiedică să ne facem proprii operatori, de care am profitat făcând un mic wrapper pentru trimiterea mesajelor către Unblocked. (Vom vorbi mai multe despre acest operator mai jos);
  • default_args={} — dag poate distribui aceleași argumente tuturor operatorilor săi;
  • to='{{ var.value.all_the_kings_men }}' - camp to al nostru nu va fi codificat, ci generat dinamic folosind Jinja și o variabilă cu o listă de e-mailuri, pe care le-am introdus cu atenție Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — starea de lansare a operatorului. În cazul nostru, scrisoarea va fi trimisă șefilor doar dacă toate dependențele au fost îndeplinite cu succes;
  • tg_bot_conn_id='tg_main' - argumente conn_id acceptați identificatorii conexiunilor pe care le creăm Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED — mesajele din Telegram vor zbura numai dacă există sarcini căzute;
  • task_concurrency=1 — interzicem lansarea simultană a mai multor instanțe ale unei sarcini. În caz contrar, vom obține mai multe lansări simultane VerticaOperator (privind la o masă);
  • report_update >> [email, tg] - toate VerticaOperator va fi de acord să trimită scrisori și mesaje, astfel:
    Apache Airflow: ETL mai ușor

    Dar din moment ce operatorii de notificare au condiții diferite de lansare, doar unul va funcționa. În Tree View totul pare puțin mai puțin clar:
    Apache Airflow: ETL mai ușor

Voi spune câteva cuvinte despre macro-uri si prietenii lor - variabile.

Macro-urile sunt substituenți Jinja care pot insera diverse informații utile în argumentele operatorului. De exemplu, așa:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} se va extinde la conținutul variabilei context execution_date în format YYYY-MM-DD: 2020-07-14. Cea mai bună parte este că variabilele de context sunt fixate într-o anumită instanță de activitate (un pătrat în vizualizarea arborescentă), iar când sunt repornite, substituenții se vor extinde la aceleași valori.

Valorile atribuite pot fi vizualizate folosind butonul Rendered pe fiecare instanță de activitate. Iată cum arată sarcina pentru trimiterea unei scrisori:

Apache Airflow: ETL mai ușor

Și așa pentru sarcina cu trimiterea unui mesaj:

Apache Airflow: ETL mai ușor

Lista completă a macrocomenzilor încorporate pentru cea mai recentă versiune disponibilă este disponibilă aici: Referință pentru macrocomenzi

Mai mult, cu ajutorul pluginurilor, ne putem declara propriile macrocomenzi, dar asta e cu totul altă poveste.

Pe lângă lucrurile predefinite, putem înlocui valorile variabilelor noastre (am folosit deja acest lucru mai sus în cod). Să creăm în Admin/Variables cateva bucati:

Apache Airflow: ETL mai ușor

Asta e, poți folosi:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Valoarea poate fi scalară sau poate conține și JSON. În cazul JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

utilizați doar calea către cheia dorită: {{ var.json.bot_config.bot.token }}.

Voi spune literalmente un cuvânt și voi arăta o captură de ecran despre conexiune. Totul este elementar aici: pe pagină Admin/Connections Ne creăm o conexiune, adăugăm datele de conectare/parolele noastre și parametri mai specifici acolo. Ca aceasta:

Apache Airflow: ETL mai ușor

Parolele pot fi criptate (mai atent decât în ​​opțiunea implicită) sau nu puteți specifica tipul de conexiune (cum am făcut pentru tg_main) - adevărul este că lista de tipuri este conectată la modelele Airflow și nu poate fi extinsă fără a intra în codul sursă (dacă dintr-o dată nu am căutat ceva pe Google, vă rog să mă corectați), dar nimic nu ne va împiedica să obținem credite pur și simplu prin Nume.

De asemenea, puteți face mai multe conexiuni cu același nume: în acest caz, metoda BaseHook.get_connection(), care ne aduce conexiuni după nume, va da Aleatoriu de la mai multe omonime (ar fi mai logic să facem Round Robin, dar vom lăsa asta în seama conștiinței dezvoltatorilor Airflow).

Variabilele și conexiunile sunt cu siguranță instrumente grozave, dar este important să nu pierdeți echilibrul dintre părțile fluxurilor pe care le stocați în cod și ce părți le oferiți Airflow pentru stocare. Pe de o parte, schimbarea rapidă a unei valori, de exemplu, cutia poștală, poate fi convenabilă prin interfața de utilizare. Pe de altă parte, aceasta este încă o întoarcere la clicul mouse-ului, de care am vrut (am) să scăpăm.

Lucrul cu conexiunile este una dintre sarcini cârlige. În general, cârligele Airflow sunt puncte pentru conectarea acestuia la servicii și biblioteci terțe. De exemplu, JiraHook ne va deschide un client pentru a interacționa cu Jira (puteți muta sarcini înainte și înapoi) și cu ajutorul SambaHook puteți împinge un fișier local în smb-punct.

Să analizăm operatorul personalizat

Și am fost aproape să vedem cum a fost făcut TelegramBotSendMessage

Cod commons/operators.py cu operatorul însuși:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Aici, ca orice altceva în Airflow, totul este foarte simplu:

  • Moștenit de la BaseOperator, care implementează destul de multe chestii specifice fluxului de aer (verificați-l după bunul plac)
  • Câmpuri anunțate template_fields, în care Jinja va căuta macrocomenzi de procesat.
  • Organizat argumentele potrivite pentru __init__(), a plasat valorile implicite acolo unde este necesar.
  • De asemenea, nu au uitat de inițializarea strămoșului.
  • Deschis cârligul corespunzător TelegramBotHook, a primit un obiect client de la acesta.
  • Metoda suprascrisă (redefinită). BaseOperator.execute(), pe care Airfow îl va zvâcni când va veni momentul lansării operatorului - în el implementăm acțiunea principală, fără a uita să ne autentificăm. (Apropo, ne conectăm direct la stdout и stderr — Fluxul de aer va intercepta totul, îl va înfășura frumos și îl va pune acolo unde trebuie.)

Să vedem ce avem înăuntru commons/hooks.py. Prima parte a fișierului, cu cârligul în sine:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Nici măcar nu știu ce se poate explica aici, voi remarca doar punctele importante:

  • Moștenim, gândiți-vă la argumente - în cele mai multe cazuri va exista unul: conn_id;
  • Depășirea metodelor standard: m-am limitat get_conn(), în care primesc parametrii de conexiune după nume și primesc doar secțiunea extra (acesta este un câmp pentru JSON), în care eu (conform propriilor instrucțiuni!) am pus jetonul bot Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Creez o instanță a noastră TelegramBot, oferindu-i un simbol specific.

Asta e tot. Puteți obține un client dintr-un cârlig folosind TelegramBotHook().clent sau TelegramBotHook().get_conn().

Și a doua parte a fișierului, în care fac un micro-wrapper pentru API-ul Telegram REST, ca să nu trag același lucru python-telegram-bot de dragul unei metode sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Modul corect este să adunăm totul: TelegramBotSendMessage, TelegramBotHook, TelegramBot - într-un plugin, puneți-l într-un depozit public și dați-l la Open Source.

În timp ce studiam toate acestea, actualizările rapoartelor noastre au reușit să eșueze cu succes și să trimită un mesaj de eroare canalului meu. Mă duc să verific din nou ce e în neregulă...

Apache Airflow: ETL mai ușor
Ceva s-a stricat la noi! Nu asta așteptam? Exact!

Ai de gând să-l torni?

Simți că mi-a scăpat ceva? Se pare că a promis că va transfera date de pe SQL Server pe Vertica, apoi a luat-o și a lăsat subiectul, ticălosule!

Această crimă a fost intenționată, pur și simplu a trebuit să descifrez o terminologie pentru tine. Acum poți trece mai departe.

Planul nostru era acesta:

  1. Fă-i un dag
  2. Generați sarcini
  3. Uite ce frumos este totul
  4. Atribuiți numere de sesiune umplerilor
  5. Obțineți date de la SQL Server
  6. Pune date în Vertica
  7. Colectați statistici

Așa că, pentru ca toate acestea să meargă, am făcut o mică completare docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Acolo ridicăm:

  • Vertica ca gazdă dwh cu cele mai multe setări implicite,
  • trei instanțe de SQL Server,
  • umplem bazele de date din acesta din urmă cu unele date (sub nicio formă nu analizăm mssql_init.py!)

Lansăm totul folosind o comandă puțin mai complexă decât data trecută:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Ceea ce a generat randomizatorul nostru miracol se poate face folosind elementul Data Profiling/Ad Hoc Query:

Apache Airflow: ETL mai ușor
Principalul lucru este să nu-l arăți analiștilor

Stați în detaliu asupra Sesiuni ETL Nu vreau, totul este banal: creăm o bază de date, un tabel în ea, împachetăm totul cu un manager de context și acum facem asta:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

A sosit timpul ia datele noastre de la o sută și jumătate de mese ale noastre. Să facem asta cu linii foarte simple:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Folosind un cârlig, primim de la Airflow pymssql-conectați
  2. Să inserăm o constrângere sub forma unei date în cerere - motorul de șablon o va arunca în funcție.
  3. Hrănirea cererii noastre pandascine ne va lua DataFrame - Ne va fi de folos în viitor.

Folosesc înlocuirea {dt} în locul unui parametru de cerere %s nu pentru că sunt un Pinocchio rău, ci pentru că pandas nu se poate descurca pymssql și o alunecă până la ultimul params: List, deși își dorește foarte mult tuple.
De asemenea, rețineți că dezvoltatorul pymssql a decis să nu-l mai susțină și este timpul să plecăm pyodbc.

Să vedem ce a introdus Airflow în argumentele funcțiilor noastre:

Apache Airflow: ETL mai ușor

Dacă nu există date, atunci nu are rost să continui. Dar este și ciudat să considerăm umplerea reușită. Dar aceasta nu este o greșeală. Ah-ah-ah, ce să faci?! Iată ce:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException Airflow vă va spune că de fapt nu există nicio eroare, dar omitem sarcina. Interfața nu va avea un pătrat verde sau roșu, ci va fi colorată în roz.

Să ne hrănim datele mai multe coloane:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Anume

  • Baza de date din care am preluat comenzi,
  • ID-ul sesiunii noastre de încărcare (va fi diferit pentru fiecare sarcină),
  • Hash de la sursă și identificatorul de comandă - astfel încât în ​​baza de date finală (unde totul este turnat într-un singur tabel) să avem un identificator de comandă unic.

Penultimul pas rămâne: turnați totul în Vertica. Și, în mod ciudat, una dintre cele mai spectaculoase și eficiente moduri de a face acest lucru este prin CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Facem un centru special de detenție StringIO.
  2. pandas le vom pune pe ale noastre în ea DataFrame în formă CSV-linii.
  3. Să deschidem o conexiune cu iubita noastră Vertica folosind un cârlig.
  4. Și acum cu ajutorul copy() Să trimitem datele noastre direct către Vertika!

Vom lua de la șofer câte linii au fost completate și îi vom spune managerului de sesiune că totul este OK:

session.loaded_rows = cursor.rowcount
session.successful = True

Asta e tot.

În producție, creăm manual placa țintă. Aici mi-am permis o mașină mică:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

eu folosesc VerticaOperator() Creez o schemă de bază de date și un tabel (dacă nu există deja, desigur). Principalul lucru este să aranjați corect dependențele:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Rezumând

„Ei bine”, a spus șoarecele, „nu este adevărat acum?”
Ești convins că sunt cel mai groaznic animal din pădure?

Julia Donaldson, „The Gruffalo”

Cred că dacă eu și colegii mei am avea o competiție: cine ar fi cel mai rapid să creeze și să lanseze un proces ETL de la zero: ei cu SSIS și mouse-ul lor și eu cu Airflow... Și atunci am compara și ușurința de întreținere.. Uau, cred că vei fi de acord că le voi ocoli pe toate fronturile!

Pe o notă ceva mai serioasă, Apache Airflow - prin descrierea proceselor sub formă de cod de program - mi-a făcut treaba mai mult mai comod si mai placut.

Extensibilitatea sa nelimitată: atât în ​​ceea ce privește pluginurile, cât și predispoziția la scalabilitate - vă oferă posibilitatea de a utiliza Airflow în aproape orice domeniu: chiar și în întregul ciclu de colectare, pregătire și procesare a datelor, chiar și în lansarea rachetelor (pe Marte, desigur) .

Parte finală, de referință și informativă

Grebla pe care am colectat-o ​​pentru tine

  • start_date. Da, acesta este deja un meme local. Prin argumentul principal al lui Dag start_date toată lumea trece. Pe scurt, dacă specificați în start_date data curentă și pe schedule_interval - într-o zi, apoi DAG se va lansa nu mai devreme de mâine.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Și fără probleme.

    O altă eroare de execuție este asociată cu aceasta: Task is missing the start_date parameter, ceea ce indică cel mai adesea că ați uitat să legați dag de operator.

  • Totul pe o singură mașină. Da, și baze de date (Airflow în sine și acoperirea noastră), și un server web, și un planificator și lucrători. Și chiar a funcționat. Dar, de-a lungul timpului, numărul de sarcini pentru servicii a crescut, iar când PostgreSQL a început să răspundă la index în 20 s în loc de 5 ms, l-am luat și l-am dus.
  • LocalExecutor. Da, încă stăm pe el și am ajuns deja la marginea prăpastiei. LocalExecutor a fost suficient pentru noi până acum, dar acum este timpul să ne extindem cu cel puțin un lucrător și va trebui să muncim mai mult pentru a trece la CeleryExecutor. Și, din moment ce puteți lucra cu el pe o singură mașină, nimic nu vă împiedică să utilizați țelina chiar și pe un server, care „în mod firesc nu va intra niciodată în producție, sincer!”
  • Neutilizare instrumente încorporate:
    • Conexiuni pentru a stoca acreditările de serviciu,
    • SLA Ras pentru a răspunde sarcinilor care nu au fost finalizate la timp,
    • xcom pentru a face schimb de metadate (am spus metadate!) între sarcinile lui Dag.
  • Abuzul de corespondență. Pai ce pot sa spun? Au fost create alerte pentru toate repetările sarcinilor abandonate. Acum, în serviciul meu Gmail, există > 90 de scrisori de la Airflow, iar fața de e-mail web refuză să ia și să ștergă mai mult de 100 de bucăți simultan.

Mai multe capcane: Apache Airflow Pitfails

Mijloace de automatizare și mai mare

Pentru ca noi să lucrăm și mai mult cu capul și nu cu mâinile, Airflow ne-a pregătit asta:

  • API-ul REST — are încă statutul experimental, ceea ce nu îl împiedică să funcționeze. Cu ajutorul acestuia, nu numai că puteți primi informații despre dag-uri și sarcini, ci și să opriți/porniți un dag, să creați un DAG Run sau un pool.
  • CLI — multe instrumente sunt disponibile prin linia de comandă care nu numai că sunt incomod de utilizat prin WebUI, dar sunt complet absente. De exemplu:
    • backfill necesare pentru a reporni instanțe de activitate.
      De exemplu, analiștii au venit și au spus: „Și datele tale, tovarășe, sunt o prostie de la 1 ianuarie până la 13 ianuarie! Repară-l, repară-l, repară-l, repară-l! Și ești așa un hoba:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Întreținere de bază: initdb, resetdb, upgradedb, checkdb.
    • run, care vă permite să lansați o instanță de activitate și chiar să uitați de toate dependențele. În plus, îl puteți rula prin intermediul LocalExecutor, chiar dacă aveți un ciorchine de țelină.
    • Face cam același lucru test, dar nu scrie nimic în baza de date.
    • connections vă permite să creați conexiuni în vrac din shell.
  • API Python - o metodă de interacțiune mai degrabă hardcore, care este destinată pluginurilor, și nu a le mânui cu mâinile. Dar cine ne va opri să mergem la /home/airflow/dags, alerga ipython și începi să te încurci? Puteți, de exemplu, să exportați toate conexiunile folosind acest cod:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Conectați-vă la baza de date cu metadate Airflow. Nu recomand să scrieți la el, dar puteți obține stări de sarcini pentru diferite valori specifice mult mai rapid și mai ușor decât prin oricare dintre API-uri.

    Să spunem că nu toate sarcinile noastre sunt idempotente, dar uneori pot eșua și acest lucru este normal. Dar câteva moloz sunt deja suspecte și trebuie să le verificăm.

    Atenție, SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

referințe

Ei bine, desigur, primele zece link-uri de la Google sunt conținutul folderului Airflow din marcajele mele.

Și link-uri implicate în articol:

Sursa: www.habr.com