Comment créer un déclencheur DAG dans Airflow à l'aide de l'API expérimentale

Lors de la préparation de nos programmes pédagogiques, nous rencontrons périodiquement des difficultés pour travailler avec certains outils. Et au moment où nous les rencontrons, il n'y a pas toujours assez de documentation et d'articles qui aideraient à faire face à ce problème.

C'était donc par exemple en 2015, et nous avons utilisé le cluster Hadoop avec Spark pour 35 utilisateurs simultanés sur le programme Big Data Specialist. Il n'était pas clair comment le préparer pour un tel cas d'utilisation en utilisant YARN. En conséquence, après avoir compris et parcouru le chemin par eux-mêmes, ils ont fait poster sur Habré et également réalisé Meetup Moscou Spark.

Préhistoire

Cette fois, nous allons parler d'un programme différent - Ingénieur de données. Sur celle-ci, nos participants construisent deux types d'architecture : lambda et kappa. Et dans l'architecture lamdba, Airflow est utilisé dans le cadre du traitement par lots pour transférer les journaux de HDFS vers ClickHouse.

Tout est généralement bon. Laissez-les construire leurs pipelines. Cependant, il y a un "mais": tous nos programmes sont technologiquement avancés en termes de processus d'apprentissage lui-même. Pour vérifier le laboratoire, nous utilisons des vérificateurs automatiques : le participant doit se rendre sur son compte personnel, cliquer sur le bouton "Vérifier", et après un certain temps, il voit une sorte de rétroaction étendue sur ce qu'il a fait. Et c'est à ce point que nous commençons à aborder notre problème.

La vérification de ce laboratoire est organisée comme suit : nous envoyons un paquet de données de contrôle au Kafka du participant, puis Gobblin transfère ce paquet de données à HDFS, puis Airflow prend ce paquet de données et le place dans ClickHouse. L'astuce est qu'Airflow n'a pas à le faire en temps réel, il le fait dans les délais : une fois toutes les 15 minutes, il prend un tas de fichiers et les télécharge.

Il s'avère que nous devons en quelque sorte déclencher leur DAG par nous-mêmes à notre demande pendant que le vérificateur s'exécute ici et maintenant. En cherchant sur Google, nous avons découvert que pour les versions ultérieures d'Airflow, il existe un soi-disant API expérimentale. mot experimental, bien sûr, cela semble effrayant, mais que faire ... Il décolle soudainement.

Ensuite, nous décrirons tout le chemin : de l'installation d'Airflow à la génération d'une requête POST qui déclenche un DAG à l'aide de l'API expérimentale. Nous travaillerons avec Ubuntu 16.04.

1. Installation du flux d'air

Vérifions que nous avons Python 3 et virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Si l'un d'entre eux manque, installez-le.

Créons maintenant un répertoire dans lequel nous allons continuer à travailler avec Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Installez le flux d'air :

(venv) $ pip install airflow

Version sur laquelle nous avons travaillé : 1.10.

Maintenant, nous devons créer un répertoire airflow_home, où se trouveront les fichiers DAG et les plug-ins Airflow. Après avoir créé le répertoire, définissez la variable d'environnement AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

L'étape suivante consiste à exécuter la commande qui créera et initialisera la base de données de flux de données dans SQLite :

(venv) $ airflow initdb

La base de données sera créée en airflow.db par défaut.

Vérifiez si Airflow est installé :

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Si la commande a fonctionné, alors Airflow a créé son propre fichier de configuration airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow dispose d'une interface web. Il peut être lancé en exécutant la commande :

(venv) $ airflow webserver --port 8081

Vous pouvez désormais accéder à l'interface Web dans un navigateur sur le port 8081 de l'hôte sur lequel Airflow s'exécutait, comme ceci : <hostname:8081>.

2. Travailler avec l'API expérimentale

Sur cet Airflow est configuré et prêt à fonctionner. Cependant, nous devons également exécuter l'API expérimentale. Nos vérificateurs sont écrits en Python, donc toutes les requêtes seront dessus en utilisant la bibliothèque requests.

En fait, l'API fonctionne déjà pour des requêtes simples. Par exemple, une telle requête vous permet de tester son fonctionnement :

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Si vous avez reçu un tel message en réponse, cela signifie que tout fonctionne.

Cependant, lorsque l'on souhaite déclencher un DAG, on se heurte au fait que ce type de requête ne peut se faire sans authentification.

Pour ce faire, vous devrez effectuer un certain nombre d'actions.

Tout d'abord, vous devez ajouter ceci à la configuration :

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Ensuite, vous devez créer votre utilisateur avec des droits d'administrateur :

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Ensuite, vous devez créer un utilisateur avec des droits normaux qui seront autorisés à déclencher un DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Maintenant tout est prêt.

3. Lancer une requête POST

La requête POST elle-même ressemblera à ceci :

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Demande traitée avec succès.

En conséquence, nous donnons ensuite au DAG un peu de temps pour traiter et faire une demande à la table ClickHouse, en essayant d'attraper le paquet de données de contrôle.

Vérification terminée.

Source: habr.com

Ajouter un commentaire