Airflow ist ein Tool zur komfortablen und schnellen Entwicklung und Wartung von Batch-Datenverarbeitungsprozessen

Airflow ist ein Tool zur komfortablen und schnellen Entwicklung und Wartung von Batch-Datenverarbeitungsprozessen

Hey Habr! In diesem Artikel möchte ich über ein großartiges Tool zur Entwicklung von Batch-Datenverarbeitungsprozessen sprechen, beispielsweise in der Infrastruktur eines Unternehmens-DWH oder Ihres DataLake. Wir werden über Apache Airflow (im Folgenden als Airflow bezeichnet) sprechen. Habré wird ihm zu Unrecht die Aufmerksamkeit entzogen, und im Hauptteil werde ich versuchen, Sie davon zu überzeugen, dass es sich bei der Auswahl eines Schedulers für Ihre ETL-/ELT-Prozesse zumindest lohnt, einen Blick auf Airflow zu werfen.

Zuvor habe ich eine Reihe von Artikeln zum Thema DWH geschrieben, als ich bei der Tinkoff Bank arbeitete. Jetzt bin ich Teil des Teams der Mail.Ru Group geworden und entwickle eine Plattform zur Datenanalyse im Gaming-Bereich. Sobald es Neuigkeiten und interessante Lösungen gibt, werden das Team und ich hier über unsere Plattform für Datenanalysen sprechen.

Prolog

Also, fangen wir an. Was ist Luftstrom? Dies ist eine Bibliothek (bzw Reihe von Bibliotheken) zur Entwicklung, Planung und Überwachung von Arbeitsabläufen. Das Hauptmerkmal von Airflow besteht darin, dass Python-Code zur Beschreibung (Entwicklung) von Prozessen verwendet wird. Dies hat viele Vorteile für die Organisation Ihres Projekts und Ihrer Entwicklung: Tatsächlich ist Ihr (zum Beispiel) ETL-Projekt nur ein Python-Projekt, und Sie können es nach Ihren Wünschen organisieren, unter Berücksichtigung der Merkmale der Infrastruktur, der Teamgröße usw andere Vorraussetzungen. Instrumental ist alles einfach. Verwenden Sie zum Beispiel PyCharm + Git. Es ist großartig und sehr praktisch!

Schauen wir uns nun die Haupteinheiten von Airflow an. Wenn Sie deren Wesen und Zweck verstanden haben, organisieren Sie die Prozessarchitektur optimal. Die vielleicht wichtigste Entität ist der gerichtete azyklische Graph (im Folgenden DAG).

TAG

DAG ist eine semantische Zuordnung Ihrer Aufgaben, die Sie in einer genau definierten Reihenfolge und nach einem bestimmten Zeitplan erledigen möchten. Airflow bietet eine praktische Weboberfläche für die Arbeit mit DAGs und anderen Entitäten:

Airflow ist ein Tool zur komfortablen und schnellen Entwicklung und Wartung von Batch-Datenverarbeitungsprozessen

DAG könnte so aussehen:

Airflow ist ein Tool zur komfortablen und schnellen Entwicklung und Wartung von Batch-Datenverarbeitungsprozessen

Beim Entwerfen einer DAG legt ein Entwickler eine Reihe von Operatoren fest, auf denen Aufgaben innerhalb der DAG aufgebaut werden. Hier kommen wir zu einer weiteren wichtigen Einheit: dem Airflow Operator.

Betreiber

Ein Operator ist eine Entität, auf deren Grundlage Jobinstanzen erstellt werden und die beschreibt, was während der Ausführung einer Jobinstanz geschieht. Airflow-Veröffentlichungen von GitHub enthalten bereits einen Satz gebrauchsfertiger Anweisungen. Beispiele:

  • BashOperator ist ein Operator zum Ausführen eines Bash-Befehls.
  • PythonOperator ist ein Operator zum Aufrufen von Python-Code.
  • EmailOperator – Operator zum Senden von E-Mails.
  • HTTPOperator – ein Operator für die Arbeit mit http-Anfragen.
  • SqlOperator ist ein Operator zum Ausführen von SQL-Code.
  • Sensor ist ein Operator zum Warten auf ein Ereignis (das Eintreffen der gewünschten Zeit, das Erscheinen der erforderlichen Datei, eine Zeile in der Datenbank, eine Antwort von der API usw. usw.).

Es gibt spezifischere Operatoren: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Sie können auch Operatoren nach Ihren Bedürfnissen entwickeln und in Ihrem Projekt verwenden. Beispielsweise haben wir MongoDBToHiveViaHdfsTransfer erstellt, einen Operator zum Exportieren von Dokumenten von MongoDB nach Hive, und mehrere Operatoren zum Arbeiten damit Clickhouse: CHLoadFromHiveOperator und CHTableLoaderOperator. Sobald in einem Projekt häufig Code verwendet wird, der auf einfachen Anweisungen basiert, können Sie darüber nachdenken, ihn in eine neue Anweisung zu kompilieren. Dies vereinfacht die weitere Entwicklung und Sie erweitern Ihre Operatorenbibliothek im Projekt.

