Apache Airflow: ETL einfacher machen

Hallo, ich bin Dmitry Logvinenko – Dateningenieur der Analytics-Abteilung der Vezet-Unternehmensgruppe.

Ich erzähle Ihnen von einem wunderbaren Tool zur Entwicklung von ETL-Prozessen – Apache Airflow. Aber Airflow ist so vielseitig und vielfältig, dass Sie es sich auch dann genauer ansehen sollten, wenn Sie nicht an Datenflüssen beteiligt sind, sondern regelmäßig Prozesse starten und deren Ausführung überwachen müssen.

Und ja, ich werde nicht nur erzählen, sondern auch zeigen: Das Programm hat jede Menge Code, Screenshots und Empfehlungen.

Apache Airflow: ETL einfacher machen
Was Sie normalerweise sehen, wenn Sie das Wort Airflow / Wikimedia Commons googeln

Inhaltsverzeichnis

Einführung

Apache Airflow ist genau wie Django:

  • geschrieben in Python
  • Es gibt ein tolles Admin-Panel,
  • unbegrenzt erweiterbar

- nur besser, und es wurde für ganz andere Zwecke gemacht, nämlich (wie es vor der Kata geschrieben steht):

  • Ausführen und Überwachen von Aufgaben auf einer unbegrenzten Anzahl von Maschinen (so viele Celery / Kubernetes und Ihr Gewissen es zulassen)
  • mit dynamischer Workflow-Generierung aus sehr einfach zu schreibendem und verständlichem Python-Code
  • und die Möglichkeit, beliebige Datenbanken und APIs miteinander zu verbinden, indem sowohl vorgefertigte Komponenten als auch selbst erstellte Plugins verwendet werden (was äußerst einfach ist).

Wir verwenden Apache Airflow wie folgt:

  • Wir sammeln Daten aus verschiedenen Quellen (viele SQL Server- und PostgreSQL-Instanzen, verschiedene APIs mit Anwendungsmetriken, sogar 1C) in DWH und ODS (wir haben Vertica und Clickhouse).
  • wie fortgeschritten cron, der die Datenkonsolidierungsprozesse auf dem ODS startet und auch deren Wartung überwacht.

Bis vor Kurzem wurde unser Bedarf durch einen kleinen Server mit 32 Kernen und 50 GB RAM gedeckt. In Airflow funktioniert das:

  • mehr 200 dag (eigentlich Workflows, in die wir Aufgaben gestopft haben),
  • in jedem im Durchschnitt 70 Aufgaben,
  • diese Güte beginnt (auch im Durchschnitt) einmal pro Stunde.

Und darüber, wie wir expandiert haben, werde ich unten schreiben, aber jetzt definieren wir das Überproblem, das wir lösen werden:

Es gibt drei Quell-SQL-Server mit jeweils 50 Datenbanken – Instanzen eines Projekts bzw. sie haben die gleiche Struktur (fast überall, mua-ha-ha), was bedeutet, dass jeder eine Orders-Tabelle hat (zum Glück eine Tabelle damit). Name kann in jedes Unternehmen übernommen werden). Wir nehmen die Daten, indem wir Servicefelder hinzufügen (Quellserver, Quelldatenbank, ETL-Aufgaben-ID) und werfen sie naiv beispielsweise in Vertica ein.

Lassen Sie uns gehen!

Der Hauptteil, praktisch (und ein wenig theoretisch)

Warum tun wir (und Sie)

Als die Bäume groß waren und ich einfach SQL-schik In einem russischen Einzelhandelsgeschäft haben wir ETL-Prozesse, auch Datenflüsse genannt, mit zwei uns zur Verfügung stehenden Tools betrogen:

  • Informatica Power Center - ein extrem weitläufiges System, äußerst produktiv, mit eigener Hardware, eigener Versionierung. Ich habe, Gott bewahre, 1 % seiner Fähigkeiten genutzt. Warum? Zunächst einmal hat uns diese Schnittstelle, irgendwo aus den 380er Jahren, mental unter Druck gesetzt. Zweitens ist dieses Gerät für äußerst ausgefallene Prozesse, die rasante Wiederverwendung von Komponenten und andere sehr wichtige Unternehmenstricks konzipiert. Über die Tatsache, dass es kostet, wie der Flügel des Airbus AXNUMX / Jahr, werden wir nichts sagen.

    Vorsicht, ein Screenshot kann Menschen unter 30 ein wenig verletzen

    Apache Airflow: ETL einfacher machen

  • SQL Server-Integrationsserver - Wir haben diesen Kameraden in unseren projektinternen Abläufen verwendet. Nun, in der Tat: Wir verwenden bereits SQL Server und es wäre irgendwie unvernünftig, seine ETL-Tools nicht zu verwenden. Alles darin ist gut: Sowohl die Benutzeroberfläche als auch die Fortschrittsberichte sind wunderschön ... Aber das ist nicht der Grund, warum wir Softwareprodukte lieben, oh, nicht aus diesem Grund. Versionieren Sie es dtsx (das ist XML mit beim Speichern gemischten Knoten) können wir, aber wozu? Wie wäre es mit der Erstellung eines Aufgabenpakets, das Hunderte von Tabellen von einem Server auf einen anderen zieht? Ja, was für hundert, Ihr Zeigefinger wird aus zwanzig Teilen abfallen, wenn Sie mit der Maustaste klicken. Aber es sieht auf jeden Fall modischer aus:

    Apache Airflow: ETL einfacher machen

