Hallo, ich bin Dmitry Logvinenko – Dateningenieur der Analytics-Abteilung der Vezet-Unternehmensgruppe.
Ich erzähle Ihnen von einem wunderbaren Tool zur Entwicklung von ETL-Prozessen – Apache Airflow. Aber Airflow ist so vielseitig und vielfältig, dass Sie es sich auch dann genauer ansehen sollten, wenn Sie nicht an Datenflüssen beteiligt sind, sondern regelmäßig Prozesse starten und deren Ausführung überwachen müssen.
Und ja, ich werde nicht nur erzählen, sondern auch zeigen: Das Programm hat jede Menge Code, Screenshots und Empfehlungen.

Was Sie normalerweise sehen, wenn Sie das Wort Airflow / Wikimedia Commons googeln
Inhaltsverzeichnis
Einführung
Apache Airflow ist genau wie Django:
- geschrieben in Python
- Es gibt ein tolles Admin-Panel,
- unbegrenzt erweiterbar
- nur besser, und es wurde für ganz andere Zwecke gemacht, nämlich (wie es vor der Kata geschrieben steht):
- Ausführen und Überwachen von Aufgaben auf einer unbegrenzten Anzahl von Maschinen (so viele Celery / Kubernetes und Ihr Gewissen es zulassen)
- mit dynamischer Workflow-Generierung aus sehr einfach zu schreibendem und verständlichem Python-Code
- und die Möglichkeit, beliebige Datenbanken und APIs miteinander zu verbinden, indem sowohl vorgefertigte Komponenten als auch selbst erstellte Plugins verwendet werden (was äußerst einfach ist).
Wir verwenden Apache Airflow wie folgt:
- Wir sammeln Daten aus verschiedenen Quellen (viele SQL Server- und PostgreSQL-Instanzen, verschiedene APIs mit Anwendungsmetriken, sogar 1C) in DWH und ODS (wir haben Vertica und Clickhouse).
- wie fortgeschritten
cron, der die Datenkonsolidierungsprozesse auf dem ODS startet und auch deren Wartung überwacht.
Bis vor Kurzem wurde unser Bedarf durch einen kleinen Server mit 32 Kernen und 50 GB RAM gedeckt. In Airflow funktioniert das:
- mehr 200 dag (eigentlich Workflows, in die wir Aufgaben gestopft haben),
- in jedem im Durchschnitt 70 Aufgaben,
- diese Güte beginnt (auch im Durchschnitt) einmal pro Stunde.
Und darüber, wie wir expandiert haben, werde ich unten schreiben, aber jetzt definieren wir das Überproblem, das wir lösen werden:
Es gibt drei Quell-SQL-Server mit jeweils 50 Datenbanken – Instanzen eines Projekts bzw. sie haben die gleiche Struktur (fast überall, mua-ha-ha), was bedeutet, dass jeder eine Orders-Tabelle hat (zum Glück eine Tabelle damit). Name kann in jedes Unternehmen übernommen werden). Wir nehmen die Daten, indem wir Servicefelder hinzufügen (Quellserver, Quelldatenbank, ETL-Aufgaben-ID) und werfen sie naiv beispielsweise in Vertica ein.
Lassen Sie uns gehen!
Der Hauptteil, praktisch (und ein wenig theoretisch)
Warum tun wir (und Sie)
Als die Bäume groß waren und ich einfach SQL-schik In einem russischen Einzelhandelsgeschäft haben wir ETL-Prozesse, auch Datenflüsse genannt, mit zwei uns zur Verfügung stehenden Tools betrogen:
- Informatica Power Center - ein extrem weitläufiges System, äußerst produktiv, mit eigener Hardware, eigener Versionierung. Ich habe, Gott bewahre, 1 % seiner Fähigkeiten genutzt. Warum? Zunächst einmal hat uns diese Schnittstelle, irgendwo aus den 380er Jahren, mental unter Druck gesetzt. Zweitens ist dieses Gerät für äußerst ausgefallene Prozesse, die rasante Wiederverwendung von Komponenten und andere sehr wichtige Unternehmenstricks konzipiert. Über die Tatsache, dass es kostet, wie der Flügel des Airbus AXNUMX / Jahr, werden wir nichts sagen.
Vorsicht, ein Screenshot kann Menschen unter 30 ein wenig verletzen