Darüber hinaus müssen alle diese Aufgabeninstanzen ausgeführt werden, und jetzt werden wir über den Scheduler sprechen.

Planer

Der Aufgabenplaner in Airflow basiert darauf Sellerie. Celery ist eine Python-Bibliothek, mit der Sie eine Warteschlange sowie die asynchrone und verteilte Ausführung von Aufgaben organisieren können. Auf der Airflow-Seite sind alle Aufgaben in Pools unterteilt. Pools werden manuell erstellt. Ihr Zweck besteht in der Regel darin, die Arbeit mit der Quelle zu entlasten oder Aufgaben innerhalb des DWH einzutippen. Pools können über die Weboberfläche verwaltet werden:

Airflow ist ein Tool zur komfortablen und schnellen Entwicklung und Wartung von Batch-Datenverarbeitungsprozessen

Für jeden Pool ist die Anzahl der Slots begrenzt. Beim Erstellen einer DAG wird ihr ein Pool zugewiesen:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Der auf DAG-Ebene festgelegte Pool kann auf Aufgabenebene überschrieben werden.
Ein separater Prozess, Scheduler, ist für die Planung aller Aufgaben in Airflow verantwortlich. Tatsächlich befasst sich der Scheduler mit allen Mechanismen zum Festlegen von Aufgaben zur Ausführung. Eine Aufgabe durchläuft vor ihrer Ausführung mehrere Phasen:

  1. Vorherige Aufgaben wurden im DAG erledigt, eine neue kann in die Warteschlange gestellt werden.
  2. Die Warteschlange wird nach der Priorität der Aufgaben sortiert (Prioritäten können auch gesteuert werden), und wenn im Pool ein freier Platz vorhanden ist, kann die Aufgabe zur Arbeit gebracht werden.
  3. Wenn ein freier Arbeiter Sellerie vorhanden ist, wird die Aufgabe an ihn gesendet; Die Arbeit, die Sie in der Aufgabe programmiert haben, beginnt mit dem einen oder anderen Operator.

Einfach genug.

Der Scheduler wird auf einer Reihe aller DAGs und aller Aufgaben innerhalb der DAGs ausgeführt.

Damit der Scheduler mit der DAG arbeiten kann, muss die DAG einen Zeitplan festlegen:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Es gibt eine Reihe vorgefertigter Voreinstellungen: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Sie können auch Cron-Ausdrücke verwenden:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Ausführungsdatum

Um zu verstehen, wie Airflow funktioniert, ist es wichtig zu verstehen, was ein Ausführungsdatum für eine DAG ist. Der Airflow DAG verfügt über die Dimension „Ausführungsdatum“, d. h. abhängig vom Arbeitsplan des DAG werden für jedes Ausführungsdatum Aufgabeninstanzen erstellt. Und für jeden Ausführungstermin können Aufgaben erneut ausgeführt werden – oder beispielsweise kann ein DAG an mehreren Ausführungsterminen gleichzeitig arbeiten. Dies wird hier deutlich gezeigt:

Airflow ist ein Tool zur komfortablen und schnellen Entwicklung und Wartung von Batch-Datenverarbeitungsprozessen

Wenn die Implementierung der Aufgabe im DAG korrekt ist, erfolgt leider (oder vielleicht auch zum Glück: es hängt von der Situation) die Ausführung am vorherigen Ausführungsdatum mit den Anpassungen. Das ist gut, wenn Sie Daten vergangener Zeiträume mit einem neuen Algorithmus neu berechnen müssen, aber es ist schlecht, weil die Reproduzierbarkeit des Ergebnisses verloren geht (natürlich macht sich niemand die Mühe, die erforderliche Version des Quellcodes von Git zurückzugeben und zu berechnen, was Sie tun). einmal benötigen, je nach Bedarf).

Aufgabengenerierung

Bei der DAG-Implementierung handelt es sich um Python-Code, sodass wir eine sehr praktische Möglichkeit haben, die Codemenge zu reduzieren, wenn wir beispielsweise mit Shard-Quellen arbeiten. Angenommen, Sie haben drei MySQL-Shards als Quelle, Sie müssen in jeden hineinklettern und einige Daten abrufen. Und zwar unabhängig und parallel. Der Python-Code im DAG könnte so aussehen:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

Die DAG sieht so aus:

Airflow ist ein Tool zur komfortablen und schnellen Entwicklung und Wartung von Batch-Datenverarbeitungsprozessen

Gleichzeitig können Sie einen Shard hinzufügen oder entfernen, indem Sie einfach die Einstellung anpassen und die DAG aktualisieren. Komfortabel!