Wir haben auf jeden Fall nach Auswegen gesucht. Fall sogar fast kam zu einem selbst geschriebenen SSIS-Paketgenerator ...

…und dann hat mich ein neuer Job gefunden. Und Apache Airflow hat mich dabei überholt.

Als ich herausfand, dass ETL-Prozessbeschreibungen einfacher Python-Code sind, tanzte ich einfach nicht vor Freude. Auf diese Weise wurden Datenströme versioniert und unterschieden, und das Eingießen von Tabellen mit einer einzigen Struktur aus Hunderten von Datenbanken in ein Ziel wurde zu einer Angelegenheit von Python-Code auf eineinhalb oder zwei 13-Zoll-Bildschirmen.

Zusammenbau des Clusters

Lassen Sie uns keinen kompletten Kindergarten arrangieren und hier nicht über völlig offensichtliche Dinge sprechen, wie die Installation von Airflow, der von Ihnen gewählten Datenbank, Celery und anderen in den Docks beschriebenen Fällen.

Damit wir sofort mit den Experimenten beginnen können, habe ich skizziert docker-compose.yml indem:

  • Lasst uns tatsächlich erhöhen Airflow: Planer, Webserver. Flower wird sich dort auch drehen, um Celery-Aufgaben zu überwachen (da es bereits hineingeschoben wurde). apache/airflow:1.10.10-python3.7, aber es macht uns nichts aus)
  • PostgreSQL, in die Airflow seine Serviceinformationen (Schedulerdaten, Ausführungsstatistiken usw.) schreibt und Celery abgeschlossene Aufgaben markiert;
  • Redis, das als Aufgabenvermittler für Celery fungieren wird;
  • Selleriearbeiter, die mit der direkten Ausführung von Aufgaben beschäftigt sein wird.
  • Zum Ordner ./dags Wir werden unsere Dateien mit der Beschreibung von dags hinzufügen. Sie werden im Handumdrehen eingesammelt, sodass Sie nicht nach jedem Niesen mit dem gesamten Stapel jonglieren müssen.

An einigen Stellen wird der Code in den Beispielen nicht vollständig angezeigt (um den Text nicht zu überladen), aber an einigen Stellen wird er dabei geändert. Vollständige Beispiele für funktionierenden Code finden Sie im Repository https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:

  • Bei der Zusammenstellung der Komposition habe ich mich weitgehend auf das bekannte Bild verlassen Puckel/Docker-Luftstrom - Schauen Sie sich das unbedingt an. Vielleicht brauchen Sie nichts anderes in Ihrem Leben.
  • Alle Airflow-Einstellungen sind nicht nur über verfügbar airflow.cfg, sondern auch durch Umgebungsvariablen (danke an die Entwickler), die ich böswillig ausgenutzt habe.
  • Natürlich ist es noch nicht produktionsbereit: Ich habe bewusst keine Heartbeats auf Container gesetzt, ich habe mich nicht um die Sicherheit gekümmert. Aber ich habe das Minimum getan, das für unsere Experimentatoren geeignet war.
  • Beachten Sie, dass:
    • Der dag-Ordner muss sowohl für den Planer als auch für die Worker zugänglich sein.
    • Das Gleiche gilt für alle Bibliotheken von Drittanbietern – sie müssen alle auf Maschinen mit einem Scheduler und Workern installiert werden.

Nun, jetzt ist es ganz einfach:

$ docker-compose up --scale worker=3

Nachdem alles aufgegangen ist, können Sie sich die Weboberflächen ansehen:

Grundlegende Konzepte

