Как да направите DAG тригер във Airflow с помощта на експерименталния API

При изготвянето на нашите образователни програми периодично срещаме трудности по отношение на работата с някои инструменти. И в момента, когато се сблъскаме с тях, не винаги има достатъчно документация и статии, които биха помогнали да се справим с този проблем.

Така беше например през 2015 г. и използвахме клъстера Hadoop със Spark за 35 едновременни потребители в програмата Big Data Specialist. Не беше ясно как да го подготвим за такъв потребителски случай с помощта на YARN. В резултат на това, след като разбраха и тръгнаха по пътя сами, те го направиха публикувайте на Хабре а също и изпълнени Москва Spark Meetup.

праистория

Този път ще говорим за различна програма - Инженер на данни. На него нашите участници изграждат два вида архитектура: ламбда и капа. А в архитектурата на lamdba Airflow се използва като част от пакетната обработка за прехвърляне на регистрационни файлове от HDFS към ClickHouse.

Като цяло всичко е добре. Нека си строят тръбопроводите. Има обаче едно „но“: всички наши програми са технологично напреднали по отношение на самия процес на обучение. За да проверим лабораторията, ние използваме автоматични проверки: участникът трябва да отиде в личния си акаунт, да щракнете върху бутона „Проверка“ и след известно време той вижда някаква разширена обратна връзка за това, което е направил. И точно в този момент започваме да подхождаме към нашия проблем.

Проверката на тази лаборатория е подредена по следния начин: изпращаме контролен пакет данни до Kafka на участника, след това Gobblin прехвърля този пакет данни към HDFS, след което Airflow взема този пакет данни и го поставя в ClickHouse. Номерът е, че Airflow не трябва да прави това в реално време, той го прави по график: веднъж на всеки 15 минути взема куп файлове и ги качва.

Оказва се, че трябва по някакъв начин да задействаме техния DAG сами по наша заявка, докато проверката работи тук и сега. Търсейки в гугъл, разбрахме, че за по-късните версии на Airflow има т.нар Експериментален API, Думата experimental, разбира се, звучи страшно, но какво да се прави ... Изведнъж излита.

След това ще опишем целия път: от инсталирането на Airflow до генерирането на POST заявка, която задейства DAG с помощта на Experimental API. Ще работим с Ubuntu 16.04.

1. Инсталация за въздушен поток

Нека проверим дали имаме Python 3 и virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Ако някой от тях липсва, инсталирайте го.

Сега нека създадем директория, в която ще продължим да работим с Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Инсталирайте Airflow:

(venv) $ pip install airflow

Версия, върху която работихме: 1.10.

Сега трябва да създадем директория airflow_home, където ще се намират DAG файловете и добавките Airflow. След като създадете директорията, задайте променливата на средата AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Следващата стъпка е да изпълните командата, която ще създаде и инициализира базата данни на потока от данни в SQLite:

(venv) $ airflow initdb

Базата данни ще бъде създадена в airflow.db по подразбиране.

Проверете дали Airflow е инсталиран:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Ако командата работи, тогава Airflow създава свой собствен конфигурационен файл airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow има уеб интерфейс. Може да се стартира чрез изпълнение на командата:

(venv) $ airflow webserver --port 8081

Вече можете да осъществите достъп до уеб интерфейса в браузър на порт 8081 на хоста, където се изпълняваше Airflow, по следния начин: <hostname:8081>.

2. Работа с Experimental API

На този Airflow е конфигуриран и готов за работа. Трябва обаче да стартираме и експерименталния API. Нашите пулове са написани на Python, така че по-нататък всички заявки ще бъдат на него с помощта на библиотеката requests.

Всъщност API вече работи за прости заявки. Например, такава заявка ви позволява да тествате нейната работа:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Ако сте получили такова съобщение в отговор, това означава, че всичко работи.

Въпреки това, когато искаме да задействаме DAG, се натъкваме на факта, че този вид заявка не може да бъде направена без удостоверяване.

За да направите това, ще трябва да извършите редица действия.

Първо, трябва да добавите това към конфигурацията:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

След това трябва да създадете своя потребител с администраторски права:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

След това трябва да създадете потребител с нормални права, на който ще бъде разрешено да прави DAG тригер.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Сега всичко е готово.

3. Стартиране на POST заявка

Самата POST заявка ще изглежда така:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Заявката е обработена успешно.

Съответно, тогава даваме на DAG известно време за обработка и отправяне на заявка към таблицата ClickHouse, опитвайки се да улови контролния пакет данни.

Проверката е завършена.

Източник: www.habr.com

Добавяне на нов коментар