Sie können auch eine komplexere Codegenerierung verwenden, beispielsweise mit Quellen in Form einer Datenbank arbeiten oder eine tabellarische Struktur, einen Algorithmus für die Arbeit mit einer Tabelle, beschreiben und unter Berücksichtigung der Besonderheiten der DWH-Infrastruktur den Prozess generieren des Ladens von N Tabellen in Ihren Speicher. Wenn Sie beispielsweise mit einer API arbeiten, die die Arbeit mit einem Parameter in Form einer Liste nicht unterstützt, können Sie mithilfe dieser Liste N Aufgaben in einem DAG generieren, die Parallelität von Anforderungen in der API auf einen Pool beschränken und extrahieren die notwendigen Daten aus der API. Flexibel!

Repository

Airflow verfügt über ein eigenes Backend-Repository, eine Datenbank (vielleicht MySQL oder Postgres, wir haben Postgres), die den Status von Aufgaben, DAGs, Verbindungseinstellungen, globalen Variablen usw. usw. speichert. Hier möchte ich sagen, dass das Repository in Airflow ist sehr einfach (ca. 20 Tabellen) und praktisch, wenn Sie einen Ihrer Prozesse darauf aufbauen möchten. Ich erinnere mich an 100500 Tabellen im Informatica-Repository, die lange durchsucht werden mussten, bevor man verstand, wie man eine Abfrage erstellt.

Überwachung

Angesichts der Einfachheit des Repositorys können Sie einen für Sie bequemen Aufgabenüberwachungsprozess erstellen. Wir nutzen einen Notizblock in Zeppelin, in dem wir uns den Status von Aufgaben ansehen:

Airflow ist ein Tool zur komfortablen und schnellen Entwicklung und Wartung von Batch-Datenverarbeitungsprozessen

Es kann sich auch um die Weboberfläche von Airflow selbst handeln:

Airflow ist ein Tool zur komfortablen und schnellen Entwicklung und Wartung von Batch-Datenverarbeitungsprozessen

Der Airflow-Code ist offen, daher haben wir eine Warnung in Telegram hinzugefügt. Wenn ein Fehler auftritt, sendet jede laufende Aufgabeninstanz Spam an die Telegram-Gruppe, in der sich das gesamte Entwicklungs- und Supportteam befindet.

Wir erhalten eine zeitnahe Antwort per Telegram (falls erforderlich), per Zeppelin – ein Gesamtbild der Aufgaben in Airflow.

Insgesamt

Airflow ist in erster Linie Open Source und man darf keine Wunder erwarten. Seien Sie bereit, Zeit und Mühe zu investieren, um eine funktionierende Lösung zu entwickeln. Ein Ziel aus der Kategorie erreichbar, glauben Sie mir, es lohnt sich. Entwicklungsgeschwindigkeit, Flexibilität, einfache Hinzufügung neuer Prozesse – Sie werden es lieben. Natürlich muss man der Organisation des Projekts und der Stabilität der Arbeit von Airflow selbst große Aufmerksamkeit schenken: Es gibt keine Wunder.

Jetzt funktioniert Airflow täglich etwa 6,5 ​​Tausend Aufgaben. Sie sind von Natur aus ganz unterschiedlich. Es gibt Aufgaben zum Laden von Daten aus vielen verschiedenen und sehr spezifischen Quellen in das Haupt-DWH, es gibt Aufgaben zum Berechnen von Storefronts innerhalb des Haupt-DWH, es gibt Aufgaben zum Veröffentlichen von Daten in einem schnellen DWH, es gibt viele, viele verschiedene Aufgaben – und Airflow kaut sie den ganzen Tag. In Zahlen ausgedrückt ist dies der Fall 2,3 tausend ELT-Aufgaben unterschiedlicher Komplexität innerhalb von DWH (Hadoop), ca 2,5 Hundert Datenbanken Quellen, dies ist ein Befehl von 4 ETL-Entwickler, die unterteilt sind in ETL-Datenverarbeitung im DWH und ELT-Datenverarbeitung im DWH und natürlich mehr ein Administrator, das sich mit der Infrastruktur des Dienstes befasst.

Pläne für die Zukunft

Die Anzahl der Prozesse wächst zwangsläufig und das Wichtigste, was wir in Bezug auf die Airflow-Infrastruktur tun werden, ist die Skalierung. Wir möchten einen Airflow-Cluster aufbauen, ein paar Beine für Celery-Arbeiter zuweisen und einen doppelten Kopf mit Jobplanungsprozessen und einem Repository erstellen.

Letzter Akt

Das ist natürlich bei weitem nicht alles, was ich über Airflow sagen möchte, aber ich habe versucht, die wichtigsten Punkte hervorzuheben. Der Appetit kommt mit dem Essen, probieren Sie es aus und es wird Ihnen schmecken 🙂

Source: habr.com

Kommentar hinzufügen