Wenn Sie in all diesen „Tagen“ nichts verstanden haben, finden Sie hier ein kurzes Wörterbuch:

  • Scheduler - Der wichtigste Onkel in Airflow, der kontrolliert, dass Roboter und nicht Menschen hart arbeiten: Er überwacht den Zeitplan, aktualisiert Tagebücher und startet Aufgaben.

    Im Allgemeinen hatte er in älteren Versionen Probleme mit dem Speicher (nein, keine Amnesie, sondern Lecks) und der Legacy-Parameter blieb sogar in den Konfigurationen bestehen run_duration — sein Neustartintervall. Aber jetzt ist alles in Ordnung.

  • TAG (auch bekannt als „dag“) – „gerichteter azyklischer Graph“, aber eine solche Definition wird nur wenigen Menschen sagen, aber tatsächlich handelt es sich um einen Container für miteinander interagierende Aufgaben (siehe unten) oder ein Analogon von Package in SSIS und Workflow in Informatica .

    Zusätzlich zu den Dags kann es noch Subdags geben, aber wir werden höchstwahrscheinlich nicht auf sie zugreifen.

  • DAG-Lauf - initialisierter Tag, dem ein eigener zugewiesen ist execution_date. Dagrans desselben Tages können parallel arbeiten (natürlich nur, wenn Sie Ihre Aufgaben idempotent gemacht haben).
  • Operator sind Codeteile, die für die Ausführung einer bestimmten Aktion verantwortlich sind. Es gibt drei Arten von Operatoren:
    • Aktionwie unser Favorit PythonOperator, das jeden (gültigen) Python-Code ausführen kann;
    • privaten Transfer, die Daten von Ort zu Ort transportieren, sagen wir, MsSqlToHiveTransfer;
    • Sensor Andererseits ermöglicht es Ihnen, zu reagieren oder die weitere Ausführung des Tages zu verlangsamen, bis ein Ereignis eintritt. HttpSensor kann den angegebenen Endpunkt abrufen und die Übertragung starten, wenn die gewünschte Antwort wartet GoogleCloudStorageToS3Operator. Ein neugieriger Geist wird fragen: „Warum? Schließlich kann man Wiederholungen direkt im Operator durchführen!“ Und dann, um den Aufgabenpool nicht mit suspendierten Operatoren zu verstopfen. Der Sensor startet, prüft und stirbt vor dem nächsten Versuch.
  • Aufgabe - Deklarierte Operatoren, unabhängig vom Typ, die an den Tag angehängt sind, werden in den Rang einer Aufgabe befördert.
  • Aufgabeninstanz - als der Generalplaner entschied, dass es an der Zeit sei, Aufgaben auf Leistungsträger-Arbeiter in die Schlacht zu schicken (direkt vor Ort, wenn wir sie verwenden). LocalExecutor oder an einen entfernten Knoten im Fall von CeleryExecutor), weist es ihnen einen Kontext zu (d. h. eine Reihe von Variablen – Ausführungsparameter), erweitert Befehls- oder Abfragevorlagen und bündelt sie.

Wir generieren Aufgaben

Lassen Sie uns zunächst das allgemeine Schema unseres Dougs skizzieren, und dann werden wir mehr und mehr in die Details eintauchen, da wir einige nicht triviale Lösungen anwenden.

In seiner einfachsten Form sieht ein solcher Tag also so aus:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Lassen Sie es uns herausfinden:

  • Zuerst importieren wir die notwendigen Bibliotheken und etwas anderes;
  • sql_server_ds - Das List[namedtuple[str, str]] mit den Namen der Verbindungen von Airflow Connections und den Datenbanken, aus denen wir unsere Platte entnehmen werden;
  • dag - die Ankündigung unseres Tages, die unbedingt dabei sein muss globals(), sonst findet Airflow es nicht. Doug muss auch sagen:
    • Wie heißt er orders - dieser Name erscheint dann im Webinterface,
    • dass er am XNUMX. Juli ab Mitternacht arbeiten wird,
    • und es sollte ungefähr alle 6 Stunden laufen (für harte Jungs hier statt). timedelta() zulässig cron-Linie 0 0 0/6 ? * * *, für die weniger Coolen - ein Ausdruck wie @daily);
  • workflow() wird die Hauptaufgabe erledigen, aber nicht jetzt. Im Moment speichern wir einfach unseren Kontext im Protokoll.
  • Und nun die einfache Magie des Erstellens von Aufgaben:
    • wir gehen unsere Quellen durch;
    • initialisieren PythonOperator, was unseren Dummy ausführen wird workflow(). Vergessen Sie nicht, einen eindeutigen Namen (innerhalb des Tags) für die Aufgabe anzugeben und den Tag selbst zu verknüpfen. Flagge provide_context wiederum fügt der Funktion zusätzliche Argumente hinzu, die wir sorgfältig sammeln werden **context.

Im Moment ist das alles. Was wir bekommen haben:

  • neuer Tag im Webinterface,
  • eineinhalbhundert Aufgaben, die parallel ausgeführt werden (sofern die Airflow-, Celery-Einstellungen und die Serverkapazität dies zulassen).

Nun, ich habe es fast geschafft.

