Під час підготовки наших освітніх програм ми періодично стикаємося зі складнощами з погляду роботи з деякими інструментами. І на той момент, коли ми з ними зіштовхуємось, не завжди є достатньо документації та статей, які б допомогли з цією проблемою впоратися.
Так було, наприклад, у 2015 році і ми на програмі "Спеціаліст за великими даними" користувалися Hadoop-кластером зі Spark на 35 одночасних користувачів. Як його готувати під такий юзкейс із використанням YARN, було незрозуміло. У результаті, розібравшись та пройшовши шлях самостійно, зробили
Передісторія
На цей раз мова піде про іншу програму –
Все загалом добре. Нехай будують свої пайплайни. Проте є «але»: всі наші програми технологічні з погляду самого процесу навчання. Для перевірки лаб ми використовуємо автоматичні чекери: учаснику потрібно зайти в особистий кабінет, натиснути кнопку “Перевірити”, і через якийсь час він бачить якийсь розширений зворотний зв'язок на те, що зробив. І саме зараз ми починаємо підходити до нашої проблеми.
Перевірка цієї лаби влаштована так: ми надсилаємо контрольний пакет даних у Kafka учасника, далі Gobblin перекладає цей пакет даних на HDFS, далі Airflow бере цей пакет даних і кладе ClickHouse. Фішка в тому, що Airflow не повинен це робити в реал-таймі, він це робить за розкладом: раз на 15 хвилин бере пачку файлів і закидає.
Виходить, що нам потрібно якось тригерити їх DAG самостійно на нашу вимогу під час роботи чекера тут і зараз. Погугливши, з'ясували, що для пізніх версій Airflow існує так званий experimental
, звичайно, звучить лякаюче, але що робити… Раптом злетить.
Далі опишемо весь шлях: від установки Airflow до формування POST-запиту, який тригеріт DAG, використовуючи Experimental API. Працюватимемо з Ubuntu 16.04.
1. Встановлення Airflow
Перевіримо, що у нас є Python 3 і virtualenv.
$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0
Якщо чогось із цього немає, то встановіть.
Тепер створимо каталог, у якому далі працюватимемо з Airflow.
$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $
Встановимо Airflow:
(venv) $ pip install airflow
Версія, на якій ми працювали: 1.10.
Тепер нам потрібно створити каталог airflow_home
, де будуть розташовуватися DAG-файли та плагіни Airflow. Після створення каталогу встановимо змінне середовище AIRFLOW_HOME
.
(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>
Наступний крок - виконаємо команду, яка буде створювати та ініціалізувати базу даних потоку даних у SQLite:
(venv) $ airflow initdb
База даних буде створена в airflow.db
за замовчуванням.
Перевіримо чи встановився Airflow:
$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ _ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ ____/____/|__/
v1.10.0
Якщо команда відпрацювала, Airflow створив свій конфігураційний файл airflow.cfg
в AIRFLOW_HOME
:
$ tree
.
├── airflow.cfg
└── unittests.cfg
Airflow має веб-інтерфейс. Його можна запустити, виконавши команду:
(venv) $ airflow webserver --port 8081
Тепер ви можете потрапити до веб-інтерфейсу в браузері на порту 8081 на хості, де Airflow був запущений, наприклад: <hostname:8081>
.
2. Робота з Experimental API
На цьому Airflow налаштований та готовий до роботи. Тим не менш, нам потрібно запустити ще й Experimental API. Наші чекери написані на Python, тому всі запити будуть на ньому з використанням бібліотеки. requests
.
Насправді, API вже працює для простих запитів. Наприклад, такий запит дозволяє потішити його роботу:
>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'
Якщо отримали таке повідомлення у відповідь, це означає, що все працює.
Однак, коли ми захочемо затригерувати DAG, то зіткнемося з тим, що цей вид запиту не можна зробити без автентифікації.
Для цього потрібно буде зробити ще низку дій.
По-перше, до конфіг потрібно додати це:
[api]
auth_backend = airflow.contrib.auth.backends.password_auth
Потім потрібно створити свого користувача з адмінськими правами:
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
Потім потрібно створити користувача зі звичайними правами, якому буде дозволено робити тригер DAG'а.
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
Тепер усе готове.
3. Запуск POST-запиту
Сам POST-запит буде виглядати так:
>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'
Запит успішно оброблений.
Відповідно, далі ми даємо якийсь час DAG'у на обробку та робимо запит до таблиці ClickHouse, намагаючись зловити контрольний пакет даних.
Перевірку завершено.
Джерело: habr.com