So erstellen Sie einen DAG-Trigger in Airflow mithilfe der experimentellen API

Bei der Vorbereitung unserer Bildungsprogramme stoßen wir regelmäßig auf Schwierigkeiten bei der Arbeit mit einigen Tools. Und in dem Moment, in dem wir auf sie stoßen, gibt es nicht immer genügend Dokumentation und Artikel, die bei der Bewältigung dieses Problems helfen würden.

So war es beispielsweise im Jahr 2015, als wir den Hadoop-Cluster mit Spark für 35 gleichzeitige Benutzer im Rahmen des Big Data Specialist-Programms nutzten. Es war nicht klar, wie man es mit YARN auf einen solchen Anwendungsfall vorbereiten sollte. Das Ergebnis war, dass sie es taten, nachdem sie den Weg selbst herausgefunden und beschritten hatten Beitrag auf Habré und auch durchgeführt Moskauer Spark-Treffen.

Vorgeschichte

Dieses Mal werden wir über ein anderes Programm sprechen - Dateningenieur. Darauf bauen unsere Teilnehmer zwei Arten von Architektur auf: Lambda und Kappa. Und in der Lambdba-Architektur wird Airflow als Teil der Stapelverarbeitung verwendet, um Protokolle von HDFS an ClickHouse zu übertragen.

Im Großen und Ganzen ist alles gut. Lassen Sie sie ihre Pipelines bauen. Allerdings gibt es ein „Aber“: Alle unsere Programme sind hinsichtlich des Lernprozesses selbst technologisch fortschrittlich. Um das Labor zu überprüfen, verwenden wir automatische Prüfer: Der Teilnehmer muss zu seinem persönlichen Konto gehen, auf die Schaltfläche „Prüfen“ klicken und nach einer Weile sieht er eine Art erweitertes Feedback zu dem, was er getan hat. Und an diesem Punkt beginnen wir, uns unserem Problem zu nähern.

Die Überprüfung dieses Labors ist wie folgt aufgebaut: Wir senden ein Kontrolldatenpaket an den Kafka des Teilnehmers, dann überträgt Gobblin dieses Datenpaket an HDFS, dann nimmt Airflow dieses Datenpaket und legt es in ClickHouse ab. Der Trick besteht darin, dass Airflow dies nicht in Echtzeit tun muss, sondern nach einem Zeitplan: Alle 15 Minuten nimmt es eine Reihe von Dateien und lädt sie hoch.

Es stellt sich heraus, dass wir ihren DAG auf unsere Anfrage hin irgendwie selbst auslösen müssen, während der Prüfer hier und jetzt läuft. Beim Googeln haben wir herausgefunden, dass es für spätere Versionen von Airflow ein sogenanntes gibt Experimentelle API. Wort experimentalNatürlich klingt es beängstigend, aber was tun ... Es geht plötzlich los.

Als Nächstes beschreiben wir den gesamten Pfad: von der Installation von Airflow bis zur Generierung einer POST-Anfrage, die mithilfe der experimentellen API einen DAG auslöst. Wir werden mit Ubuntu 16.04 arbeiten.

1. Installation des Luftstroms

Überprüfen wir, ob wir Python 3 und Virtualenv haben.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Wenn eines davon fehlt, installieren Sie es.

Nun erstellen wir ein Verzeichnis, in dem wir mit Airflow weiterarbeiten.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Airflow installieren:

(venv) $ pip install airflow

Version, an der wir gearbeitet haben: 1.10.

Jetzt müssen wir ein Verzeichnis erstellen airflow_home, wo sich die DAG-Dateien und Airflow-Plugins befinden. Legen Sie nach dem Erstellen des Verzeichnisses die Umgebungsvariable fest AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Der nächste Schritt besteht darin, den Befehl auszuführen, der die Datenflussdatenbank in SQLite erstellt und initialisiert:

(venv) $ airflow initdb

Die Datenbank wird in erstellt airflow.db Default.

Überprüfen Sie, ob Airflow installiert ist:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Wenn der Befehl funktioniert hat, hat Airflow eine eigene Konfigurationsdatei erstellt airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow verfügt über eine Weboberfläche. Es kann durch Ausführen des folgenden Befehls gestartet werden:

(venv) $ airflow webserver --port 8081

Sie können jetzt in einem Browser über Port 8081 auf dem Host, auf dem Airflow ausgeführt wurde, wie folgt auf die Weboberfläche zugreifen: <hostname:8081>.

2. Arbeiten mit der experimentellen API

Daraufhin ist Airflow konfiguriert und betriebsbereit. Allerdings müssen wir auch die experimentelle API ausführen. Unsere Prüfer sind in Python geschrieben, sodass alle Anfragen weiterhin über die Bibliothek darauf erfolgen requests.

Tatsächlich funktioniert die API bereits für einfache Anfragen. Mit einer solchen Anfrage können Sie beispielsweise die Funktion testen:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Wenn Sie als Antwort eine solche Nachricht erhalten haben, bedeutet das, dass alles funktioniert.

Wenn wir jedoch einen DAG auslösen wollen, stoßen wir auf die Tatsache, dass eine solche Anfrage nicht ohne Authentifizierung gestellt werden kann.

Dazu müssen Sie eine Reihe von Aktionen ausführen.

Zuerst müssen Sie Folgendes zur Konfiguration hinzufügen:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Anschließend müssen Sie Ihren Benutzer mit Administratorrechten erstellen:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Als Nächstes müssen Sie einen Benutzer mit normalen Rechten erstellen, der einen DAG-Auslöser ausführen darf.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Jetzt ist alles fertig.

3. Starten einer POST-Anfrage

Die POST-Anfrage selbst sieht folgendermaßen aus:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Anfrage erfolgreich bearbeitet.

Dementsprechend geben wir der DAG dann etwas Zeit zur Verarbeitung und stellen eine Anfrage an die ClickHouse-Tabelle, um das Kontrolldatenpaket abzufangen.

Verifizierung abgeschlossen.

Source: habr.com

Kommentar hinzufügen