Apache Airflow: ETL einfacher machen
Wer installiert die Abhängigkeiten?

Um das Ganze zu vereinfachen, habe ich es vermasselt docker-compose.yml wird bearbeitet requirements.txt auf allen Knoten.

Jetzt ist es weg:

Apache Airflow: ETL einfacher machen

Graue Quadrate sind vom Planer verarbeitete Aufgabeninstanzen.

Wir warten etwas, die Aufgaben werden von den Arbeitern übernommen:

Apache Airflow: ETL einfacher machen

Die Grünen haben ihre Arbeit natürlich erfolgreich abgeschlossen. Die Roten sind nicht sehr erfolgreich.

Auf unserem Produkt gibt es übrigens keinen Ordner ./dags, es gibt keine Synchronisation zwischen Maschinen - alle Tage liegen drin git auf unserem Gitlab, und Gitlab CI verteilt beim Zusammenführen Updates an Maschinen master.

Ein wenig über Blume

Während die Arbeiter unsere Schnuller zertrümmern, erinnern wir uns an ein anderes Werkzeug, das uns etwas zeigen kann – die Blume.

Die allererste Seite mit zusammenfassenden Informationen zu Worker-Knoten:

Apache Airflow: ETL einfacher machen

Die intensivste Seite mit Aufgaben, die funktioniert haben:

Apache Airflow: ETL einfacher machen

Die langweiligste Seite mit dem Status unseres Brokers:

Apache Airflow: ETL einfacher machen

Die hellste Seite enthält Aufgabenstatusdiagramme und deren Ausführungszeit:

Apache Airflow: ETL einfacher machen

Wir laden das Unterladene

Nachdem alle Aufgaben erledigt sind, können Sie die Verwundeten abtransportieren.

Apache Airflow: ETL einfacher machen

Und es gab viele Verwundete – aus dem einen oder anderen Grund. Bei korrekter Nutzung von Airflow deuten genau diese Quadrate darauf hin, dass die Daten definitiv nicht angekommen sind.

Sie müssen das Protokoll beobachten und die ausgefallenen Aufgabeninstanzen neu starten.

Durch Klicken auf ein beliebiges Quadrat sehen wir die uns zur Verfügung stehenden Aktionen:

Apache Airflow: ETL einfacher machen

Du kannst die Gefallenen nehmen und beseitigen. Das heißt, wir vergessen, dass dort etwas fehlgeschlagen ist, und die gleiche Instanzaufgabe wird an den Scheduler weitergeleitet.

Apache Airflow: ETL einfacher machen

Es ist klar, dass es nicht sehr menschlich ist, dies mit der Maus mit all den roten Quadraten zu tun – das ist nicht das, was wir von Airflow erwarten. Natürlich haben wir Massenvernichtungswaffen: Browse/Task Instances

Apache Airflow: ETL einfacher machen

Lassen Sie uns alles auf einmal auswählen und auf Null zurücksetzen. Klicken Sie auf das richtige Element:

Apache Airflow: ETL einfacher machen

Nach der Reinigung sehen unsere Taxis so aus (sie warten bereits darauf, dass der Disponent sie einplant):

Apache Airflow: ETL einfacher machen

Verbindungen, Hooks und andere Variablen

Es ist Zeit, einen Blick auf die nächste DAG zu werfen. update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Hat jeder jemals ein Berichtsupdate durchgeführt? Das ist noch einmal sie: Es gibt eine Liste von Quellen, aus denen man die Daten beziehen kann; es gibt eine Liste, wo man es ablegen kann; Vergessen Sie nicht zu hupen, wenn etwas passiert ist oder kaputt gegangen ist (naja, hier geht es nicht um uns, nein).

Gehen wir die Datei noch einmal durch und schauen uns die neuen, obskuren Dinge an:

  • from commons.operators import TelegramBotSendMessage - Nichts hindert uns daran, unsere eigenen Operatoren zu erstellen, was wir uns zunutze machten, indem wir einen kleinen Wrapper für den Versand von Nachrichten an Unblocked erstellten. (Wir werden weiter unten mehr über diesen Operator sprechen);
  • default_args={} - dag kann die gleichen Argumente an alle seine Operatoren verteilen;
  • to='{{ var.value.all_the_kings_men }}' - Feld to Wir werden nicht fest codiert, sondern dynamisch mithilfe von Jinja und einer Variablen mit einer Liste von E-Mails generiert, die ich sorgfältig eingegeben habe Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — Bedingung zum Starten des Antriebs. In unserem Fall fliegt der Brief erst dann an die Chefs, wenn alle Abhängigkeiten geklärt sind erfolgreich;
  • tg_bot_conn_id='tg_main' - Argumente conn_id Akzeptieren Sie Verbindungs-IDs, die wir erstellen Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - Nachrichten im Telegramm fliegen nur dann weg, wenn Aufgaben gefallen sind;
  • task_concurrency=1 - Wir verbieten den gleichzeitigen Start mehrerer Aufgabeninstanzen einer Aufgabe. Andernfalls werden wir mehrere gleichzeitig starten VerticaOperator (schaut auf einen Tisch);
  • report_update >> [email, tg] - Alle VerticaOperator vereinen sich beim Versenden von Briefen und Nachrichten wie folgt:
    Apache Airflow: ETL einfacher machen

    Da Notifier-Betreiber jedoch unterschiedliche Startbedingungen haben, funktioniert nur eine. In der Baumansicht sieht alles etwas weniger visuell aus:
    Apache Airflow: ETL einfacher machen