- SQL Server-Integrationsserver - Wir haben diesen Kameraden in unseren projektinternen Abläufen verwendet. Nun, in der Tat: Wir verwenden bereits SQL Server und es wäre irgendwie unvernünftig, seine ETL-Tools nicht zu verwenden. Alles darin ist gut: Sowohl die Benutzeroberfläche als auch die Fortschrittsberichte sind wunderschön ... Aber das ist nicht der Grund, warum wir Softwareprodukte lieben, oh, nicht aus diesem Grund. Versionieren Sie es
dtsx(das ist XML mit beim Speichern gemischten Knoten) können wir, aber wozu? Wie wäre es mit der Erstellung eines Aufgabenpakets, das Hunderte von Tabellen von einem Server auf einen anderen zieht? Ja, was für hundert, Ihr Zeigefinger wird aus zwanzig Teilen abfallen, wenn Sie mit der Maustaste klicken. Aber es sieht auf jeden Fall modischer aus:
Wir haben auf jeden Fall nach Auswegen gesucht. Fall sogar fast kam zu einem selbst geschriebenen SSIS-Paketgenerator ...
…und dann hat mich ein neuer Job gefunden. Und Apache Airflow hat mich dabei überholt.
Als ich herausfand, dass ETL-Prozessbeschreibungen einfacher Python-Code sind, tanzte ich einfach nicht vor Freude. Auf diese Weise wurden Datenströme versioniert und unterschieden, und das Eingießen von Tabellen mit einer einzigen Struktur aus Hunderten von Datenbanken in ein Ziel wurde zu einer Angelegenheit von Python-Code auf eineinhalb oder zwei 13-Zoll-Bildschirmen.
Zusammenbau des Clusters
Lassen Sie uns keinen kompletten Kindergarten arrangieren und hier nicht über völlig offensichtliche Dinge sprechen, wie die Installation von Airflow, der von Ihnen gewählten Datenbank, Celery und anderen in den Docks beschriebenen Fällen.
Damit wir sofort mit den Experimenten beginnen können, habe ich skizziert docker-compose.yml indem:
- Lasst uns tatsächlich erhöhen Airflow: Planer, Webserver. Flower wird sich dort auch drehen, um Celery-Aufgaben zu überwachen (da es bereits hineingeschoben wurde).
apache/airflow:1.10.10-python3.7, aber es macht uns nichts aus) - PostgreSQL, in die Airflow seine Serviceinformationen (Schedulerdaten, Ausführungsstatistiken usw.) schreibt und Celery abgeschlossene Aufgaben markiert;
- Redis, das als Aufgabenvermittler für Celery fungieren wird;
- Selleriearbeiter, die mit der direkten Ausführung von Aufgaben beschäftigt sein wird.
- Zum Ordner
./dagsWir werden unsere Dateien mit der Beschreibung von dags hinzufügen. Sie werden im Handumdrehen eingesammelt, sodass Sie nicht nach jedem Niesen mit dem gesamten Stapel jonglieren müssen.
An einigen Stellen wird der Code in den Beispielen nicht vollständig angezeigt (um den Text nicht zu überladen), aber an einigen Stellen wird er dabei geändert. Vollständige Beispiele für funktionierenden Code finden Sie im Repository .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerNotes:
- Bei der Zusammenstellung der Komposition habe ich mich weitgehend auf das bekannte Bild verlassen - Schauen Sie sich das unbedingt an. Vielleicht brauchen Sie nichts anderes in Ihrem Leben.
- Alle Airflow-Einstellungen sind nicht nur über verfügbar
airflow.cfg, sondern auch durch Umgebungsvariablen (danke an die Entwickler), die ich böswillig ausgenutzt habe. - Natürlich ist es noch nicht produktionsbereit: Ich habe bewusst keine Heartbeats auf Container gesetzt, ich habe mich nicht um die Sicherheit gekümmert. Aber ich habe das Minimum getan, das für unsere Experimentatoren geeignet war.
- Beachten Sie, dass:
- Der dag-Ordner muss sowohl für den Planer als auch für die Worker zugänglich sein.
- Das Gleiche gilt für alle Bibliotheken von Drittanbietern – sie müssen alle auf Maschinen mit einem Scheduler und Workern installiert werden.
Nun, jetzt ist es ganz einfach:
$ docker-compose up --scale worker=3Nachdem alles aufgegangen ist, können Sie sich die Weboberflächen ansehen:
- Luftzug:
- Blume:
Grundlegende Konzepte
Wenn Sie in all diesen „Tagen“ nichts verstanden haben, finden Sie hier ein kurzes Wörterbuch:
- Scheduler - Der wichtigste Onkel in Airflow, der kontrolliert, dass Roboter und nicht Menschen hart arbeiten: Er überwacht den Zeitplan, aktualisiert Tagebücher und startet Aufgaben.
Im Allgemeinen hatte er in älteren Versionen Probleme mit dem Speicher (nein, keine Amnesie, sondern Lecks) und der Legacy-Parameter blieb sogar in den Konfigurationen bestehen
run_duration— sein Neustartintervall. Aber jetzt ist alles in Ordnung. - TAG (auch bekannt als „dag“) – „gerichteter azyklischer Graph“, aber eine solche Definition wird nur wenigen Menschen sagen, aber tatsächlich handelt es sich um einen Container für miteinander interagierende Aufgaben (siehe unten) oder ein Analogon von Package in SSIS und Workflow in Informatica .
Zusätzlich zu den Dags kann es noch Subdags geben, aber wir werden höchstwahrscheinlich nicht auf sie zugreifen.
- DAG-Lauf - initialisierter Tag, dem ein eigener zugewiesen ist
execution_date. Dagrans desselben Tages können parallel arbeiten (natürlich nur, wenn Sie Ihre Aufgaben idempotent gemacht haben). - Operator sind Codeteile, die für die Ausführung einer bestimmten Aktion verantwortlich sind. Es gibt drei Arten von Operatoren:
- Aktionwie unser Favorit
PythonOperator, das jeden (gültigen) Python-Code ausführen kann; - privaten Transfer, die Daten von Ort zu Ort transportieren, sagen wir,
MsSqlToHiveTransfer; - Sensor Andererseits ermöglicht es Ihnen, zu reagieren oder die weitere Ausführung des Tages zu verlangsamen, bis ein Ereignis eintritt.
HttpSensorkann den angegebenen Endpunkt abrufen und die Übertragung starten, wenn die gewünschte Antwort wartetGoogleCloudStorageToS3Operator. Ein neugieriger Geist wird fragen: „Warum? Schließlich kann man Wiederholungen direkt im Operator durchführen!“ Und dann, um den Aufgabenpool nicht mit suspendierten Operatoren zu verstopfen. Der Sensor startet, prüft und stirbt vor dem nächsten Versuch.
- Aktionwie unser Favorit
- Aufgabe - Deklarierte Operatoren, unabhängig vom Typ, die an den Tag angehängt sind, werden in den Rang einer Aufgabe befördert.
- Aufgabeninstanz - als der Generalplaner entschied, dass es an der Zeit sei, Aufgaben auf Leistungsträger-Arbeiter in die Schlacht zu schicken (direkt vor Ort, wenn wir sie verwenden).
LocalExecutoroder an einen entfernten Knoten im Fall vonCeleryExecutor), weist es ihnen einen Kontext zu (d. h. eine Reihe von Variablen – Ausführungsparameter), erweitert Befehls- oder Abfragevorlagen und bündelt sie.
Wir generieren Aufgaben
Lassen Sie uns zunächst das allgemeine Schema unseres Dougs skizzieren, und dann werden wir mehr und mehr in die Details eintauchen, da wir einige nicht triviale Lösungen anwenden.
In seiner einfachsten Form sieht ein solcher Tag also so aus:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Lassen Sie es uns herausfinden:
- Zuerst importieren wir die notwendigen Bibliotheken und etwas anderes;
sql_server_ds- DasList[namedtuple[str, str]]mit den Namen der Verbindungen von Airflow Connections und den Datenbanken, aus denen wir unsere Platte entnehmen werden;dag- die Ankündigung unseres Tages, die unbedingt dabei sein mussglobals(), sonst findet Airflow es nicht. Doug muss auch sagen:- Wie heißt er
orders- dieser Name erscheint dann im Webinterface, - dass er am XNUMX. Juli ab Mitternacht arbeiten wird,
- und es sollte ungefähr alle 6 Stunden laufen (für harte Jungs hier statt).
timedelta()zulässigcron-Linie0 0 0/6 ? * * *, für die weniger Coolen - ein Ausdruck wie@daily);
- Wie heißt er
workflow()wird die Hauptaufgabe erledigen, aber nicht jetzt. Im Moment speichern wir einfach unseren Kontext im Protokoll.- Und nun die einfache Magie des Erstellens von Aufgaben:
- wir gehen unsere Quellen durch;
- initialisieren
PythonOperator, was unseren Dummy ausführen wirdworkflow(). Vergessen Sie nicht, einen eindeutigen Namen (innerhalb des Tags) für die Aufgabe anzugeben und den Tag selbst zu verknüpfen. Flaggeprovide_contextwiederum fügt der Funktion zusätzliche Argumente hinzu, die wir sorgfältig sammeln werden**context.
Im Moment ist das alles. Was wir bekommen haben:
- neuer Tag im Webinterface,
- eineinhalbhundert Aufgaben, die parallel ausgeführt werden (sofern die Airflow-, Celery-Einstellungen und die Serverkapazität dies zulassen).
Nun, ich habe es fast geschafft.

