Бо истифода аз API Experimental чӣ гуна триггери DAG-ро дар Airflow сохтан мумкин аст

Ҳангоми таҳияи барномаҳои таълимии худ, мо давра ба давра дар робита ба кор бо асбобҳои муайян ба мушкилот дучор мешавем. Ва дар айни замон, вақте ки мо бо онҳо дучор мешавем, на ҳама вақт ҳуҷҷатҳо ва мақолаҳои кофӣ мавҷуданд, ки ба мо дар ҳалли ин мушкилот кӯмак мекунанд.

Ин, масалан, дар соли 2015 буд ва дар ҷараёни барномаи "Мутахассиси маълумоти калон" мо кластери Hadoop бо Spark барои 35 корбарони ҳамзамон истифода кардем. Маълум набуд, ки чӣ гуна онро барои чунин парванда бо истифода аз YARN омода кардан лозим аст. Ниҳоят, мо онро муайян карда, худамон роҳро пеш гирифтем дар Habré нашр кунед ва инчунин дар Вохурии «Шораи Москва»..

prehistory

Ин дафъа мо дар бораи барномаи дигар сӯҳбат хоҳем кард - Муҳандиси маълумот. Иштироккунандагони мо дар болои он ду намуди меъморй месозанд: ламбда ва каппа. Ва дар меъмории lamdba, ҳамчун як қисми коркарди партия, Airflow барои интиқоли гузоришҳо аз HDFS ба ClickHouse истифода мешавад.

Ҳама чиз умуман хуб аст. Бигузор худашон трубопровод созанд. Аммо як «аммо» вуҷуд дорад: тамоми барномаҳои мо аз нуқтаи назари худи раванди таълим аз ҷиҳати технологӣ пешрафтаанд. Барои санҷидани лаборатория мо аз чекҳои автоматӣ истифода мебарем: иштирокчӣ бояд ба ҳисоби шахсии худ равад, тугмаи "Тафтиш" -ро пахш кунад ва пас аз чанд вақт ӯ дар бораи коре, ки ӯ кардааст, як навъ фикру мулоҳизаҳои васеъро мебинад. Ва маҳз дар ҳамин лаҳза мо ба мушкили худ наздик мешавем.

Санҷиши ин лаборатория чунин сохта шудааст: мо як бастаи додаҳои идоракуниро ба Кафкаи иштирокчӣ мефиристем, пас Gobblin ин бастаи маълумотро ба HDFS интиқол медиҳад, пас Airflow ин бастаи маълумотро гирифта, дар ClickHouse мегузорад. Ҳилла дар он аст, ки Airflow набояд ин корро дар вақти воқеӣ анҷом диҳад, онро мувофиқи ҷадвал иҷро мекунад: ҳар 15 дақиқа як даста файлҳоро мегирад ва онҳоро бор мекунад.

Маълум мешавад, ки мо бояд бо дархости мо худамон DAG-и онҳоро ангеза диҳем, вақте ки чек дар ин ҷо ва ҳоло кор мекунад. Пас аз гуглинг, мо фаҳмидем, ки барои версияҳои минбаъдаи Airflow ба ном мавҷуд аст API-и таҷрибавӣ. Калима experimental, албатта, дахшатангез ба назар мерасад, вале чй бояд кард... Ногахон ба хаво мебарояд.

Минбаъд, мо тамоми роҳро тавсиф хоҳем кард: аз насби Airflow то тавлиди дархости POST, ки DAG-ро бо истифода аз API Experimental ангеза медиҳад. Мо бо Ubuntu 16.04 кор хоҳем кард.

1. Насби ҷараёни ҳаво

Биёед тафтиш кунем, ки мо Python 3 ва virtualenv дорем.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Агар яке аз инҳо мавҷуд набошад, онро насб кунед.

Акнун биёед директорияеро эҷод кунем, ки дар он мо кор бо Airflow идома хоҳем дод.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Ҳаворо насб кунед:

(venv) $ pip install airflow

Версияе, ки мо дар он кор кардем: 1.10.

Акнун мо бояд директория эҷод кунем airflow_home, ки дар он файлҳои DAG ва плагинҳои Airflow ҷойгир хоҳанд шуд. Пас аз сохтани директория, тағирёбандаи муҳити атрофро таъин кунед AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Қадами навбатӣ иҷро кардани фармонест, ки дар SQLite махзани маълумотро эҷод ва оғоз мекунад:

(venv) $ airflow initdb

Дар базаи маълумотҳо таъсис дода мешавад airflow.db пешфарз

Биёед тафтиш кунем, ки оё Airflow насб шудааст:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Агар фармон кор мекард, пас Airflow файли конфигуратсияи худро эҷод кард airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow дорои интерфейси веб мебошад. Онро тавассути иҷро кардани фармон оғоз кардан мумкин аст:

(venv) $ airflow webserver --port 8081

Шумо ҳоло метавонед интерфейси вебро дар браузери бандари 8081 дар ҳост, ки Airflow кор мекард, пахш кунед, масалан: <hostname:8081>.

2. Кор бо API Experimental

Дар ин лаҳза, Airflow танзим карда шудааст ва барои рафтан омода аст. Аммо, мо инчунин бояд API-и Experimental -ро иҷро кунем. Шашкаҳои мо дар Python навишта шудаанд, аз ин рӯ минбаъд ҳама дархостҳо бо истифода аз китобхона дар он хоҳанд буд requests.

Дар асл, API аллакай барои дархостҳои оддӣ кор мекунад. Масалан, ин дархост ба шумо имкон медиҳад, ки кори онро санҷед:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Агар шумо дар ҷавоб чунин паём гиред, ин маънои онро дорад, ки ҳама чиз кор мекунад.

Аммо, вақте ки мо мехоҳем, ки DAG-ро оғоз кунем, мо бо он дучор мешавем, ки ин навъи дархостро бе аутентификатсия иҷро кардан мумкин нест.

Барои ин ба шумо лозим меояд, ки як қатор қадамҳои дигарро иҷро кунед.

Аввалан, шумо бояд инро ба конфигуратсия илова кунед:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Пас, шумо бояд корбари худро бо ҳуқуқҳои администратор эҷод кунед:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Баъдан, шумо бояд корбареро бо ҳуқуқҳои муқаррарӣ эҷод кунед, ки ба он иҷозат дода мешавад, ки DAG-ро оғоз кунад.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Акнун ҳама чиз тайёр аст.

3. Дархости POST-ро оғоз кунед

Худи дархости POST чунин хоҳад буд:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Дархост бомуваффақият коркард карда шуд.

Мутаносибан, мо ба DAG чанд вақт медиҳем, то ба ҷадвали ClickHouse муроҷиат кунад ва дархост кунад, то бастаи додаҳои идоракуниро дастгир кунад.

Санҷиш анҷом ёфт.

Манбаъ: will.com

Илова Эзоҳ