Ich werde ein paar Worte dazu sagen Makros und ihre Freunde - Variablen.

Makros sind Jinja-Platzhalter, die verschiedene nützliche Informationen in Operatorargumente ersetzen können. Zum Beispiel so:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} wird auf den Inhalt der Kontextvariablen erweitert execution_date im Format YYYY-MM-DD: 2020-07-14. Das Beste daran ist, dass Kontextvariablen an eine bestimmte Aufgabeninstanz (ein Quadrat in der Baumansicht) gebunden sind und die Platzhalter beim Neustart auf dieselben Werte erweitert werden.

Die zugewiesenen Werte können über die Schaltfläche „Rendered“ auf jeder Aufgabeninstanz angezeigt werden. So funktioniert die Aufgabe beim Briefversand:

Apache Airflow: ETL einfacher machen

Und so bei der Aufgabe mit dem Versenden einer Nachricht:

Apache Airflow: ETL einfacher machen

Eine vollständige Liste der integrierten Makros für die neueste verfügbare Version finden Sie hier: Makros-Referenz

Darüber hinaus können wir mit Hilfe von Plugins unsere eigenen Makros deklarieren, aber das ist eine andere Geschichte.

Zusätzlich zu den vordefinierten Dingen können wir die Werte unserer Variablen ersetzen (ich habe dies bereits im Code oben verwendet). Lasst uns kreieren Admin/Variables ein paar Dinge:

Apache Airflow: ETL einfacher machen

Alles, was Sie verwenden können:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Der Wert kann ein Skalar oder auch JSON sein. Im Fall von JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

Verwenden Sie einfach den Pfad zum gewünschten Schlüssel: {{ var.json.bot_config.bot.token }}.

Ich werde buchstäblich ein Wort sagen und einen Screenshot darüber zeigen Verbindungen. Hier ist alles elementar: auf der Seite Admin/Connections Wir erstellen eine Verbindung, fügen dort unsere Logins/Passwörter und spezifischere Parameter hinzu. So:

Apache Airflow: ETL einfacher machen

Passwörter können verschlüsselt werden (gründlicher als die Standardeinstellung) oder Sie können den Verbindungstyp weglassen (wie ich es für getan habe). tg_main) - Tatsache ist, dass die Liste der Typen in Airflow-Modellen fest verankert ist und nicht erweitert werden kann, ohne in die Quellcodes einzusteigen (falls ich plötzlich etwas nicht mehr gegoogelt habe, korrigieren Sie mich bitte), aber nichts wird uns davon abhalten, Credits einfach so zu bekommen Name.

Sie können auch mehrere Verbindungen mit demselben Namen herstellen: in diesem Fall die Methode BaseHook.get_connection(), das uns Verbindungen beim Namen verschafft, wird geben zufällig von mehreren Namensvettern (es wäre logischer, Round Robin zu machen, aber belassen wir es beim Gewissen der Airflow-Entwickler).

Variablen und Verbindungen sind sicherlich coole Tools, aber es ist wichtig, das Gleichgewicht nicht zu verlieren: welche Teile Ihrer Flows Sie im Code selbst speichern und welche Teile Sie Airflow zur Speicherung übergeben. Einerseits kann es praktisch sein, den Wert, beispielsweise ein Postfach, schnell über die Benutzeroberfläche zu ändern. Andererseits ist dies immer noch eine Rückkehr zum Mausklick, von dem wir (ich) loskommen wollten.

Zu den Aufgaben gehört die Arbeit mit Zusammenhängen Haken. Im Allgemeinen sind Airflow-Hooks Punkte für die Verbindung mit Diensten und Bibliotheken von Drittanbietern. Z.B, JiraHook öffnet einen Client für die Interaktion mit Jira (Sie können Aufgaben hin und her verschieben) und mithilfe von SambaHook Sie können eine lokale Datei dorthin verschieben smb-Punkt.

Analysieren des benutzerdefinierten Operators

