Hallo, ich bin Dmitry Logvinenko – Dateningenieur der Analytics-Abteilung der Vezet-Unternehmensgruppe.
Ich erzähle Ihnen von einem wunderbaren Tool zur Entwicklung von ETL-Prozessen – Apache Airflow. Aber Airflow ist so vielseitig und vielfältig, dass Sie es sich auch dann genauer ansehen sollten, wenn Sie nicht an Datenflüssen beteiligt sind, sondern regelmäßig Prozesse starten und deren Ausführung überwachen müssen.
Und ja, ich werde nicht nur erzählen, sondern auch zeigen: Das Programm hat jede Menge Code, Screenshots und Empfehlungen.
Was Sie normalerweise sehen, wenn Sie das Wort Airflow / Wikimedia Commons googeln
- nur besser, und es wurde für ganz andere Zwecke gemacht, nämlich (wie es vor der Kata geschrieben steht):
Ausführen und Überwachen von Aufgaben auf einer unbegrenzten Anzahl von Maschinen (so viele Celery / Kubernetes und Ihr Gewissen es zulassen)
mit dynamischer Workflow-Generierung aus sehr einfach zu schreibendem und verständlichem Python-Code
und die Möglichkeit, beliebige Datenbanken und APIs miteinander zu verbinden, indem sowohl vorgefertigte Komponenten als auch selbst erstellte Plugins verwendet werden (was äußerst einfach ist).
Wir verwenden Apache Airflow wie folgt:
Wir sammeln Daten aus verschiedenen Quellen (viele SQL Server- und PostgreSQL-Instanzen, verschiedene APIs mit Anwendungsmetriken, sogar 1C) in DWH und ODS (wir haben Vertica und Clickhouse).
wie fortgeschritten cron, der die Datenkonsolidierungsprozesse auf dem ODS startet und auch deren Wartung überwacht.
Bis vor Kurzem wurde unser Bedarf durch einen kleinen Server mit 32 Kernen und 50 GB RAM gedeckt. In Airflow funktioniert das:
mehr 200 dag (eigentlich Workflows, in die wir Aufgaben gestopft haben),
in jedem im Durchschnitt 70 Aufgaben,
diese Güte beginnt (auch im Durchschnitt) einmal pro Stunde.
Und darüber, wie wir expandiert haben, werde ich unten schreiben, aber jetzt definieren wir das Überproblem, das wir lösen werden:
Es gibt drei Quell-SQL-Server mit jeweils 50 Datenbanken – Instanzen eines Projekts bzw. sie haben die gleiche Struktur (fast überall, mua-ha-ha), was bedeutet, dass jeder eine Orders-Tabelle hat (zum Glück eine Tabelle damit). Name kann in jedes Unternehmen übernommen werden). Wir nehmen die Daten, indem wir Servicefelder hinzufügen (Quellserver, Quelldatenbank, ETL-Aufgaben-ID) und werfen sie naiv beispielsweise in Vertica ein.
Lassen Sie uns gehen!
Der Hauptteil, praktisch (und ein wenig theoretisch)
Warum tun wir (und Sie)
Als die Bäume groß waren und ich einfach SQL-schik In einem russischen Einzelhandelsgeschäft haben wir ETL-Prozesse, auch Datenflüsse genannt, mit zwei uns zur Verfügung stehenden Tools betrogen:
Informatica Power Center - ein extrem weitläufiges System, äußerst produktiv, mit eigener Hardware, eigener Versionierung. Ich habe, Gott bewahre, 1 % seiner Fähigkeiten genutzt. Warum? Zunächst einmal hat uns diese Schnittstelle, irgendwo aus den 380er Jahren, mental unter Druck gesetzt. Zweitens ist dieses Gerät für äußerst ausgefallene Prozesse, die rasante Wiederverwendung von Komponenten und andere sehr wichtige Unternehmenstricks konzipiert. Über die Tatsache, dass es kostet, wie der Flügel des Airbus AXNUMX / Jahr, werden wir nichts sagen.
Vorsicht, ein Screenshot kann Menschen unter 30 ein wenig verletzen
SQL Server-Integrationsserver - Wir haben diesen Kameraden in unseren projektinternen Abläufen verwendet. Nun, in der Tat: Wir verwenden bereits SQL Server und es wäre irgendwie unvernünftig, seine ETL-Tools nicht zu verwenden. Alles darin ist gut: Sowohl die Benutzeroberfläche als auch die Fortschrittsberichte sind wunderschön ... Aber das ist nicht der Grund, warum wir Softwareprodukte lieben, oh, nicht aus diesem Grund. Versionieren Sie es dtsx (das ist XML mit beim Speichern gemischten Knoten) können wir, aber wozu? Wie wäre es mit der Erstellung eines Aufgabenpakets, das Hunderte von Tabellen von einem Server auf einen anderen zieht? Ja, was für hundert, Ihr Zeigefinger wird aus zwanzig Teilen abfallen, wenn Sie mit der Maustaste klicken. Aber es sieht auf jeden Fall modischer aus:
Wir haben auf jeden Fall nach Auswegen gesucht. Fall sogar fast kam zu einem selbst geschriebenen SSIS-Paketgenerator ...
…und dann hat mich ein neuer Job gefunden. Und Apache Airflow hat mich dabei überholt.
Als ich herausfand, dass ETL-Prozessbeschreibungen einfacher Python-Code sind, tanzte ich einfach nicht vor Freude. Auf diese Weise wurden Datenströme versioniert und unterschieden, und das Eingießen von Tabellen mit einer einzigen Struktur aus Hunderten von Datenbanken in ein Ziel wurde zu einer Angelegenheit von Python-Code auf eineinhalb oder zwei 13-Zoll-Bildschirmen.
Zusammenbau des Clusters
Lassen Sie uns keinen kompletten Kindergarten arrangieren und hier nicht über völlig offensichtliche Dinge sprechen, wie die Installation von Airflow, der von Ihnen gewählten Datenbank, Celery und anderen in den Docks beschriebenen Fällen.
Damit wir sofort mit den Experimenten beginnen können, habe ich skizziert docker-compose.yml indem:
Lasst uns tatsächlich erhöhen Airflow: Planer, Webserver. Flower wird sich dort auch drehen, um Celery-Aufgaben zu überwachen (da es bereits hineingeschoben wurde). apache/airflow:1.10.10-python3.7, aber es macht uns nichts aus)
PostgreSQL, in die Airflow seine Serviceinformationen (Schedulerdaten, Ausführungsstatistiken usw.) schreibt und Celery abgeschlossene Aufgaben markiert;
Redis, das als Aufgabenvermittler für Celery fungieren wird;
Selleriearbeiter, die mit der direkten Ausführung von Aufgaben beschäftigt sein wird.
Zum Ordner ./dags Wir werden unsere Dateien mit der Beschreibung von dags hinzufügen. Sie werden im Handumdrehen eingesammelt, sodass Sie nicht nach jedem Niesen mit dem gesamten Stapel jonglieren müssen.
An einigen Stellen wird der Code in den Beispielen nicht vollständig angezeigt (um den Text nicht zu überladen), aber an einigen Stellen wird er dabei geändert. Vollständige Beispiele für funktionierenden Code finden Sie im Repository https://github.com/dm-logv/airflow-tutorial.
Bei der Zusammenstellung der Komposition habe ich mich weitgehend auf das bekannte Bild verlassen Puckel/Docker-Luftstrom - Schauen Sie sich das unbedingt an. Vielleicht brauchen Sie nichts anderes in Ihrem Leben.
Alle Airflow-Einstellungen sind nicht nur über verfügbar airflow.cfg, sondern auch durch Umgebungsvariablen (danke an die Entwickler), die ich böswillig ausgenutzt habe.
Natürlich ist es noch nicht produktionsbereit: Ich habe bewusst keine Heartbeats auf Container gesetzt, ich habe mich nicht um die Sicherheit gekümmert. Aber ich habe das Minimum getan, das für unsere Experimentatoren geeignet war.
Beachten Sie, dass:
Der dag-Ordner muss sowohl für den Planer als auch für die Worker zugänglich sein.
Das Gleiche gilt für alle Bibliotheken von Drittanbietern – sie müssen alle auf Maschinen mit einem Scheduler und Workern installiert werden.
Nun, jetzt ist es ganz einfach:
$ docker-compose up --scale worker=3
Nachdem alles aufgegangen ist, können Sie sich die Weboberflächen ansehen:
Wenn Sie in all diesen „Tagen“ nichts verstanden haben, finden Sie hier ein kurzes Wörterbuch:
Scheduler - Der wichtigste Onkel in Airflow, der kontrolliert, dass Roboter und nicht Menschen hart arbeiten: Er überwacht den Zeitplan, aktualisiert Tagebücher und startet Aufgaben.
Im Allgemeinen hatte er in älteren Versionen Probleme mit dem Speicher (nein, keine Amnesie, sondern Lecks) und der Legacy-Parameter blieb sogar in den Konfigurationen bestehen run_duration — sein Neustartintervall. Aber jetzt ist alles in Ordnung.
TAG (auch bekannt als „dag“) – „gerichteter azyklischer Graph“, aber eine solche Definition wird nur wenigen Menschen sagen, aber tatsächlich handelt es sich um einen Container für miteinander interagierende Aufgaben (siehe unten) oder ein Analogon von Package in SSIS und Workflow in Informatica .
Zusätzlich zu den Dags kann es noch Subdags geben, aber wir werden höchstwahrscheinlich nicht auf sie zugreifen.
DAG-Lauf - initialisierter Tag, dem ein eigener zugewiesen ist execution_date. Dagrans desselben Tages können parallel arbeiten (natürlich nur, wenn Sie Ihre Aufgaben idempotent gemacht haben).
Operator sind Codeteile, die für die Ausführung einer bestimmten Aktion verantwortlich sind. Es gibt drei Arten von Operatoren:
Aktionwie unser Favorit PythonOperator, das jeden (gültigen) Python-Code ausführen kann;
privaten Transfer, die Daten von Ort zu Ort transportieren, sagen wir, MsSqlToHiveTransfer;
Sensor Andererseits ermöglicht es Ihnen, zu reagieren oder die weitere Ausführung des Tages zu verlangsamen, bis ein Ereignis eintritt. HttpSensor kann den angegebenen Endpunkt abrufen und die Übertragung starten, wenn die gewünschte Antwort wartet GoogleCloudStorageToS3Operator. Ein neugieriger Geist wird fragen: „Warum? Schließlich kann man Wiederholungen direkt im Operator durchführen!“ Und dann, um den Aufgabenpool nicht mit suspendierten Operatoren zu verstopfen. Der Sensor startet, prüft und stirbt vor dem nächsten Versuch.
Aufgabe - Deklarierte Operatoren, unabhängig vom Typ, die an den Tag angehängt sind, werden in den Rang einer Aufgabe befördert.
Aufgabeninstanz - als der Generalplaner entschied, dass es an der Zeit sei, Aufgaben auf Leistungsträger-Arbeiter in die Schlacht zu schicken (direkt vor Ort, wenn wir sie verwenden). LocalExecutor oder an einen entfernten Knoten im Fall von CeleryExecutor), weist es ihnen einen Kontext zu (d. h. eine Reihe von Variablen – Ausführungsparameter), erweitert Befehls- oder Abfragevorlagen und bündelt sie.
Wir generieren Aufgaben
Lassen Sie uns zunächst das allgemeine Schema unseres Dougs skizzieren, und dann werden wir mehr und mehr in die Details eintauchen, da wir einige nicht triviale Lösungen anwenden.
In seiner einfachsten Form sieht ein solcher Tag also so aus:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Lassen Sie es uns herausfinden:
Zuerst importieren wir die notwendigen Bibliotheken und etwas anderes;
sql_server_ds - Das List[namedtuple[str, str]] mit den Namen der Verbindungen von Airflow Connections und den Datenbanken, aus denen wir unsere Platte entnehmen werden;
dag - die Ankündigung unseres Tages, die unbedingt dabei sein muss globals(), sonst findet Airflow es nicht. Doug muss auch sagen:
Wie heißt er orders - dieser Name erscheint dann im Webinterface,
dass er am XNUMX. Juli ab Mitternacht arbeiten wird,
und es sollte ungefähr alle 6 Stunden laufen (für harte Jungs hier statt). timedelta() zulässig cron-Linie 0 0 0/6 ? * * *, für die weniger Coolen - ein Ausdruck wie @daily);
workflow() wird die Hauptaufgabe erledigen, aber nicht jetzt. Im Moment speichern wir einfach unseren Kontext im Protokoll.
Und nun die einfache Magie des Erstellens von Aufgaben:
wir gehen unsere Quellen durch;
initialisieren PythonOperator, was unseren Dummy ausführen wird workflow(). Vergessen Sie nicht, einen eindeutigen Namen (innerhalb des Tags) für die Aufgabe anzugeben und den Tag selbst zu verknüpfen. Flagge provide_context wiederum fügt der Funktion zusätzliche Argumente hinzu, die wir sorgfältig sammeln werden **context.
Im Moment ist das alles. Was wir bekommen haben:
neuer Tag im Webinterface,
eineinhalbhundert Aufgaben, die parallel ausgeführt werden (sofern die Airflow-, Celery-Einstellungen und die Serverkapazität dies zulassen).
Nun, ich habe es fast geschafft.
Wer installiert die Abhängigkeiten?
Um das Ganze zu vereinfachen, habe ich es vermasselt docker-compose.yml wird bearbeitet requirements.txt auf allen Knoten.
Jetzt ist es weg:
Graue Quadrate sind vom Planer verarbeitete Aufgabeninstanzen.
Wir warten etwas, die Aufgaben werden von den Arbeitern übernommen:
Die Grünen haben ihre Arbeit natürlich erfolgreich abgeschlossen. Die Roten sind nicht sehr erfolgreich.
Auf unserem Produkt gibt es übrigens keinen Ordner ./dags, es gibt keine Synchronisation zwischen Maschinen - alle Tage liegen drin git auf unserem Gitlab, und Gitlab CI verteilt beim Zusammenführen Updates an Maschinen master.
Ein wenig über Blume
Während die Arbeiter unsere Schnuller zertrümmern, erinnern wir uns an ein anderes Werkzeug, das uns etwas zeigen kann – die Blume.
Die allererste Seite mit zusammenfassenden Informationen zu Worker-Knoten:
Die intensivste Seite mit Aufgaben, die funktioniert haben:
Die langweiligste Seite mit dem Status unseres Brokers:
Die hellste Seite enthält Aufgabenstatusdiagramme und deren Ausführungszeit:
Wir laden das Unterladene
Nachdem alle Aufgaben erledigt sind, können Sie die Verwundeten abtransportieren.
Und es gab viele Verwundete – aus dem einen oder anderen Grund. Bei korrekter Nutzung von Airflow deuten genau diese Quadrate darauf hin, dass die Daten definitiv nicht angekommen sind.
Sie müssen das Protokoll beobachten und die ausgefallenen Aufgabeninstanzen neu starten.
Durch Klicken auf ein beliebiges Quadrat sehen wir die uns zur Verfügung stehenden Aktionen:
Du kannst die Gefallenen nehmen und beseitigen. Das heißt, wir vergessen, dass dort etwas fehlgeschlagen ist, und die gleiche Instanzaufgabe wird an den Scheduler weitergeleitet.
Es ist klar, dass es nicht sehr menschlich ist, dies mit der Maus mit all den roten Quadraten zu tun – das ist nicht das, was wir von Airflow erwarten. Natürlich haben wir Massenvernichtungswaffen: Browse/Task Instances
Lassen Sie uns alles auf einmal auswählen und auf Null zurücksetzen. Klicken Sie auf das richtige Element:
Nach der Reinigung sehen unsere Taxis so aus (sie warten bereits darauf, dass der Disponent sie einplant):
Verbindungen, Hooks und andere Variablen
Es ist Zeit, einen Blick auf die nächste DAG zu werfen. update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Hat jeder jemals ein Berichtsupdate durchgeführt? Das ist noch einmal sie: Es gibt eine Liste von Quellen, aus denen man die Daten beziehen kann; es gibt eine Liste, wo man es ablegen kann; Vergessen Sie nicht zu hupen, wenn etwas passiert ist oder kaputt gegangen ist (naja, hier geht es nicht um uns, nein).
Gehen wir die Datei noch einmal durch und schauen uns die neuen, obskuren Dinge an:
from commons.operators import TelegramBotSendMessage - Nichts hindert uns daran, unsere eigenen Operatoren zu erstellen, was wir uns zunutze machten, indem wir einen kleinen Wrapper für den Versand von Nachrichten an Unblocked erstellten. (Wir werden weiter unten mehr über diesen Operator sprechen);
default_args={} - dag kann die gleichen Argumente an alle seine Operatoren verteilen;
to='{{ var.value.all_the_kings_men }}' - Feld to Wir werden nicht fest codiert, sondern dynamisch mithilfe von Jinja und einer Variablen mit einer Liste von E-Mails generiert, die ich sorgfältig eingegeben habe Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS — Bedingung zum Starten des Antriebs. In unserem Fall fliegt der Brief erst dann an die Chefs, wenn alle Abhängigkeiten geklärt sind erfolgreich;
tg_bot_conn_id='tg_main' - Argumente conn_id Akzeptieren Sie Verbindungs-IDs, die wir erstellen Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - Nachrichten im Telegramm fliegen nur dann weg, wenn Aufgaben gefallen sind;
task_concurrency=1 - Wir verbieten den gleichzeitigen Start mehrerer Aufgabeninstanzen einer Aufgabe. Andernfalls werden wir mehrere gleichzeitig starten VerticaOperator (schaut auf einen Tisch);
report_update >> [email, tg] - Alle VerticaOperator vereinen sich beim Versenden von Briefen und Nachrichten wie folgt:
Da Notifier-Betreiber jedoch unterschiedliche Startbedingungen haben, funktioniert nur eine. In der Baumansicht sieht alles etwas weniger visuell aus:
Ich werde ein paar Worte dazu sagen Makros und ihre Freunde - Variablen.
Makros sind Jinja-Platzhalter, die verschiedene nützliche Informationen in Operatorargumente ersetzen können. Zum Beispiel so:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} wird auf den Inhalt der Kontextvariablen erweitert execution_date im Format YYYY-MM-DD: 2020-07-14. Das Beste daran ist, dass Kontextvariablen an eine bestimmte Aufgabeninstanz (ein Quadrat in der Baumansicht) gebunden sind und die Platzhalter beim Neustart auf dieselben Werte erweitert werden.
Die zugewiesenen Werte können über die Schaltfläche „Rendered“ auf jeder Aufgabeninstanz angezeigt werden. So funktioniert die Aufgabe beim Briefversand:
Und so bei der Aufgabe mit dem Versenden einer Nachricht:
Eine vollständige Liste der integrierten Makros für die neueste verfügbare Version finden Sie hier: Makros-Referenz
Darüber hinaus können wir mit Hilfe von Plugins unsere eigenen Makros deklarieren, aber das ist eine andere Geschichte.
Zusätzlich zu den vordefinierten Dingen können wir die Werte unserer Variablen ersetzen (ich habe dies bereits im Code oben verwendet). Lasst uns kreieren Admin/Variables ein paar Dinge:
Verwenden Sie einfach den Pfad zum gewünschten Schlüssel: {{ var.json.bot_config.bot.token }}.
Ich werde buchstäblich ein Wort sagen und einen Screenshot darüber zeigen Verbindungen. Hier ist alles elementar: auf der Seite Admin/Connections Wir erstellen eine Verbindung, fügen dort unsere Logins/Passwörter und spezifischere Parameter hinzu. So:
Passwörter können verschlüsselt werden (gründlicher als die Standardeinstellung) oder Sie können den Verbindungstyp weglassen (wie ich es für getan habe). tg_main) - Tatsache ist, dass die Liste der Typen in Airflow-Modellen fest verankert ist und nicht erweitert werden kann, ohne in die Quellcodes einzusteigen (falls ich plötzlich etwas nicht mehr gegoogelt habe, korrigieren Sie mich bitte), aber nichts wird uns davon abhalten, Credits einfach so zu bekommen Name.
Sie können auch mehrere Verbindungen mit demselben Namen herstellen: in diesem Fall die Methode BaseHook.get_connection(), das uns Verbindungen beim Namen verschafft, wird geben zufällig von mehreren Namensvettern (es wäre logischer, Round Robin zu machen, aber belassen wir es beim Gewissen der Airflow-Entwickler).
Variablen und Verbindungen sind sicherlich coole Tools, aber es ist wichtig, das Gleichgewicht nicht zu verlieren: welche Teile Ihrer Flows Sie im Code selbst speichern und welche Teile Sie Airflow zur Speicherung übergeben. Einerseits kann es praktisch sein, den Wert, beispielsweise ein Postfach, schnell über die Benutzeroberfläche zu ändern. Andererseits ist dies immer noch eine Rückkehr zum Mausklick, von dem wir (ich) loskommen wollten.
Zu den Aufgaben gehört die Arbeit mit Zusammenhängen Haken. Im Allgemeinen sind Airflow-Hooks Punkte für die Verbindung mit Diensten und Bibliotheken von Drittanbietern. Z.B, JiraHook öffnet einen Client für die Interaktion mit Jira (Sie können Aufgaben hin und her verschieben) und mithilfe von SambaHook Sie können eine lokale Datei dorthin verschieben smb-Punkt.
Analysieren des benutzerdefinierten Operators
Und wir konnten uns genau ansehen, wie es hergestellt wird TelegramBotSendMessage
Code commons/operators.py mit dem eigentlichen Betreiber:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Hier ist, wie alles andere in Airflow, alles ganz einfach:
Geerbt von BaseOperator, das einige Airflow-spezifische Dinge implementiert (schauen Sie sich nach Belieben um)
Deklarierte Felder template_fields, in dem Jinja nach zu verarbeitenden Makros sucht.
Die richtigen Argumente dafür zusammengestellt __init__(), legen Sie bei Bedarf die Standardeinstellungen fest.
Wir haben auch die Initialisierung des Vorfahren nicht vergessen.
Den entsprechenden Haken geöffnet TelegramBotHookhat davon ein Client-Objekt erhalten.
Überschriebene (neu definierte) Methode BaseOperator.execute(), das Airfow zucken wird, wenn es an der Zeit ist, den Operator zu starten - darin werden wir die Hauptaktion umsetzen und vergessen, uns anzumelden. (Wir loggen uns übrigens direkt ein stdout и stderr - Der Luftstrom fängt alles auf, umhüllt es schön und zersetzt es, wo es nötig ist.)
Mal sehen, was wir haben commons/hooks.py. Der erste Teil der Datei mit dem Hook selbst:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Ich weiß gar nicht, was ich hier erklären soll, ich notiere nur die wichtigen Punkte:
Wir erben, denken Sie über die Argumente nach – in den meisten Fällen wird es eines sein: conn_id;
Standardmethoden außer Kraft setzen: Ich habe mich eingeschränkt get_conn(), in dem ich die Verbindungsparameter nach Namen erhalte und nur den Abschnitt erhalte extra (das ist ein JSON-Feld), in das ich (nach meinen eigenen Anweisungen!) das Telegram-Bot-Token eingefügt habe: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Ich erstelle eine Instanz von uns TelegramBotund ihm ein bestimmtes Token geben.
Das ist alles. Sie können einen Client von einem Hook abrufen, indem Sie verwenden TelegramBotHook().clent oder TelegramBotHook().get_conn().
Und der zweite Teil der Datei, in dem ich einen Microwrapper für die Telegram-REST-API erstelle, um nicht dasselbe zu ziehen python-telegram-bot für eine Methode sendMessage.
Der richtige Weg ist, alles zusammenzuzählen: TelegramBotSendMessage, TelegramBotHook, TelegramBot - Im Plugin ein öffentliches Repository ablegen und an Open Source übergeben.
Während wir das alles studierten, schlugen unsere Berichtsaktualisierungen erfolgreich fehl und schickten mir eine Fehlermeldung im Kanal. Ich werde nachsehen, ob es falsch ist ...
In unserem Dogen ist etwas kaputt gegangen! Ist das nicht das, was wir erwartet haben? Exakt!
Wirst du einschenken?
Haben Sie das Gefühl, dass ich etwas verpasst habe? Es scheint, dass er versprochen hat, Daten von SQL Server nach Vertica zu übertragen, und dann hat er es angenommen und ist vom Thema abgekommen, der Schurke!
Diese Gräueltat war Absicht, ich musste einfach ein paar Begriffe für Sie entschlüsseln. Jetzt können Sie weitermachen.
Unser Plan war folgender:
Mach's gut
Aufgaben generieren
Sehen Sie, wie schön alles ist
Weisen Sie Füllungen Sitzungsnummern zu
Holen Sie sich Daten von SQL Server
Geben Sie Daten in Vertica ein
Statistiken sammeln
Um das alles zum Laufen zu bringen, habe ich eine kleine Ergänzung zu unserem vorgenommen docker-compose.yml:
Vertica als Gastgeber dwh mit den meisten Standardeinstellungen,
drei Instanzen von SQL Server,
Wir füllen die Datenbanken in letzterem mit einigen Daten (auf keinen Fall schauen). mssql_init.py!)
Wir starten alles Gute mit Hilfe eines etwas komplizierteren Befehls als beim letzten Mal:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Was unser Wunder-Randomizer generiert hat, können Sie als Gegenstand verwenden Data Profiling/Ad Hoc Query:
Die Hauptsache ist, es den Analysten nicht zu zeigen
näher ausführen ETL-Sitzungen Das werde ich nicht tun, da ist alles trivial: Wir erstellen eine Basis, es gibt ein Zeichen darin, wir verpacken alles mit einem Kontextmanager und jetzt machen wir Folgendes:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Die Zeit ist gekommen sammeln unsere Daten von unseren eineinhalbhundert Tischen. Machen wir das mit Hilfe sehr unprätentiöser Zeilen:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Mit Hilfe eines Hakens bekommen wir von Airflow pymssql-verbinden
Ersetzen wir eine Einschränkung in Form eines Datums in der Anfrage – diese wird von der Template-Engine in die Funktion geworfen.
Füttere unsere Bitte pandasWer wird uns kriegen? DataFrame - Es wird uns in Zukunft nützlich sein.
Ich verwende Substitution {dt} anstelle eines Anforderungsparameters %s Nicht weil ich ein böser Pinocchio bin, sondern weil pandas kann es nicht bewältigen pymssql und rutscht den letzten aus params: Listobwohl er es wirklich will tuple.
Beachten Sie auch, dass der Entwickler pymssql beschlossen, ihn nicht mehr zu unterstützen, und es ist Zeit, auszuziehen pyodbc.
Schauen wir uns an, womit Airflow die Argumente unserer Funktionen vollgestopft hat:
Wenn keine Daten vorhanden sind, macht es keinen Sinn, fortzufahren. Aber es ist auch seltsam, die Füllung als gelungen zu betrachten. Aber das ist kein Fehler. A-ah-ah, was tun?! Und hier ist was:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException wird Airflow mitteilen, dass keine Fehler vorliegen, aber wir überspringen die Aufgabe. Die Schnittstelle wird kein grünes oder rotes Quadrat haben, sondern rosa.
Die Datenbank, aus der wir die Bestellungen entnommen haben,
ID unserer Überschwemmungssitzung (es wird anders sein für jede Aufgabe),
Ein Hash aus der Quelle und der Bestell-ID – so dass wir in der endgültigen Datenbank (in der alles in einer Tabelle zusammengefasst wird) eine eindeutige Bestell-ID haben.
Bleibt noch der vorletzte Schritt: Alles in Vertica füllen. Und seltsamerweise ist CSV eine der spektakulärsten und effizientesten Möglichkeiten, dies zu tun!
Beim Verkauf erstellen wir die Zieltafel manuell. Hier habe ich mir eine kleine Maschine gegönnt:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
ich benutze VerticaOperator() Ich erstelle ein Datenbankschema und eine Tabelle (sofern diese noch nicht vorhanden sind, natürlich). Die Hauptsache ist, die Abhängigkeiten richtig anzuordnen:
- Na ja, - sagte die kleine Maus, - nicht wahr?
Sind Sie davon überzeugt, dass ich das schrecklichste Tier im Wald bin?
Julia Donaldson, Der Grüffelo
Ich denke, wenn meine Kollegen und ich einen Wettbewerb hätten: Wer würde schnell einen ETL-Prozess von Grund auf erstellen und starten: Sie mit ihrem SSIS und einer Maus und ich mit Airflow ... Und dann würden wir auch die Wartungsfreundlichkeit vergleichen ... Wow, ich denke, Sie werden mir zustimmen, dass ich sie an allen Fronten schlagen werde!
Wenn es etwas ernster ist, dann hat Apache Airflow – indem es Prozesse in Form von Programmcode beschreibt – meine Aufgabe erfüllt viel komfortabler und angenehmer.
Seine unbegrenzte Erweiterbarkeit, sowohl in Bezug auf Plug-Ins als auch auf die Skalierbarkeit, gibt Ihnen die Möglichkeit, Airflow in nahezu jedem Bereich einzusetzen: sogar im gesamten Zyklus der Datenerfassung, -aufbereitung und -verarbeitung, sogar beim Raketenstart (z. B. zum Mars). Kurs).
Teil endgültig, Referenz und Informationen
Den Rechen haben wir für Sie zusammengestellt
start_date. Ja, das ist bereits ein lokales Meme. Über Dougs Hauptargument start_date alle gehen vorbei. Kurz gesagt, wenn Sie in angeben start_date aktuelles Datum und schedule_interval - eines Tages, dann startet die DAG morgen frühestens.
start_date = datetime(2020, 7, 7, 0, 1, 2)
Und keine Probleme mehr.
Damit ist ein weiterer Laufzeitfehler verbunden: Task is missing the start_date parameter, was meistens darauf hindeutet, dass Sie vergessen haben, sich an den dag-Operator zu binden.
Alles auf einer Maschine. Ja, und Basen (Airflow selbst und unsere Beschichtung) und ein Webserver und ein Planer und Arbeiter. Und es hat sogar funktioniert. Aber im Laufe der Zeit wuchs die Anzahl der Aufgaben für Dienste, und als PostgreSQL begann, innerhalb von 20 s statt 5 ms auf den Index zu reagieren, haben wir es übernommen und mitgenommen.
LocalExecutor. Ja, wir sitzen immer noch darauf und sind bereits am Rande des Abgrunds angelangt. Bisher hat uns LocalExecutor ausgereicht, aber jetzt ist es an der Zeit, mit mindestens einem Mitarbeiter zu expandieren, und wir müssen hart arbeiten, um auf CeleryExecutor umzusteigen. Und angesichts der Tatsache, dass man damit auf einer Maschine arbeiten kann, hindert Sie nichts daran, Celery auch auf einem Server zu verwenden, der „selbstverständlich nie in Produktion gehen wird, ehrlich gesagt!“
Nichtbenutzung integrierte Werkzeuge:
Verbindungen um Dienstanmeldeinformationen zu speichern,
SLA-Fehlschüsse auf Aufgaben zu reagieren, die nicht rechtzeitig erledigt wurden,
xcom für den Metadatenaustausch (sagte ich MetaDaten!) zwischen Dag-Aufgaben.
E-Mail-Missbrauch. Nun was soll ich sagen? Für alle Wiederholungen fehlgeschlagener Aufgaben wurden Benachrichtigungen eingerichtet. Jetzt hat mein geschäftliches Gmail mehr als 90 E-Mails von Airflow und der Webmail-Maulkorb weigert sich, mehr als 100 gleichzeitig abzurufen und zu löschen.
Damit wir noch mehr mit dem Kopf und nicht mit den Händen arbeiten können, hat Airflow Folgendes für uns vorbereitet:
REST API - Er hat immer noch den Status „Experimental“, was ihn nicht von der Arbeit abhält. Damit können Sie nicht nur Informationen über Dags und Aufgaben abrufen, sondern auch einen Dag stoppen/starten, einen DAG-Run oder einen Pool erstellen.
CLI – Über die Befehlszeile sind viele Tools verfügbar, deren Verwendung über die WebUI nicht nur umständlich ist, sondern die im Allgemeinen nicht vorhanden sind. Zum Beispiel:
backfill erforderlich, um Aufgabeninstanzen neu zu starten.
Zum Beispiel kamen Analysten und sagten: „Und Sie, Genosse, haben Unsinn in den Daten vom 1. bis 13. Januar!“ Repariere es, repariere es, repariere es, repariere es!“ Und du bist so ein Koch:
run, wodurch Sie eine Instanzaufgabe ausführen und sogar für alle Abhängigkeiten punkten können. Darüber hinaus können Sie es über ausführen LocalExecutor, auch wenn Sie einen Sellerie-Cluster haben.
Macht so ziemlich das Gleiche test, nur schreibt auch in Basen nichts.
connections ermöglicht die Massenerstellung von Verbindungen aus der Shell.
Python-API - eine eher knallharte Art der Interaktion, die für Plugins gedacht ist und nicht mit kleinen Händen darin herumschwärmt. Aber wer soll uns davon abhalten, dorthin zu gehen? /home/airflow/dags, laufen ipython und anfangen herumzualbern? Sie können beispielsweise alle Verbindungen mit dem folgenden Code exportieren:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Herstellen einer Verbindung zur Airflow-Metadatenbank. Ich empfehle nicht, darin zu schreiben, aber das Abrufen von Aufgabenstatus für verschiedene spezifische Metriken kann viel schneller und einfacher sein als die Verwendung einer der APIs.
Nehmen wir an, nicht alle unsere Aufgaben sind idempotent, aber sie können manchmal fallen, und das ist normal. Einige Blockaden sind jedoch bereits verdächtig und sollten überprüft werden.
Vorsicht SQL!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
Referenzen
Und natürlich sind die ersten zehn Links aus der Ausgabe von Google der Inhalt des Airflow-Ordners aus meinen Lesezeichen.
Apache Airflow-Dokumentation - Natürlich müssen wir mit dem Büro beginnen. Dokumentation, aber wer liest die Anleitung?
Praxisbeispiele - Nun, lesen Sie zumindest die Empfehlungen der Macher.
Das Zen von Python und Apache Airflow - implizite DAG-Weiterleitung, Kontexteinwurf in Funktionen, wiederum über Abhängigkeiten und auch über das Überspringen von Aufgabenstarts.