Hoe kinne jo in DAG-trigger meitsje yn Airflow mei de eksperimintele API

При подготовке наших образовательных программ мы периодически сталкиваемся со сложностями с точки зрения работы с некоторыми инструментами. И на тот момент, когда мы с ними сталикваемся, не всегда есть достаточно документации и статей, которые помогли бы с этой проблемой справиться.

Dit wie bygelyks it gefal yn 2015, en tidens it programma "Big Data Specialist" brûkten wy in Hadoop-kluster mei Spark foar 35 simultane brûkers. It wie net dúdlik hoe't it tariede op sa'n gebrûk gefal mei YARN. Uteinlik hawwe wy it útfûn en it paad op ús eigen rûnen, diene wy post op Habré en ek útfierd by Moskou Spark Meetup.

prehistoarje

Dizze kear sille wy prate oer in oar programma - Data Engineer. На ней наши участники строят два типа архитектуры: lambda и kappa. И в lamdba-архитектуре в рамках батч-обработки используется Airflow для перекладывания логов из HDFS в ClickHouse.

Alles is oer it algemien goed. Lit se har eigen pipelines bouwe. D'r is lykwols in "mar": al ús programma's binne technologysk avansearre út it eachpunt fan it learproses sels. Om it laboratoarium te kontrolearjen, brûke wy automatyske checkers: de dielnimmer moat nei syn persoanlike akkount gean, klikje op de knop "Kontrolearje", en nei in skoft sjocht hy in soarte fan útwreide feedback oer wat hy dien hat. En it is op dit stuit dat wy begjinne te benaderjen ús probleem.

De ferifikaasje fan dit laboratoarium is sa strukturearre: wy stjoere in kontrôlegegevenspakket nei de Kafka fan 'e dielnimmer, dan draacht Gobblin dit gegevenspakket oer nei HDFS, dan nimt Airflow dit gegevenspakket en set it yn ClickHouse. De trúk is dat Airflow dit net yn echte tiid hoecht te dwaan, it docht it neffens in skema: elke 15 minuten nimt it in bosk bestannen en uploadt se.

Получается, что нам нужно как-то триггерить их DAG самостоятельно по нашему требованию во время работы чекера здесь и сейчас. Погуглив, выяснили, что для поздних версий Airflow существует так называемый Eksperimintele API. Wurd experimental, fansels, it klinkt eng, mar wat te dwaan ... Ynienen nimt it ôf.

Далее опишем весь путь: от установки Airflow до формирования POST-запроса, который триггерит DAG, используя Experimental API. Работать будем с Ubuntu 16.04.

1. Airflow ynstallaasje

Litte wy kontrolearje dat wy Python 3 en virtualenv hawwe.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

As ien fan dit ûntbrekt, ynstallearje it dan.

Litte wy no in map meitsje wêryn wy sille trochgean te wurkjen mei Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Ynstallearje Airflow:

(venv) $ pip install airflow

De ferzje dêr't wy wurke oan: 1.10.

No moatte wy in map meitsje airflow_home, где будут располагаться DAG-файлы и плагины Airflow. После создания каталога установим переменную среды AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Следующий шаг — выполним команду, которая будет создавать и инициализировать базу данных потока данных в SQLite:

(venv) $ airflow initdb

De databank sil oanmakke wurde yn airflow.db standert.

Litte wy kontrolearje as Airflow is ynstalleare:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

As it kommando wurke, dan makke Airflow in eigen konfiguraasjetriem airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow hat in webynterface. It kin wurde lansearre troch it kommando út te fieren:

(venv) $ airflow webserver --port 8081

Теперь вы можете попасть в веб-интерфейс в браузере на порту 8081 на хосте, где Airflow был запущен, например: <hostname:8081>.

2. Wurkje mei Experimental API

На этом Airflow настроен и готов к работе. Тем не менее, нам нужно запустить еще и Experimental API. Наши чекеры написаны на Python, поэтому далее все запросы будут на нем с использованием библиотеки requests.

Eins wurket de API al foar ienfâldige oanfragen. Dit fersyk lit jo bygelyks de wurking testje:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

As jo ​​sa'n berjocht krije as antwurd, betsjut dat dat alles wurket.

Однако, когда мы захотим затригерить DAG, то столкнемся с тем, что этот вид запроса нельзя сделать без аутентификации.

Om dit te dwaan, moatte jo in oantal mear stappen dwaan.

Earst moatte jo dit tafoegje oan de konfiguraasje:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Dan moatte jo jo brûker oanmeitsje mei adminrjochten:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Folgjende moatte jo in brûker meitsje mei normale rjochten dy't de DAG kinne triggerje.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

No is alles klear.

3. Starte in POST fersyk

It POST-fersyk sels sil der sa útsjen:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

It fersyk is mei súkses ferwurke.

Соответственно, далее мы даем какое-то время DAG’у на обработку и делаем запрос в таблицу ClickHouse, пытаясь поймать контрольный пакет данных.

Kontrolearje klear.

Boarne: www.habr.com

Add a comment