Und wir konnten uns genau ansehen, wie es hergestellt wird TelegramBotSendMessage

Code commons/operators.py mit dem eigentlichen Betreiber:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Hier ist, wie alles andere in Airflow, alles ganz einfach:

  • Geerbt von BaseOperator, das einige Airflow-spezifische Dinge implementiert (schauen Sie sich nach Belieben um)
  • Deklarierte Felder template_fields, in dem Jinja nach zu verarbeitenden Makros sucht.
  • Die richtigen Argumente dafür zusammengestellt __init__(), legen Sie bei Bedarf die Standardeinstellungen fest.
  • Wir haben auch die Initialisierung des Vorfahren nicht vergessen.
  • Den entsprechenden Haken geöffnet TelegramBotHookhat davon ein Client-Objekt erhalten.
  • Überschriebene (neu definierte) Methode BaseOperator.execute(), das Airfow zucken wird, wenn es an der Zeit ist, den Operator zu starten - darin werden wir die Hauptaktion umsetzen und vergessen, uns anzumelden. (Wir loggen uns übrigens direkt ein stdout и stderr - Der Luftstrom fängt alles auf, umhüllt es schön und zersetzt es, wo es nötig ist.)

Mal sehen, was wir haben commons/hooks.py. Der erste Teil der Datei mit dem Hook selbst:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Ich weiß gar nicht, was ich hier erklären soll, ich notiere nur die wichtigen Punkte:

  • Wir erben, denken Sie über die Argumente nach – in den meisten Fällen wird es eines sein: conn_id;
  • Standardmethoden außer Kraft setzen: Ich habe mich eingeschränkt get_conn(), in dem ich die Verbindungsparameter nach Namen erhalte und nur den Abschnitt erhalte extra (das ist ein JSON-Feld), in das ich (nach meinen eigenen Anweisungen!) das Telegram-Bot-Token eingefügt habe: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Ich erstelle eine Instanz von uns TelegramBotund ihm ein bestimmtes Token geben.

Das ist alles. Sie können einen Client von einem Hook abrufen, indem Sie verwenden TelegramBotHook().clent oder TelegramBotHook().get_conn().

Und der zweite Teil der Datei, in dem ich einen Microwrapper für die Telegram-REST-API erstelle, um nicht dasselbe zu ziehen python-telegram-bot für eine Methode sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Der richtige Weg ist, alles zusammenzuzählen: TelegramBotSendMessage, TelegramBotHook, TelegramBot - Im Plugin ein öffentliches Repository ablegen und an Open Source übergeben.

Während wir das alles studierten, schlugen unsere Berichtsaktualisierungen erfolgreich fehl und schickten mir eine Fehlermeldung im Kanal. Ich werde nachsehen, ob es falsch ist ...

Apache Airflow: ETL einfacher machen
In unserem Dogen ist etwas kaputt gegangen! Ist das nicht das, was wir erwartet haben? Exakt!

Wirst du einschenken?

Haben Sie das Gefühl, dass ich etwas verpasst habe? Es scheint, dass er versprochen hat, Daten von SQL Server nach Vertica zu übertragen, und dann hat er es angenommen und ist vom Thema abgekommen, der Schurke!

Diese Gräueltat war Absicht, ich musste einfach ein paar Begriffe für Sie entschlüsseln. Jetzt können Sie weitermachen.

Unser Plan war folgender:

  1. Mach's gut
  2. Aufgaben generieren
  3. Sehen Sie, wie schön alles ist
  4. Weisen Sie Füllungen Sitzungsnummern zu
  5. Holen Sie sich Daten von SQL Server
  6. Geben Sie Daten in Vertica ein
  7. Statistiken sammeln

Um das alles zum Laufen zu bringen, habe ich eine kleine Ergänzung zu unserem vorgenommen docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Da erheben wir:

  • Vertica als Gastgeber dwh mit den meisten Standardeinstellungen,
  • drei Instanzen von SQL Server,
  • Wir füllen die Datenbanken in letzterem mit einigen Daten (auf keinen Fall schauen). mssql_init.py!)

Wir starten alles Gute mit Hilfe eines etwas komplizierteren Befehls als beim letzten Mal:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Was unser Wunder-Randomizer generiert hat, können Sie als Gegenstand verwenden Data Profiling/Ad Hoc Query:

Apache Airflow: ETL einfacher machen
Die Hauptsache ist, es den Analysten nicht zu zeigen

