Kif tagħmel trigger DAG fl-Airflow billi tuża l-API Sperimentali

Meta nippreparaw il-programmi edukattivi tagħna, perjodikament niltaqgħu ma’ diffikultajiet f’termini ta’ ħidma ma’ ċerti għodod. U fil-mument meta niltaqgħu magħhom, mhux dejjem ikun hemm biżżejjed dokumentazzjoni u artikoli li jgħinuna nkampaw ma’ din il-problema.

Dan kien il-każ, pereżempju, fl-2015, u matul il-programm "Big Data Specialist" użajna cluster Hadoop bi Spark għal 35 utent simultanju. Ma kienx ċar kif tippreparaha għal każ ta' użu bħal dan bl-użu tal-ĦJUT. Fl-aħħar, wara li dehretha u mxew it-triq waħedna, għamilna post fuq Habré u mwettqa wkoll fi Moska Spark Meetup.

preistorja

Din id-darba se nitkellmu dwar programm differenti - Inġinier tad-Dejta. Il-parteċipanti tagħna jibnu żewġ tipi ta’ arkitettura fuqha: lambda u kappa. U fl-arkitettura lamdba, bħala parti mill-ipproċessar tal-lott, Airflow jintuża biex jittrasferixxi zkuk minn HDFS għal ClickHouse.

Kollox huwa ġeneralment tajjeb. Ħallihom jibnu l-pipelines tagħhom stess. Madankollu, hemm "iżda": il-programmi kollha tagħna huma teknoloġikament avvanzati mil-lat tal-proċess tat-tagħlim innifsu. Biex niċċekkjaw il-laboratorju, nużaw kontrolluri awtomatiċi: il-parteċipant jeħtieġ li jmur fil-kont personali tiegħu, ikklikkja l-buttuna "Iċċekkja", u wara xi żmien jara xi tip ta 'feedback estiż dwar dak li għamel. U huwa f'dan il-mument li nibdew nersqu lejn il-problema tagħna.

Il-verifika ta’ dan il-laboratorju hija strutturata hekk: nibagħtu pakkett ta’ dejta ta’ kontroll lill-Kafka tal-parteċipant, imbagħad Gobblin jittrasferixxi dan il-pakkett ta’ dejta lil HDFS, imbagħad Airflow jieħu dan il-pakkett ta’ dejta u jpoġġih f’ClickHouse. Il-trick huwa li Airflow m'għandux għalfejn jagħmel dan f'ħin reali, jagħmel dan skond skeda: kull 15-il minuta jieħu mazz ta 'fajls u jtellahom.

Jirriżulta li għandna bżonn b'xi mod nixprunaw id-DAG tagħhom aħna stess fuq talba tagħna waqt li l-kontrollur ikun qed jaħdem hawn u issa. Wara googling, sirna nafu li għal verżjonijiet aktar tard ta 'Airflow hemm hekk imsejħa API sperimentali. Il-kelma experimental, ovvjament, tinstema' tal-biża', imma x'għandek tagħmel... F'daqqa waħda titlaq.

Sussegwentement, se niddeskrivu t-triq kollha: mill-installazzjoni tal-Airflow sal-ġenerazzjoni ta 'talba POST li tixpruna d-DAG bl-użu tal-API Sperimentali. Aħna se naħdmu ma 'Ubuntu 16.04.

1. Installazzjoni tal-fluss ta 'l-arja

Ejja niċċekkjaw li għandna Python 3 u virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Jekk xi ħaġa minn dan tkun nieqsa, imbagħad installah.

Issa ejja noħolqu direttorju li fih se nkomplu naħdmu bl-Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Installa Airflow:

(venv) $ pip install airflow

Il-verżjoni li ħdimna fuqha: 1.10.

Issa għandna bżonn noħolqu direttorju airflow_home, fejn se jkunu jinsabu l-fajls DAG u l-plugins tal-Fluss tal-Ajru. Wara li toħloq id-direttorju, issettja l-varjabbli ambjentali AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Il-pass li jmiss huwa li tmexxi kmand li joħloq u jinizjalizza database tal-fluss tad-dejta f'SQLite:

(venv) $ airflow initdb

Id-database se tinħoloq fi airflow.db default.

Ejja niċċekkjaw jekk Airflow huwiex installat:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Jekk il-kmand ħadem, allura Airflow ħoloq il-fajl ta 'konfigurazzjoni tiegħu stess airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow għandu interface tal-web. Jista 'jiġi mniedi billi tħaddem il-kmand:

(venv) $ airflow webserver --port 8081

Теперь вы можете попасть в веб-интерфейс в браузере на порту 8081 на хосте, где Airflow был запущен, например: <hostname:8081>.

2. Ħidma ma 'API Sperimentali

F'dan il-punt, Airflow huwa kkonfigurat u lest biex imur. Madankollu, jeħtieġ ukoll li nħaddmu l-API Sperimentali. Il-kontrolluri tagħna huma miktuba f'Python, għalhekk it-talbiet kollha se jkunu fiha billi tuża l-librerija requests.

На самом деле API уже работает для простых запросов. Например, такой запрос позволяет потестить его работу:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Jekk tirċievi messaġġ bħal dan bħala tweġiba, dan ifisser li kollox qed jaħdem.

Однако, когда мы захотим затригерить DAG, то столкнемся с тем, что этот вид запроса нельзя сделать без аутентификации.

Biex tagħmel dan, ser ikollok bżonn tagħmel numru ta 'passi aktar.

L-ewwel, trid iżżid dan mal-konfigurazzjoni:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Imbagħad, trid toħloq l-utent tiegħek bi drittijiet ta' amministratur:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Sussegwentement, trid toħloq utent bi drittijiet normali li jitħallew iqabbdu d-DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Issa kollox lest.

3. Tnedija talba POST

It-talba POST nnifisha se tidher bħal din:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

It-talba ġiet ipproċessata b'suċċess.

Соответственно, далее мы даем какое-то время DAG’у на обработку и делаем запрос в таблицу ClickHouse, пытаясь поймать контрольный пакет данных.

Iċċekkja mitmuma.

Sors: www.habr.com

Żid kumment