Эксперименттік API арқылы Airflow жүйесінде DAG триггерін қалай жасауға болады

Білім беру бағдарламаларын дайындау барысында біз кейбір құралдармен жұмыс істеу тұрғысынан мезгіл-мезгіл қиындықтарға тап боламыз. Біз олармен кездескен кезде бұл мәселені шешуге көмектесетін құжаттар мен мақалалар әрдайым жеткіліксіз.

Бұл, мысалы, 2015 жылы болды және біз Big Data Specialist бағдарламасында бір уақытта 35 пайдаланушы үшін Spark бар Hadoop кластерін қолдандық. YARN көмегімен мұндай пайдаланушы жағдайына оны қалай дайындау керектігі түсініксіз болды. Нәтижесінде олар жолды өздері анықтап, жүрді Хабреге жариялау және де орындады Мәскеудегі Spark кездесуі.

тарихын

Бұл жолы біз басқа бағдарлама туралы сөйлесетін боламыз - Деректер инженері. Онда біздің қатысушылар сәулеттің екі түрін салады: ламбда және каппа. Ал lamdba архитектурасында Airflow журналдарды HDFS-тен ClickHouse-қа тасымалдау үшін пакеттік өңдеудің бөлігі ретінде пайдаланылады.

Жалпы бәрі жақсы. Олар өз құбырларын салсын. Дегенмен, «бірақ» бар: біздің барлық бағдарламаларымыз оқу процесінің өзі тұрғысынан технологиялық тұрғыдан жетілдірілген. Зертхананы тексеру үшін біз автоматты тексерулерді қолданамыз: қатысушы өзінің жеке кабинетіне өтіп, «Тексеру» түймесін басу керек, біраз уақыттан кейін ол не істегені туралы кеңейтілген кері байланысты көреді. Міне, осы кезде біз өз мәселемізге жақындай бастаймыз.

Бұл зертхананы тексеру келесідей реттелген: біз қатысушының Кафкасына бақылау деректер пакетін жібереміз, содан кейін Gobblin бұл деректер пакетін HDFS жүйесіне тасымалдайды, содан кейін Airflow бұл деректер пакетін алып, ClickHouse ішіне қояды. Айтуынша, Airflow мұны нақты уақыт режимінде жасаудың қажеті жоқ, ол оны кесте бойынша жасайды: әр 15 минут сайын ол көптеген файлдарды алып, оларды жүктеп салады.

Тексеру осында және қазір жұмыс істеп тұрғанда, біз олардың DAG-ын біздің өтінішіміз бойынша өзіміз іске қосуымыз керек екен. Google арқылы біз Airflow-тың кейінгі нұсқалары үшін деп аталатын нұсқасы бар екенін білдік Эксперименттік API. Сөз experimental, әрине, бұл қорқынышты естіледі, бірақ не істеу керек ... Ол кенеттен ұшып кетеді.

Әрі қарай, біз бүкіл жолды сипаттаймыз: Airflow орнатудан бастап Experimental API арқылы DAG іске қосатын POST сұрауын жасауға дейін. Біз Ubuntu 16.04 нұсқасымен жұмыс істейміз.

1. Ауа ағынын орнату

Бізде Python 3 және virtualenv бар екенін тексерейік.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Егер олардың біреуі жоқ болса, оны орнатыңыз.

Енді Airflow-пен жұмыс істеуді жалғастыратын каталог жасайық.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Ауа ағынын орнату:

(venv) $ pip install airflow

Біз жұмыс істеген нұсқа: 1.10.

Енді бізге каталог жасау керек airflow_home, DAG файлдары мен Airflow плагиндері қайда орналасады. Каталогты жасағаннан кейін ортаның айнымалы мәнін орнатыңыз AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Келесі қадам SQLite ішінде деректер ағыны дерекқорын жасайтын және инициализациялайтын пәрменді іске қосу болып табылады:

(venv) $ airflow initdb

Деректер базасы жылы жасалады airflow.db әдепкі

Ауа ағынының орнатылғанын тексеріңіз:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Егер пәрмен жұмыс істесе, Airflow өзінің конфигурация файлын жасады airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow веб-интерфейсі бар. Оны келесі пәрменді іске қосу арқылы іске қосуға болады:

(venv) $ airflow webserver --port 8081

Енді сіз Airflow іске қосылған хосттағы 8081 портындағы шолғышта веб-интерфейске қол жеткізе аласыз, мысалы: <hostname:8081>.

2. Experimental API-мен жұмыс істеу

Бұл жерде ауа ағыны конфигурацияланған және пайдалануға дайын. Дегенмен, эксперименттік API іске қосуымыз керек. Біздің дойбылар Python тілінде жазылған, сондықтан одан әрі барлық сұраулар кітапхана арқылы оған түседі requests.

Іс жүзінде API қарапайым сұраулар үшін жұмыс істейді. Мысалы, мұндай сұрау оның жұмысын тексеруге мүмкіндік береді:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Егер сіз осындай хабарламаны жауап ретінде алсаңыз, бұл бәрі жұмыс істеп тұрғанын білдіреді.

Дегенмен, біз DAG іске қосқымыз келгенде, мұндай сұрауды аутентификациясыз жасауға болмайтындығына тап боламыз.

Мұны істеу үшін сізге бірқатар әрекеттерді орындау қажет болады.

Алдымен оны конфигурацияға қосу керек:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Содан кейін әкімші құқықтары бар пайдаланушыны жасау керек:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Әрі қарай, DAG триггерін жасауға рұқсат беретін қалыпты құқықтары бар пайдаланушыны жасау керек.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Қазір бәрі дайын.

3. POST сұрауын іске қосу

POST сұрауының өзі келесідей болады:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Сұрау сәтті өңделді.

Тиісінше, содан кейін біз DAG-ға басқару деректер пакетін ұстауға тырысып, ClickHouse кестесіне сұранысты өңдеуге және жасауға біраз уақыт береміз.

Тексеру аяқталды.

Ақпарат көзі: www.habr.com

пікір қалдыру