näher ausführen ETL-Sitzungen Das werde ich nicht tun, da ist alles trivial: Wir erstellen eine Basis, es gibt ein Zeichen darin, wir verpacken alles mit einem Kontextmanager und jetzt machen wir Folgendes:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Die Zeit ist gekommen sammeln unsere Daten von unseren eineinhalbhundert Tischen. Machen wir das mit Hilfe sehr unprätentiöser Zeilen:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Mit Hilfe eines Hakens bekommen wir von Airflow pymssql-verbinden
  2. Ersetzen wir eine Einschränkung in Form eines Datums in der Anfrage – diese wird von der Template-Engine in die Funktion geworfen.
  3. Füttere unsere Bitte pandasWer wird uns kriegen? DataFrame - Es wird uns in Zukunft nützlich sein.

Ich verwende Substitution {dt} anstelle eines Anforderungsparameters %s Nicht weil ich ein böser Pinocchio bin, sondern weil pandas kann es nicht bewältigen pymssql und rutscht den letzten aus params: Listobwohl er es wirklich will tuple.
Beachten Sie auch, dass der Entwickler pymssql beschlossen, ihn nicht mehr zu unterstützen, und es ist Zeit, auszuziehen pyodbc.

Schauen wir uns an, womit Airflow die Argumente unserer Funktionen vollgestopft hat:

Apache Airflow: ETL einfacher machen

Wenn keine Daten vorhanden sind, macht es keinen Sinn, fortzufahren. Aber es ist auch seltsam, die Füllung als gelungen zu betrachten. Aber das ist kein Fehler. A-ah-ah, was tun?! Und hier ist was:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException wird Airflow mitteilen, dass keine Fehler vorliegen, aber wir überspringen die Aufgabe. Die Schnittstelle wird kein grünes oder rotes Quadrat haben, sondern rosa.

Werfen wir unsere Daten weg mehrere Spalten:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Diese sind:

  • Die Datenbank, aus der wir die Bestellungen entnommen haben,
  • ID unserer Überschwemmungssitzung (es wird anders sein für jede Aufgabe),
  • Ein Hash aus der Quelle und der Bestell-ID – so dass wir in der endgültigen Datenbank (in der alles in einer Tabelle zusammengefasst wird) eine eindeutige Bestell-ID haben.

Bleibt noch der vorletzte Schritt: Alles in Vertica füllen. Und seltsamerweise ist CSV eine der spektakulärsten und effizientesten Möglichkeiten, dies zu tun!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Wir stellen einen besonderen Empfänger her StringIO.
  2. pandas Wir werden uns freundlicherweise zur Verfügung stellen DataFrame als CSV-Linien.
  3. Lassen Sie uns mit einem Haken eine Verbindung zu unserem Lieblings-Vertica herstellen.
  4. Und jetzt mit der Hilfe copy() Senden Sie unsere Daten direkt an Vertika!

Wir entnehmen dem Fahrer, wie viele Zeilen gefüllt wurden, und teilen dem Sitzungsmanager mit, dass alles in Ordnung ist:

session.loaded_rows = cursor.rowcount
session.successful = True

Das ist alles.

Beim Verkauf erstellen wir die Zieltafel manuell. Hier habe ich mir eine kleine Maschine gegönnt:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

ich benutze VerticaOperator() Ich erstelle ein Datenbankschema und eine Tabelle (sofern diese noch nicht vorhanden sind, natürlich). Die Hauptsache ist, die Abhängigkeiten richtig anzuordnen:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Zusammenfassend

- Na ja, - sagte die kleine Maus, - nicht wahr?
Sind Sie davon überzeugt, dass ich das schrecklichste Tier im Wald bin?

Julia Donaldson, Der Grüffelo

Ich denke, wenn meine Kollegen und ich einen Wettbewerb hätten: Wer würde schnell einen ETL-Prozess von Grund auf erstellen und starten: Sie mit ihrem SSIS und einer Maus und ich mit Airflow ... Und dann würden wir auch die Wartungsfreundlichkeit vergleichen ... Wow, ich denke, Sie werden mir zustimmen, dass ich sie an allen Fronten schlagen werde!

Wenn es etwas ernster ist, dann hat Apache Airflow – indem es Prozesse in Form von Programmcode beschreibt – meine Aufgabe erfüllt viel komfortabler und angenehmer.

Seine unbegrenzte Erweiterbarkeit, sowohl in Bezug auf Plug-Ins als auch auf die Skalierbarkeit, gibt Ihnen die Möglichkeit, Airflow in nahezu jedem Bereich einzusetzen: sogar im gesamten Zyklus der Datenerfassung, -aufbereitung und -verarbeitung, sogar beim Raketenstart (z. B. zum Mars). Kurs).

Teil endgültig, Referenz und Informationen