Wer installiert die Abhängigkeiten?
Um das Ganze zu vereinfachen, habe ich es vermasselt docker-compose.yml wird bearbeitet requirements.txt auf allen Knoten.
Jetzt ist es weg:

Graue Quadrate sind vom Planer verarbeitete Aufgabeninstanzen.
Wir warten etwas, die Aufgaben werden von den Arbeitern übernommen:

Die Grünen haben ihre Arbeit natürlich erfolgreich abgeschlossen. Die Roten sind nicht sehr erfolgreich.
Auf unserem Produkt gibt es übrigens keinen Ordner
./dags, es gibt keine Synchronisation zwischen Maschinen - alle Tage liegen dringitauf unserem Gitlab, und Gitlab CI verteilt beim Zusammenführen Updates an Maschinenmaster.
Ein wenig über Blume
Während die Arbeiter unsere Schnuller zertrümmern, erinnern wir uns an ein anderes Werkzeug, das uns etwas zeigen kann – die Blume.
Die allererste Seite mit zusammenfassenden Informationen zu Worker-Knoten:

Die intensivste Seite mit Aufgaben, die funktioniert haben:

Die langweiligste Seite mit dem Status unseres Brokers:

Die hellste Seite enthält Aufgabenstatusdiagramme und deren Ausführungszeit:

Wir laden das Unterladene
Nachdem alle Aufgaben erledigt sind, können Sie die Verwundeten abtransportieren.

Und es gab viele Verwundete – aus dem einen oder anderen Grund. Bei korrekter Nutzung von Airflow deuten genau diese Quadrate darauf hin, dass die Daten definitiv nicht angekommen sind.
Sie müssen das Protokoll beobachten und die ausgefallenen Aufgabeninstanzen neu starten.
Durch Klicken auf ein beliebiges Quadrat sehen wir die uns zur Verfügung stehenden Aktionen:

Du kannst die Gefallenen nehmen und beseitigen. Das heißt, wir vergessen, dass dort etwas fehlgeschlagen ist, und die gleiche Instanzaufgabe wird an den Scheduler weitergeleitet.

Es ist klar, dass es nicht sehr menschlich ist, dies mit der Maus mit all den roten Quadraten zu tun – das ist nicht das, was wir von Airflow erwarten. Natürlich haben wir Massenvernichtungswaffen: Browse/Task Instances

Lassen Sie uns alles auf einmal auswählen und auf Null zurücksetzen. Klicken Sie auf das richtige Element:

Nach der Reinigung sehen unsere Taxis so aus (sie warten bereits darauf, dass der Disponent sie einplant):

Verbindungen, Hooks und andere Variablen
Es ist Zeit, einen Blick auf die nächste DAG zu werfen. update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Hat jeder jemals ein Berichtsupdate durchgeführt? Das ist noch einmal sie: Es gibt eine Liste von Quellen, aus denen man die Daten beziehen kann; es gibt eine Liste, wo man es ablegen kann; Vergessen Sie nicht zu hupen, wenn etwas passiert ist oder kaputt gegangen ist (naja, hier geht es nicht um uns, nein).
Gehen wir die Datei noch einmal durch und schauen uns die neuen, obskuren Dinge an:
from commons.operators import TelegramBotSendMessage- Nichts hindert uns daran, unsere eigenen Operatoren zu erstellen, was wir uns zunutze machten, indem wir einen kleinen Wrapper für den Versand von Nachrichten an Unblocked erstellten. (Wir werden weiter unten mehr über diesen Operator sprechen);default_args={}- dag kann die gleichen Argumente an alle seine Operatoren verteilen;to='{{ var.value.all_the_kings_men }}'- FeldtoWir werden nicht fest codiert, sondern dynamisch mithilfe von Jinja und einer Variablen mit einer Liste von E-Mails generiert, die ich sorgfältig eingegeben habeAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— Bedingung zum Starten des Antriebs. In unserem Fall fliegt der Brief erst dann an die Chefs, wenn alle Abhängigkeiten geklärt sind erfolgreich;tg_bot_conn_id='tg_main'- Argumenteconn_idAkzeptieren Sie Verbindungs-IDs, die wir erstellenAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- Nachrichten im Telegramm fliegen nur dann weg, wenn Aufgaben gefallen sind;task_concurrency=1- Wir verbieten den gleichzeitigen Start mehrerer Aufgabeninstanzen einer Aufgabe. Andernfalls werden wir mehrere gleichzeitig startenVerticaOperator(schaut auf einen Tisch);report_update >> [email, tg]- AlleVerticaOperatorvereinen sich beim Versenden von Briefen und Nachrichten wie folgt:

Da Notifier-Betreiber jedoch unterschiedliche Startbedingungen haben, funktioniert nur eine. In der Baumansicht sieht alles etwas weniger visuell aus:

Ich werde ein paar Worte dazu sagen Makros und ihre Freunde - Variablen.
Makros sind Jinja-Platzhalter, die verschiedene nützliche Informationen in Operatorargumente ersetzen können. Zum Beispiel so:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} wird auf den Inhalt der Kontextvariablen erweitert execution_date im Format YYYY-MM-DD: 2020-07-14. Das Beste daran ist, dass Kontextvariablen an eine bestimmte Aufgabeninstanz (ein Quadrat in der Baumansicht) gebunden sind und die Platzhalter beim Neustart auf dieselben Werte erweitert werden.
Die zugewiesenen Werte können über die Schaltfläche „Rendered“ auf jeder Aufgabeninstanz angezeigt werden. So funktioniert die Aufgabe beim Briefversand:

