Як зробити тригер DAG'а в Airflow, використовуючи Experimental API

Під час підготовки наших освітніх програм ми періодично стикаємося зі складнощами з погляду роботи з деякими інструментами. І на той момент, коли ми з ними зіштовхуємось, не завжди є достатньо документації та статей, які б допомогли з цією проблемою впоратися.

Так було, наприклад, у 2015 році і ми на програмі "Спеціаліст за великими даними" користувалися Hadoop-кластером зі Spark на 35 одночасних користувачів. Як його готувати під такий юзкейс із використанням YARN, було незрозуміло. У результаті, розібравшись та пройшовши шлях самостійно, зробили пост на Хабрі і ще виступили на Moscow Spark Meetup.

Передісторія

На цей раз мова піде про іншу програму – Інженер даних. На ній наші учасники будують два типи архітектури: lambda та kappa. І в lamdba-архітектурі в рамках батч-обробки використовується Airflow для перекладання логів із HDFS у ClickHouse.

Все загалом добре. Нехай будують свої пайплайни. Проте є «але»: всі наші програми технологічні з погляду самого процесу навчання. Для перевірки лаб ми використовуємо автоматичні чекери: учаснику потрібно зайти в особистий кабінет, натиснути кнопку “Перевірити”, і через якийсь час він бачить якийсь розширений зворотний зв'язок на те, що зробив. І саме зараз ми починаємо підходити до нашої проблеми.

Перевірка цієї лаби влаштована так: ми надсилаємо контрольний пакет даних у Kafka учасника, далі Gobblin перекладає цей пакет даних на HDFS, далі Airflow бере цей пакет даних і кладе ClickHouse. Фішка в тому, що Airflow не повинен це робити в реал-таймі, він це робить за розкладом: раз на 15 хвилин бере пачку файлів і закидає.

Виходить, що нам потрібно якось тригерити їх DAG самостійно на нашу вимогу під час роботи чекера тут і зараз. Погугливши, з'ясували, що для пізніх версій Airflow існує так званий Experimental API. слово experimental, звичайно, звучить лякаюче, але що робити… Раптом злетить.

Далі опишемо весь шлях: від установки Airflow до формування POST-запиту, який тригеріт DAG, використовуючи Experimental API. Працюватимемо з Ubuntu 16.04.

1. Встановлення Airflow

Перевіримо, що у нас є Python 3 і virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Якщо чогось із цього немає, то встановіть.

Тепер створимо каталог, у якому далі працюватимемо з Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Встановимо Airflow:

(venv) $ pip install airflow

Версія, на якій ми працювали: 1.10.

Тепер нам потрібно створити каталог airflow_home, де будуть розташовуватися DAG-файли та плагіни Airflow. Після створення каталогу встановимо змінне середовище AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Наступний крок - виконаємо команду, яка буде створювати та ініціалізувати базу даних потоку даних у SQLite:

(venv) $ airflow initdb

База даних буде створена в airflow.db за замовчуванням.

Перевіримо чи встановився Airflow:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Якщо команда відпрацювала, Airflow створив свій конфігураційний файл airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow має веб-інтерфейс. Його можна запустити, виконавши команду:

(venv) $ airflow webserver --port 8081

Тепер ви можете потрапити до веб-інтерфейсу в браузері на порту 8081 на хості, де Airflow був запущений, наприклад: <hostname:8081>.

2. Робота з Experimental API

На цьому Airflow налаштований та готовий до роботи. Тим не менш, нам потрібно запустити ще й Experimental API. Наші чекери написані на Python, тому всі запити будуть на ньому з використанням бібліотеки. requests.

Насправді, API вже працює для простих запитів. Наприклад, такий запит дозволяє потішити його роботу:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Якщо отримали таке повідомлення у відповідь, це означає, що все працює.

Однак, коли ми захочемо затригерувати DAG, то зіткнемося з тим, що цей вид запиту не можна зробити без автентифікації.

Для цього потрібно буде зробити ще низку дій.

По-перше, до конфіг потрібно додати це:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Потім потрібно створити свого користувача з адмінськими правами:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Потім потрібно створити користувача зі звичайними правами, якому буде дозволено робити тригер DAG'а.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Тепер усе готове.

3. Запуск POST-запиту

Сам POST-запит буде виглядати так:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Запит успішно оброблений.

Відповідно, далі ми даємо якийсь час DAG'у на обробку та робимо запит до таблиці ClickHouse, намагаючись зловити контрольний пакет даних.

Перевірку завершено.

Джерело: habr.com

Додати коментар або відгук