Як зрабіць трыгер DAG'а ў Airflow, выкарыстоўваючы Experimental API

Пры падрыхтоўцы нашых адукацыйных праграм мы перыядычна сутыкаемся са складанасцямі з пункту гледжання працы з некаторымі інструментамі. І на той момант, калі мы з імі сутыкаемся, не заўсёды ёсць дастаткова дакументацыі і артыкулаў, якія дапамаглі б з гэтай праблемай справіцца.

Так было, напрыклад, у 2015 годзе і мы на праграме "Спецыяліст па вялікіх дадзеных" карысталіся Hadoop-кластарам са Spark на 35 адначасовых карыстальнікаў. Як яго рыхтаваць пад такі юзкейс з выкарыстаннем YARN, было незразумела. У выніку, разабраўшыся і прайшоўшы шлях самастойна, зрабілі пост на Хабры і яшчэ выступілі на Moscow Spark Meetup.

перадгісторыя

У гэты раз гаворка пойдзе аб іншай праграме - Інжынер дадзеных. На ёй нашы ўдзельнікі будуюць два тыпы архітэктуры: lambda і kappa. І ў lamdba-архітэктуры ў рамках батч-апрацоўкі выкарыстоўваецца Airflow для перакладання логаў з HDFS у ClickHouse.

Усё ўвогуле добра. Няхай будуюць свае пайплайны. Аднак, ёсць "але": усе нашы праграмы тэхналагічныя з пункту гледжання самога працэсу навучання. Для праверкі лабаў мы выкарыстоўваем аўтаматычныя чэкеры: удзельніку трэба зайсці ў асабісты кабінет, націснуць кнопку “Праверыць”, і праз нейкі час ён бачыць нейкую пашыраную зваротную сувязь на тое, што зрабіў. І менавіта ў гэты момант мы пачынаем падыходзіць да нашай праблемы.

Праверка гэтай лабы ўладкованая так: мы дасылаем кантрольны пакет дадзеных у Kafka ўдзельніка, далей Gobblin перакладае гэты пакет дадзеных на HDFS, далей Airflow бярэ гэты пакет дадзеных і кладзе ў ClickHouse. Фішка ў тым, што Airflow не павінен гэта рабіць у рэал-тайме, ён гэта робіць па раскладзе: раз у 15 хвілін бярэ пачак файлаў і закідвае.

Атрымліваецца, што нам трэба неяк трыгерыць іх DAG самастойна па нашым патрабаванні падчас працы чэкера тут і зараз. Пагугліўшы, высветлілі, што для позніх версій Airflow існуе так званы Experimental API. слова experimental, вядома, гучыць жахліва, але што рабіць… Раптам узляціць.

Далей апішам увесь шлях: ад усталёўкі Airflow да фармавання POST-запыту, які трыгерыт DAG, выкарыстаючы Experimental API. Працаваць будзем з Ubuntu 16.04/XNUMX.

1. Устаноўка Airflow

Праверым, што ў нас стаіць Python 3 і virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Калі чагосьці з гэтага няма, то ўсталюеце.

Цяпер створым каталог, у якім будзем далей працаваць з Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Усталюем Airflow:

(venv) $ pip install airflow

Версія, на якой працавалі мы: 1.10.

Цяпер нам трэба стварыць каталог airflow_home, дзе будуць размяшчацца DAG-файлы і плагіны Airflow. Пасля стварэння каталога ўсталюем зменнае асяроддзі AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Наступны крок - выканальны каманду, якая будзе ствараць і ініцыялізаваць базу дадзеных струменя дадзеных у SQLite:

(venv) $ airflow initdb

База дадзеных будзе створана ў airflow.db па змаўчанні.

Праверым ці ўсталяваўся Airflow:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Калі каманда адпрацавала, то Airflow стварыў свой канфігурацыйны файл airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

У Airflow ёсць вэб-інтэрфейс. Яго можна запусціць, выканаўшы каманду:

(venv) $ airflow webserver --port 8081

Цяпер вы можаце патрапіць у вэб-інтэрфейс у браўзэры на порце 8081 на хасце, дзе Airflow быў запушчаны, напрыклад: <hostname:8081>.

2. Праца з Experimental API

На гэтым Airflow наладжаны і готаў да працы. Тым не менш нам трэба запусціць яшчэ і Experimental API. Нашы чэкеры напісаны на Python, таму далей усе запыты будуць на ім з выкарыстаннем бібліятэкі. requests.

Насамрэч API ужо працуе для простых запытаў. Напрыклад, такі запыт дазваляе пацясціць яго працу:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Калі атрымалі такое паведамленьне ў адказ, то гэта значыць, што ўсё працуе.

Аднак, калі мы захочам затрыгерыць DAG, то сутыкнемся з тым, што гэты від запыту нельга зрабіць без аўтэнтыфікацыі.

Для гэтага трэба будзе зрабіць яшчэ шэраг дзеянняў.

Па-першае, у канфіг трэба дадаць гэта:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Затым, трэба стварыць свайго карыстальніка з адмінскімі правамі:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Затым, трэба стварыць карыстальніка са звычайнымі правамі, якому будзе дазволена рабіць трыгер DAG'а.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Цяпер усё гатова.

3. Запуск POST-запыту

Сам POST-запыт будзе выглядаць так:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Запыт апрацаваны паспяхова.

Адпаведна, далей мы даем нейкі час DAG'у на апрацоўку і які робіцца запыт у табліцу ClickHouse, спрабуючы злавіць кантрольны пакет дадзеных.

Праверка завершана.

Крыніца: habr.com

Дадаць каментар