Den Rechen haben wir für Sie zusammengestellt

  • start_date. Ja, das ist bereits ein lokales Meme. Über Dougs Hauptargument start_date alle gehen vorbei. Kurz gesagt, wenn Sie in angeben start_date aktuelles Datum und schedule_interval - eines Tages, dann startet die DAG morgen frühestens.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Und keine Probleme mehr.

    Damit ist ein weiterer Laufzeitfehler verbunden: Task is missing the start_date parameter, was meistens darauf hindeutet, dass Sie vergessen haben, sich an den dag-Operator zu binden.

  • Alles auf einer Maschine. Ja, und Basen (Airflow selbst und unsere Beschichtung) und ein Webserver und ein Planer und Arbeiter. Und es hat sogar funktioniert. Aber im Laufe der Zeit wuchs die Anzahl der Aufgaben für Dienste, und als PostgreSQL begann, innerhalb von 20 s statt 5 ms auf den Index zu reagieren, haben wir es übernommen und mitgenommen.
  • LocalExecutor. Ja, wir sitzen immer noch darauf und sind bereits am Rande des Abgrunds angelangt. Bisher hat uns LocalExecutor ausgereicht, aber jetzt ist es an der Zeit, mit mindestens einem Mitarbeiter zu expandieren, und wir müssen hart arbeiten, um auf CeleryExecutor umzusteigen. Und angesichts der Tatsache, dass man damit auf einer Maschine arbeiten kann, hindert Sie nichts daran, Celery auch auf einem Server zu verwenden, der „selbstverständlich nie in Produktion gehen wird, ehrlich gesagt!“
  • Nichtbenutzung integrierte Werkzeuge:
    • Verbindungen um Dienstanmeldeinformationen zu speichern,
    • SLA-Fehlschüsse auf Aufgaben zu reagieren, die nicht rechtzeitig erledigt wurden,
    • xcom für den Metadatenaustausch (sagte ich MetaDaten!) zwischen Dag-Aufgaben.
  • E-Mail-Missbrauch. Nun was soll ich sagen? Für alle Wiederholungen fehlgeschlagener Aufgaben wurden Benachrichtigungen eingerichtet. Jetzt hat mein geschäftliches Gmail mehr als 90 E-Mails von Airflow und der Webmail-Maulkorb weigert sich, mehr als 100 gleichzeitig abzurufen und zu löschen.

Weitere Fallstricke: Apache Airflow-Pitfails

Weitere Automatisierungstools

Damit wir noch mehr mit dem Kopf und nicht mit den Händen arbeiten können, hat Airflow Folgendes für uns vorbereitet:

  • REST API - Er hat immer noch den Status „Experimental“, was ihn nicht von der Arbeit abhält. Damit können Sie nicht nur Informationen über Dags und Aufgaben abrufen, sondern auch einen Dag stoppen/starten, einen DAG-Run oder einen Pool erstellen.
  • CLI – Über die Befehlszeile sind viele Tools verfügbar, deren Verwendung über die WebUI nicht nur umständlich ist, sondern die im Allgemeinen nicht vorhanden sind. Zum Beispiel:
    • backfill erforderlich, um Aufgabeninstanzen neu zu starten.
      Zum Beispiel kamen Analysten und sagten: „Und Sie, Genosse, haben Unsinn in den Daten vom 1. bis 13. Januar!“ Repariere es, repariere es, repariere es, repariere es!“ Und du bist so ein Koch:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Basisservice: initdb, resetdb, upgradedb, checkdb.
    • run, wodurch Sie eine Instanzaufgabe ausführen und sogar für alle Abhängigkeiten punkten können. Darüber hinaus können Sie es über ausführen LocalExecutor, auch wenn Sie einen Sellerie-Cluster haben.
    • Macht so ziemlich das Gleiche test, nur schreibt auch in Basen nichts.
    • connections ermöglicht die Massenerstellung von Verbindungen aus der Shell.
  • Python-API - eine eher knallharte Art der Interaktion, die für Plugins gedacht ist und nicht mit kleinen Händen darin herumschwärmt. Aber wer soll uns davon abhalten, dorthin zu gehen? /home/airflow/dags, laufen ipython und anfangen herumzualbern? Sie können beispielsweise alle Verbindungen mit dem folgenden Code exportieren:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Herstellen einer Verbindung zur Airflow-Metadatenbank. Ich empfehle nicht, darin zu schreiben, aber das Abrufen von Aufgabenstatus für verschiedene spezifische Metriken kann viel schneller und einfacher sein als die Verwendung einer der APIs.

    Nehmen wir an, nicht alle unsere Aufgaben sind idempotent, aber sie können manchmal fallen, und das ist normal. Einige Blockaden sind jedoch bereits verdächtig und sollten überprüft werden.

    Vorsicht SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

Referenzen

Und natürlich sind die ersten zehn Links aus der Ausgabe von Google der Inhalt des Airflow-Ordners aus meinen Lesezeichen.

Und die im Artikel verwendeten Links:

Source: habr.com