Und so bei der Aufgabe mit dem Versenden einer Nachricht:

Eine vollständige Liste der integrierten Makros für die neueste verfügbare Version finden Sie hier:
Darüber hinaus können wir mit Hilfe von Plugins unsere eigenen Makros deklarieren, aber das ist eine andere Geschichte.
Zusätzlich zu den vordefinierten Dingen können wir die Werte unserer Variablen ersetzen (ich habe dies bereits im Code oben verwendet). Lasst uns kreieren Admin/Variables ein paar Dinge:

Alles, was Sie verwenden können:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Der Wert kann ein Skalar oder auch JSON sein. Im Fall von JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}Verwenden Sie einfach den Pfad zum gewünschten Schlüssel: {{ var.json.bot_config.bot.token }}.
Ich werde buchstäblich ein Wort sagen und einen Screenshot darüber zeigen Verbindungen. Hier ist alles elementar: auf der Seite Admin/Connections Wir erstellen eine Verbindung, fügen dort unsere Logins/Passwörter und spezifischere Parameter hinzu. So:

Passwörter können verschlüsselt werden (gründlicher als die Standardeinstellung) oder Sie können den Verbindungstyp weglassen (wie ich es für getan habe). tg_main) - Tatsache ist, dass die Liste der Typen in Airflow-Modellen fest verankert ist und nicht erweitert werden kann, ohne in die Quellcodes einzusteigen (falls ich plötzlich etwas nicht mehr gegoogelt habe, korrigieren Sie mich bitte), aber nichts wird uns davon abhalten, Credits einfach so zu bekommen Name.
Sie können auch mehrere Verbindungen mit demselben Namen herstellen: in diesem Fall die Methode BaseHook.get_connection(), das uns Verbindungen beim Namen verschafft, wird geben zufällig von mehreren Namensvettern (es wäre logischer, Round Robin zu machen, aber belassen wir es beim Gewissen der Airflow-Entwickler).
Variablen und Verbindungen sind sicherlich coole Tools, aber es ist wichtig, das Gleichgewicht nicht zu verlieren: welche Teile Ihrer Flows Sie im Code selbst speichern und welche Teile Sie Airflow zur Speicherung übergeben. Einerseits kann es praktisch sein, den Wert, beispielsweise ein Postfach, schnell über die Benutzeroberfläche zu ändern. Andererseits ist dies immer noch eine Rückkehr zum Mausklick, von dem wir (ich) loskommen wollten.
Zu den Aufgaben gehört die Arbeit mit Zusammenhängen Haken. Im Allgemeinen sind Airflow-Hooks Punkte für die Verbindung mit Diensten und Bibliotheken von Drittanbietern. Z.B, JiraHook öffnet einen Client für die Interaktion mit Jira (Sie können Aufgaben hin und her verschieben) und mithilfe von SambaHook Sie können eine lokale Datei dorthin verschieben smb-Punkt.
Analysieren des benutzerdefinierten Operators
Und wir konnten uns genau ansehen, wie es hergestellt wird TelegramBotSendMessage
Code commons/operators.py mit dem eigentlichen Betreiber:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Hier ist, wie alles andere in Airflow, alles ganz einfach:
- Geerbt von
BaseOperator, das einige Airflow-spezifische Dinge implementiert (schauen Sie sich nach Belieben um) - Deklarierte Felder
template_fields, in dem Jinja nach zu verarbeitenden Makros sucht. - Die richtigen Argumente dafür zusammengestellt
__init__(), legen Sie bei Bedarf die Standardeinstellungen fest. - Wir haben auch die Initialisierung des Vorfahren nicht vergessen.
- Den entsprechenden Haken geöffnet
TelegramBotHookhat davon ein Client-Objekt erhalten. - Überschriebene (neu definierte) Methode
BaseOperator.execute(), das Airfow zucken wird, wenn es an der Zeit ist, den Operator zu starten - darin werden wir die Hauptaktion umsetzen und vergessen, uns anzumelden. (Wir loggen uns übrigens direkt einstdoutиstderr- Der Luftstrom fängt alles auf, umhüllt es schön und zersetzt es, wo es nötig ist.)
Mal sehen, was wir haben commons/hooks.py. Der erste Teil der Datei mit dem Hook selbst:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientIch weiß gar nicht, was ich hier erklären soll, ich notiere nur die wichtigen Punkte:
- Wir erben, denken Sie über die Argumente nach – in den meisten Fällen wird es eines sein:
conn_id; - Standardmethoden außer Kraft setzen: Ich habe mich eingeschränkt
get_conn(), in dem ich die Verbindungsparameter nach Namen erhalte und nur den Abschnitt erhalteextra(das ist ein JSON-Feld), in das ich (nach meinen eigenen Anweisungen!) das Telegram-Bot-Token eingefügt habe:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Ich erstelle eine Instanz von uns
TelegramBotund ihm ein bestimmtes Token geben.
Das ist alles. Sie können einen Client von einem Hook abrufen, indem Sie verwenden TelegramBotHook().clent oder TelegramBotHook().get_conn().
Und der zweite Teil der Datei, in dem ich einen Microwrapper für die Telegram-REST-API erstelle, um nicht dasselbe zu ziehen für eine Methode sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Der richtige Weg ist, alles zusammenzuzählen:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- Im Plugin ein öffentliches Repository ablegen und an Open Source übergeben.
Während wir das alles studierten, schlugen unsere Berichtsaktualisierungen erfolgreich fehl und schickten mir eine Fehlermeldung im Kanal. Ich werde nachsehen, ob es falsch ist ...

