Как сделать триггер DAG’а в Airflow, используя Experimental API

При подготовке наших образовательных программ мы периодически сталкиваемся со сложностями с точки зрения работы с некоторыми инструментами. И на тот момент, когда мы с ними сталикваемся, не всегда есть достаточно документации и статей, которые помогли бы с этой проблемой справиться.

Так было, например, в 2015 году и мы на программе “Специалист по большим данным” пользовались Hadoop-кластером со Spark на 35 одновременных пользователей. Как его готовить под такой юзкейс с использованием YARN, было непонятно. В итоге, разобравшись и пройдя путь самостоятельно, сделали пост на Хабре и еще выступили на Moscow Spark Meetup.

Предыстория

В этот раз речь пойдет о другой программе – Data Engineer. На ней наши участники строят два типа архитектуры: lambda и kappa. И в lamdba-архитектуре в рамках батч-обработки используется Airflow для перекладывания логов из HDFS в ClickHouse.

Все в общем-то хорошо. Пусть строят свои пайплайны. Однако, есть «но»: все наши программы технологичны с точки зрения самого процесса обучения. Для проверки лаб мы используем автоматические чекеры: участнику нужно зайти в личный кабинет, нажать кнопку “Проверить”, и через какое-то время он видит какую-то расширенную обратную связь на то, что сделал. И именно в этот момент мы начинаем подходить к нашей проблеме.

Проверка этой лабы устроена так: мы посылаем контрольный пакет данных в Kafka участника, далее Gobblin перекладывает этот пакет данных на HDFS, далее Airflow берет этот пакет данных и кладет в ClickHouse. Фишка в том, что Airflow не должен это делать в реал-тайме, он это делает по расписанию: раз в 15 минут берет пачку файлов и закидывает.

Получается, что нам нужно как-то триггерить их DAG самостоятельно по нашему требованию во время работы чекера здесь и сейчас. Погуглив, выяснили, что для поздних версий Airflow существует так называемый Experimental API. Слово experimental, конечно, звучит пугающе, но что делать… Вдруг взлетит.

Далее опишем весь путь: от установки Airflow до формирования POST-запроса, который триггерит DAG, используя Experimental API. Работать будем с Ubuntu 16.04.

1. Установка Airflow

Проверим, что у нас стоит Python 3 и virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Если чего-то из этого нет, то установите.

Теперь создадим каталог, в котором будем дальше работать с Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Установим Airflow:

(venv) $ pip install airflow

Версия, на которой работали мы: 1.10.

Теперь нам нужно создать каталог airflow_home, где будут располагаться DAG-файлы и плагины Airflow. После создания каталога установим переменную среды AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Следующий шаг — выполним команду, которая будет создавать и инициализировать базу данных потока данных в SQLite:

(venv) $ airflow initdb

База данных будет создана в airflow.db по умолчанию.

Проверим установился ли Airflow:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Если команда отработала, то Airflow создал свой конфигурационный файл airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

У Airflow есть веб-интерфейс. Его можно запустить, выполнив команду:

(venv) $ airflow webserver --port 8081

Теперь вы можете попасть в веб-интерфейс в браузере на порту 8081 на хосте, где Airflow был запущен, например: <hostname:8081>.

2. Работа с Experimental API

На этом Airflow настроен и готов к работе. Тем не менее, нам нужно запустить еще и Experimental API. Наши чекеры написаны на Python, поэтому далее все запросы будут на нем с использованием библиотеки requests.

На самом деле API уже работает для простых запросов. Например, такой запрос позволяет потестить его работу:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Если получили такое сообщение в ответ, то это значит, что все работает.

Однако, когда мы захотим затригерить DAG, то столкнемся с тем, что этот вид запроса нельзя сделать без аутентификации.

Для этого нужно будет проделать еще ряд действий.

Во-первых, в конфиг нужно добавить это:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Затем, нужно создать своего пользователя с админскими правами:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Затем, нужно создать пользователя с обычными правами, которому будет разрешено делать триггер DAG’а.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Теперь все готово.

3. Запуск POST-запроса

Сам POST-запрос будет выглядет так:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Запрос обработан успешно.

Соответственно, далее мы даем какое-то время DAG’у на обработку и делаем запрос в таблицу ClickHouse, пытаясь поймать контрольный пакет данных.

Проверка завершена.

Источник: habr.com