Како да направите активирач на DAG во протокот на воздух со помош на Експерименталниот API

При подготовката на нашите образовни програми периодично наидуваме на потешкотии во однос на работата со некои алатки. И во моментот кога ќе ги сретнеме, нема секогаш доволно документација и написи кои би помогнале да се справиме со овој проблем.

Така беше, на пример, во 2015 година и го користевме кластерот Hadoop со Spark за 35 истовремени корисници на програмата Big Data Specialist. Не беше јасно како да се подготви за таков кориснички случај користејќи YARN. Како резултат на тоа, откако сфатиле и сами оделе по патеката, тие го направиле тоа објава на Хабре а исто така изведена Состанок во Москва Спарк.

праисторијата

Овој пат ќе зборуваме за поинаква програма - Инженер за податоци. На него нашите учесници градат два вида архитектура: ламбда и каппа. И во архитектурата lamdba, протокот на воздух се користи како дел од сериската обработка за пренос на дневници од HDFS во ClickHouse.

Се е генерално добро. Нека ги градат своите цевководи. Сепак, постои едно „но“: сите наши програми се технолошки напредни во однос на самиот процес на учење. За да ја провериме лабораторијата, користиме автоматски проверки: учесникот треба да отиде на неговата лична сметка, да кликнете на копчето „Провери“ и по некое време ќе види некаква проширена повратна информација за она што го направил. И токму во овој момент почнуваме да му пристапуваме на нашиот проблем.

Проверката на оваа лабораторија е организирана на следниов начин: испраќаме контролен пакет со податоци до Кафка на учесникот, потоа Gobblin го пренесува овој пакет со податоци на HDFS, потоа Airflow го зема овој пакет со податоци и го става во ClickHouse. Трикот е што Airflow не мора да го прави ова во реално време, тоа го прави на распоред: еднаш на секои 15 минути потребни се куп датотеки и ги поставува.

Излегува дека треба некако сами да го активираме нивниот ДАГ на наше барање додека проверката работи овде и сега. Гуглувајќи, дознавме дека за подоцнежните верзии на Airflow постои т.н Експериментално API. Зборот experimental, се разбира, звучи страшно, но што да се прави ... Одеднаш полета.

Следно, ќе ја опишеме целата патека: од инсталирање Airflow до генерирање на барање POST што активира DAG со помош на Експерименталниот API. Ќе работиме со Ubuntu 16.04.

1. Инсталација на проток на воздух

Ајде да провериме дали имаме Python 3 и virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Ако недостасува едно од овие, тогаш инсталирајте го.

Сега да создадеме директориум во кој ќе продолжиме да работиме со Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Инсталирајте проток на воздух:

(venv) $ pip install airflow

Верзија на која работевме: 1.10.

Сега треба да создадеме директориум airflow_home, каде што ќе се наоѓаат датотеките DAG и приклучоците за проток на воздух. Откако ќе го креирате директориумот, поставете ја променливата на околината AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Следниот чекор е да се изврши командата што ќе ја креира и иницијализира базата на податоци за проток на податоци во SQLite:

(venv) $ airflow initdb

Базата на податоци ќе биде креирана во airflow.db стандардно.

Проверете дали е инсталиран проток на воздух:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Ако командата функционираше, тогаш Airflow создаде своја конфигурациска датотека airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Воздухот има веб-интерфејс. Може да се стартува со извршување на командата:

(venv) $ airflow webserver --port 8081

Сега можете да пристапите до веб-интерфејсот во прелистувачот на портата 8081 на домаќинот каде што работи Airflow, вака: <hostname:8081>.

2. Работа со Експерименталниот API

На овој протокот на воздух е конфигуриран и подготвен да оди. Сепак, треба да го извршиме и Експерименталниот API. Нашите дама се напишани во Python, така што понатаму сите барања ќе бидат на него користејќи ја библиотеката requests.

Всушност, API веќе работи за едноставни барања. На пример, таквото барање ви овозможува да ја тестирате неговата работа:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Ако сте добиле таква порака како одговор, тоа значи дека сè работи.

Меѓутоа, кога сакаме да активираме DAG, наидуваме на фактот дека овој вид на барање не може да се направи без автентикација.

За да го направите ова, ќе треба да направите голем број активности.

Прво, треба да го додадете ова во конфигурацијата:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Потоа, треба да го креирате вашиот корисник со администраторски права:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Следно, треба да креирате корисник со нормални права на кој ќе му биде дозволено да направи активирач на DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Сега сè е подготвено.

3. Стартување на барање POST

Самото барање POST ќе изгледа вака:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Барањето е успешно обработено.

Соодветно на тоа, тогаш му даваме на DAG одредено време да обработи и да поднесе барање до табелата ClickHouse, обидувајќи се да го фати контролниот пакет со податоци.

Потврдата е завршена.

Извор: www.habr.com

Додадете коментар