In unserem Dogen ist etwas kaputt gegangen! Ist das nicht das, was wir erwartet haben? Exakt!
Wirst du einschenken?
Haben Sie das Gefühl, dass ich etwas verpasst habe? Es scheint, dass er versprochen hat, Daten von SQL Server nach Vertica zu übertragen, und dann hat er es angenommen und ist vom Thema abgekommen, der Schurke!
Diese Gräueltat war Absicht, ich musste einfach ein paar Begriffe für Sie entschlüsseln. Jetzt können Sie weitermachen.
Unser Plan war folgender:
- Mach's gut
- Aufgaben generieren
- Sehen Sie, wie schön alles ist
- Weisen Sie Füllungen Sitzungsnummern zu
- Holen Sie sich Daten von SQL Server
- Geben Sie Daten in Vertica ein
- Statistiken sammeln
Um das alles zum Laufen zu bringen, habe ich eine kleine Ergänzung zu unserem vorgenommen docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyDa erheben wir:
- Vertica als Gastgeber
dwhmit den meisten Standardeinstellungen, - drei Instanzen von SQL Server,
- Wir füllen die Datenbanken in letzterem mit einigen Daten (auf keinen Fall schauen).
mssql_init.py!)
Wir starten alles Gute mit Hilfe eines etwas komplizierteren Befehls als beim letzten Mal:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Was unser Wunder-Randomizer generiert hat, können Sie als Gegenstand verwenden Data Profiling/Ad Hoc Query:

Die Hauptsache ist, es den Analysten nicht zu zeigen
näher ausführen ETL-Sitzungen Das werde ich nicht tun, da ist alles trivial: Wir erstellen eine Basis, es gibt ein Zeichen darin, wir verpacken alles mit einem Kontextmanager und jetzt machen wir Folgendes:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passDie Zeit ist gekommen sammeln unsere Daten von unseren eineinhalbhundert Tischen. Machen wir das mit Hilfe sehr unprätentiöser Zeilen:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Mit Hilfe eines Hakens bekommen wir von Airflow
pymssql-verbinden - Ersetzen wir eine Einschränkung in Form eines Datums in der Anfrage – diese wird von der Template-Engine in die Funktion geworfen.
- Füttere unsere Bitte
pandasWer wird uns kriegen?DataFrame- Es wird uns in Zukunft nützlich sein.
Ich verwende Substitution
{dt}anstelle eines Anforderungsparameters%sNicht weil ich ein böser Pinocchio bin, sondern weilpandaskann es nicht bewältigenpymssqlund rutscht den letzten ausparams: Listobwohl er es wirklich willtuple.
Beachten Sie auch, dass der Entwicklerpymssqlbeschlossen, ihn nicht mehr zu unterstützen, und es ist Zeit, auszuziehenpyodbc.
Schauen wir uns an, womit Airflow die Argumente unserer Funktionen vollgestopft hat:

