При подготовке наших образовательных программ мы периодически сталкиваемся со сложностями с точки зрения работы с некоторыми инструментами. И на тот момент, когда мы с ними сталикваемся, не всегда есть достаточно документации и статей, которые помогли бы с этой проблемой справиться.
Dit wie bygelyks it gefal yn 2015, en tidens it programma "Big Data Specialist" brûkten wy in Hadoop-kluster mei Spark foar 35 simultane brûkers. It wie net dúdlik hoe't it tariede op sa'n gebrûk gefal mei YARN. Uteinlik hawwe wy it útfûn en it paad op ús eigen rûnen, diene wy
prehistoarje
Dizze kear sille wy prate oer in oar programma -
Alles is oer it algemien goed. Lit se har eigen pipelines bouwe. D'r is lykwols in "mar": al ús programma's binne technologysk avansearre út it eachpunt fan it learproses sels. Om it laboratoarium te kontrolearjen, brûke wy automatyske checkers: de dielnimmer moat nei syn persoanlike akkount gean, klikje op de knop "Kontrolearje", en nei in skoft sjocht hy in soarte fan útwreide feedback oer wat hy dien hat. En it is op dit stuit dat wy begjinne te benaderjen ús probleem.
De ferifikaasje fan dit laboratoarium is sa strukturearre: wy stjoere in kontrôlegegevenspakket nei de Kafka fan 'e dielnimmer, dan draacht Gobblin dit gegevenspakket oer nei HDFS, dan nimt Airflow dit gegevenspakket en set it yn ClickHouse. De trúk is dat Airflow dit net yn echte tiid hoecht te dwaan, it docht it neffens in skema: elke 15 minuten nimt it in bosk bestannen en uploadt se.
Получается, что нам нужно как-то триггерить их DAG самостоятельно по нашему требованию во время работы чекера здесь и сейчас. Погуглив, выяснили, что для поздних версий Airflow существует так называемый experimental
, fansels, it klinkt eng, mar wat te dwaan ... Ynienen nimt it ôf.
Далее опишем весь путь: от установки Airflow до формирования POST-запроса, который триггерит DAG, используя Experimental API. Работать будем с Ubuntu 16.04.
1. Airflow ynstallaasje
Litte wy kontrolearje dat wy Python 3 en virtualenv hawwe.
$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0
As ien fan dit ûntbrekt, ynstallearje it dan.
Litte wy no in map meitsje wêryn wy sille trochgean te wurkjen mei Airflow.
$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $
Ynstallearje Airflow:
(venv) $ pip install airflow
De ferzje dêr't wy wurke oan: 1.10.
No moatte wy in map meitsje airflow_home
, где будут располагаться DAG-файлы и плагины Airflow. После создания каталога установим переменную среды AIRFLOW_HOME
.
(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>
Следующий шаг — выполним команду, которая будет создавать и инициализировать базу данных потока данных в SQLite:
(venv) $ airflow initdb
De databank sil oanmakke wurde yn airflow.db
standert.
Litte wy kontrolearje as Airflow is ynstalleare:
$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ _ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ ____/____/|__/
v1.10.0
As it kommando wurke, dan makke Airflow in eigen konfiguraasjetriem airflow.cfg
в AIRFLOW_HOME
:
$ tree
.
├── airflow.cfg
└── unittests.cfg
Airflow hat in webynterface. It kin wurde lansearre troch it kommando út te fieren:
(venv) $ airflow webserver --port 8081
Теперь вы можете попасть в веб-интерфейс в браузере на порту 8081 на хосте, где Airflow был запущен, например: <hostname:8081>
.
2. Wurkje mei Experimental API
На этом Airflow настроен и готов к работе. Тем не менее, нам нужно запустить еще и Experimental API. Наши чекеры написаны на Python, поэтому далее все запросы будут на нем с использованием библиотеки requests
.
Eins wurket de API al foar ienfâldige oanfragen. Dit fersyk lit jo bygelyks de wurking testje:
>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'
As jo sa'n berjocht krije as antwurd, betsjut dat dat alles wurket.
Однако, когда мы захотим затригерить DAG, то столкнемся с тем, что этот вид запроса нельзя сделать без аутентификации.
Om dit te dwaan, moatte jo in oantal mear stappen dwaan.
Earst moatte jo dit tafoegje oan de konfiguraasje:
[api]
auth_backend = airflow.contrib.auth.backends.password_auth
Dan moatte jo jo brûker oanmeitsje mei adminrjochten:
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
Folgjende moatte jo in brûker meitsje mei normale rjochten dy't de DAG kinne triggerje.
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
No is alles klear.
3. Starte in POST fersyk
It POST-fersyk sels sil der sa útsjen:
>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'
It fersyk is mei súkses ferwurke.
Соответственно, далее мы даем какое-то время DAG’у на обработку и делаем запрос в таблицу ClickHouse, пытаясь поймать контрольный пакет данных.
Kontrolearje klear.
Boarne: www.habr.com