Wenn keine Daten vorhanden sind, macht es keinen Sinn, fortzufahren. Aber es ist auch seltsam, die Füllung als gelungen zu betrachten. Aber das ist kein Fehler. A-ah-ah, was tun?! Und hier ist was:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException wird Airflow mitteilen, dass keine Fehler vorliegen, aber wir überspringen die Aufgabe. Die Schnittstelle wird kein grünes oder rotes Quadrat haben, sondern rosa.
Werfen wir unsere Daten weg mehrere Spalten:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])Diese sind:
- Die Datenbank, aus der wir die Bestellungen entnommen haben,
- ID unserer Überschwemmungssitzung (es wird anders sein für jede Aufgabe),
- Ein Hash aus der Quelle und der Bestell-ID – so dass wir in der endgültigen Datenbank (in der alles in einer Tabelle zusammengefasst wird) eine eindeutige Bestell-ID haben.
Bleibt noch der vorletzte Schritt: Alles in Vertica füllen. Und seltsamerweise ist CSV eine der spektakulärsten und effizientesten Möglichkeiten, dies zu tun!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Wir stellen einen besonderen Empfänger her
StringIO. pandasWir werden uns freundlicherweise zur Verfügung stellenDataFramealsCSV-Linien.- Lassen Sie uns mit einem Haken eine Verbindung zu unserem Lieblings-Vertica herstellen.
- Und jetzt mit der Hilfe
copy()Senden Sie unsere Daten direkt an Vertika!
Wir entnehmen dem Fahrer, wie viele Zeilen gefüllt wurden, und teilen dem Sitzungsmanager mit, dass alles in Ordnung ist:
session.loaded_rows = cursor.rowcount
session.successful = TrueDas ist alles.
Beim Verkauf erstellen wir die Zieltafel manuell. Hier habe ich mir eine kleine Maschine gegönnt:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)ich benutze
VerticaOperator()Ich erstelle ein Datenbankschema und eine Tabelle (sofern diese noch nicht vorhanden sind, natürlich). Die Hauptsache ist, die Abhängigkeiten richtig anzuordnen:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadZusammenfassend
- Na ja, - sagte die kleine Maus, - nicht wahr?
Sind Sie davon überzeugt, dass ich das schrecklichste Tier im Wald bin?
Julia Donaldson, Der Grüffelo
Ich denke, wenn meine Kollegen und ich einen Wettbewerb hätten: Wer würde schnell einen ETL-Prozess von Grund auf erstellen und starten: Sie mit ihrem SSIS und einer Maus und ich mit Airflow ... Und dann würden wir auch die Wartungsfreundlichkeit vergleichen ... Wow, ich denke, Sie werden mir zustimmen, dass ich sie an allen Fronten schlagen werde!
Wenn es etwas ernster ist, dann hat Apache Airflow – indem es Prozesse in Form von Programmcode beschreibt – meine Aufgabe erfüllt viel komfortabler und angenehmer.
Seine unbegrenzte Erweiterbarkeit, sowohl in Bezug auf Plug-Ins als auch auf die Skalierbarkeit, gibt Ihnen die Möglichkeit, Airflow in nahezu jedem Bereich einzusetzen: sogar im gesamten Zyklus der Datenerfassung, -aufbereitung und -verarbeitung, sogar beim Raketenstart (z. B. zum Mars). Kurs).
Teil endgültig, Referenz und Informationen
Den Rechen haben wir für Sie zusammengestellt
start_date. Ja, das ist bereits ein lokales Meme. Über Dougs Hauptargumentstart_datealle gehen vorbei. Kurz gesagt, wenn Sie in angebenstart_dateaktuelles Datum undschedule_interval- eines Tages, dann startet die DAG morgen frühestens.start_date = datetime(2020, 7, 7, 0, 1, 2)Und keine Probleme mehr.
Damit ist ein weiterer Laufzeitfehler verbunden:
Task is missing the start_date parameter, was meistens darauf hindeutet, dass Sie vergessen haben, sich an den dag-Operator zu binden.- Alles auf einer Maschine. Ja, und Basen (Airflow selbst und unsere Beschichtung) und ein Webserver und ein Planer und Arbeiter. Und es hat sogar funktioniert. Aber im Laufe der Zeit wuchs die Anzahl der Aufgaben für Dienste, und als PostgreSQL begann, innerhalb von 20 s statt 5 ms auf den Index zu reagieren, haben wir es übernommen und mitgenommen.
- LocalExecutor. Ja, wir sitzen immer noch darauf und sind bereits am Rande des Abgrunds angelangt. Bisher hat uns LocalExecutor ausgereicht, aber jetzt ist es an der Zeit, mit mindestens einem Mitarbeiter zu expandieren, und wir müssen hart arbeiten, um auf CeleryExecutor umzusteigen. Und angesichts der Tatsache, dass man damit auf einer Maschine arbeiten kann, hindert Sie nichts daran, Celery auch auf einem Server zu verwenden, der „selbstverständlich nie in Produktion gehen wird, ehrlich gesagt!“
- Nichtbenutzung integrierte Werkzeuge:
- Verbindungen um Dienstanmeldeinformationen zu speichern,
- SLA-Fehlschüsse auf Aufgaben zu reagieren, die nicht rechtzeitig erledigt wurden,
- xcom für den Metadatenaustausch (sagte ich MetaDaten!) zwischen Dag-Aufgaben.
- E-Mail-Missbrauch. Nun was soll ich sagen? Für alle Wiederholungen fehlgeschlagener Aufgaben wurden Benachrichtigungen eingerichtet. Jetzt hat mein geschäftliches Gmail mehr als 90 E-Mails von Airflow und der Webmail-Maulkorb weigert sich, mehr als 100 gleichzeitig abzurufen und zu löschen.
Weitere Fallstricke:
Weitere Automatisierungstools
Damit wir noch mehr mit dem Kopf und nicht mit den Händen arbeiten können, hat Airflow Folgendes für uns vorbereitet:
- - Er hat immer noch den Status „Experimental“, was ihn nicht von der Arbeit abhält. Damit können Sie nicht nur Informationen über Dags und Aufgaben abrufen, sondern auch einen Dag stoppen/starten, einen DAG-Run oder einen Pool erstellen.
- – Über die Befehlszeile sind viele Tools verfügbar, deren Verwendung über die WebUI nicht nur umständlich ist, sondern die im Allgemeinen nicht vorhanden sind. Zum Beispiel:
backfillerforderlich, um Aufgabeninstanzen neu zu starten.
Zum Beispiel kamen Analysten und sagten: „Und Sie, Genosse, haben Unsinn in den Daten vom 1. bis 13. Januar!“ Repariere es, repariere es, repariere es, repariere es!“ Und du bist so ein Koch:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Basisservice:
initdb,resetdb,upgradedb,checkdb. run, wodurch Sie eine Instanzaufgabe ausführen und sogar für alle Abhängigkeiten punkten können. Darüber hinaus können Sie es über ausführenLocalExecutor, auch wenn Sie einen Sellerie-Cluster haben.- Macht so ziemlich das Gleiche
test, nur schreibt auch in Basen nichts. connectionsermöglicht die Massenerstellung von Verbindungen aus der Shell.
- - eine eher knallharte Art der Interaktion, die für Plugins gedacht ist und nicht mit kleinen Händen darin herumschwärmt. Aber wer soll uns davon abhalten, dorthin zu gehen?
/home/airflow/dags, laufenipythonund anfangen herumzualbern? Sie können beispielsweise alle Verbindungen mit dem folgenden Code exportieren:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Herstellen einer Verbindung zur Airflow-Metadatenbank. Ich empfehle nicht, darin zu schreiben, aber das Abrufen von Aufgabenstatus für verschiedene spezifische Metriken kann viel schneller und einfacher sein als die Verwendung einer der APIs.
Nehmen wir an, nicht alle unsere Aufgaben sind idempotent, aber sie können manchmal fallen, und das ist normal. Einige Blockaden sind jedoch bereits verdächtig und sollten überprüft werden.
Vorsicht SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
Referenzen
Und natürlich sind die ersten zehn Links aus der Ausgabe von Google der Inhalt des Airflow-Ordners aus meinen Lesezeichen.
- - Natürlich müssen wir mit dem Büro beginnen. Dokumentation, aber wer liest die Anleitung?
- - Nun, lesen Sie zumindest die Empfehlungen der Macher.
- - ganz am Anfang: die Benutzeroberfläche in Bildern
- - Die Grundkonzepte sind gut beschrieben, falls Sie (plötzlich!) etwas von mir nicht verstanden haben.
- – eine kurze Anleitung zum Einrichten eines Airflow-Clusters.
- – fast derselbe interessante Artikel, außer vielleicht mehr Formalismus und weniger Beispiele.
- – über die Zusammenarbeit mit Celery.
- - über die Idempotenz von Aufgaben, das Laden nach ID statt nach Datum, Transformation, Dateistruktur und andere interessante Dinge.
- - Abhängigkeiten von Aufgaben und Triggerregel, die ich nur am Rande erwähnt habe.
- - wie man einige „funktioniert wie vorgesehen“ im Planer überwindet, verlorene Daten lädt und Aufgaben priorisiert.
- – nützliche SQL-Abfragen für Airflow-Metadaten.
- - Es gibt einen nützlichen Abschnitt zum Erstellen eines benutzerdefinierten Sensors.
- – eine interessante kurze Anmerkung zum Aufbau einer Infrastruktur auf AWS für Data Science.
- - häufige Fehler (wenn jemand die Anweisungen immer noch nicht liest).
- - Lächeln Sie, wie die Leute das Speichern von Passwörtern scheuen, obwohl Sie einfach Connections verwenden können.
- - implizite DAG-Weiterleitung, Kontexteinwurf in Funktionen, wiederum über Abhängigkeiten und auch über das Überspringen von Aufgabenstarts.
- - über die Verwendung
default argumentsиparamsin Vorlagen sowie Variablen und Verbindungen. - - eine Geschichte darüber, wie sich der Planer auf Airflow 2.0 vorbereitet.
- – ein etwas veralteter Artikel über die Bereitstellung unseres Clusters in
docker-compose. - - Dynamische Aufgaben mithilfe von Vorlagen und Kontextweiterleitung.
- — Standard- und benutzerdefinierte Benachrichtigungen per E-Mail und Slack.
- - Verzweigungsaufgaben, Makros und XCom.
Und die im Artikel verwendeten Links:
- - Platzhalter zur Verwendung in Vorlagen verfügbar.
- — Häufige Fehler beim Erstellen von Tags.
- -
docker-composezum Experimentieren, Debuggen und mehr. - – Python-Wrapper für die Telegram-REST-API.
Source: habr.com




