Meta nippreparaw il-programmi edukattivi tagħna, perjodikament niltaqgħu ma’ diffikultajiet f’termini ta’ ħidma ma’ ċerti għodod. U fil-mument meta niltaqgħu magħhom, mhux dejjem ikun hemm biżżejjed dokumentazzjoni u artikoli li jgħinuna nkampaw ma’ din il-problema.
Dan kien il-każ, pereżempju, fl-2015, u matul il-programm "Big Data Specialist" użajna cluster Hadoop bi Spark għal 35 utent simultanju. Ma kienx ċar kif tippreparaha għal każ ta' użu bħal dan bl-użu tal-ĦJUT. Fl-aħħar, wara li dehretha u mxew it-triq waħedna, għamilna
preistorja
Din id-darba se nitkellmu dwar programm differenti -
Kollox huwa ġeneralment tajjeb. Ħallihom jibnu l-pipelines tagħhom stess. Madankollu, hemm "iżda": il-programmi kollha tagħna huma teknoloġikament avvanzati mil-lat tal-proċess tat-tagħlim innifsu. Biex niċċekkjaw il-laboratorju, nużaw kontrolluri awtomatiċi: il-parteċipant jeħtieġ li jmur fil-kont personali tiegħu, ikklikkja l-buttuna "Iċċekkja", u wara xi żmien jara xi tip ta 'feedback estiż dwar dak li għamel. U huwa f'dan il-mument li nibdew nersqu lejn il-problema tagħna.
Il-verifika ta’ dan il-laboratorju hija strutturata hekk: nibagħtu pakkett ta’ dejta ta’ kontroll lill-Kafka tal-parteċipant, imbagħad Gobblin jittrasferixxi dan il-pakkett ta’ dejta lil HDFS, imbagħad Airflow jieħu dan il-pakkett ta’ dejta u jpoġġih f’ClickHouse. Il-trick huwa li Airflow m'għandux għalfejn jagħmel dan f'ħin reali, jagħmel dan skond skeda: kull 15-il minuta jieħu mazz ta 'fajls u jtellahom.
Jirriżulta li għandna bżonn b'xi mod nixprunaw id-DAG tagħhom aħna stess fuq talba tagħna waqt li l-kontrollur ikun qed jaħdem hawn u issa. Wara googling, sirna nafu li għal verżjonijiet aktar tard ta 'Airflow hemm hekk imsejħa experimental
, ovvjament, tinstema' tal-biża', imma x'għandek tagħmel... F'daqqa waħda titlaq.
Sussegwentement, se niddeskrivu t-triq kollha: mill-installazzjoni tal-Airflow sal-ġenerazzjoni ta 'talba POST li tixpruna d-DAG bl-użu tal-API Sperimentali. Aħna se naħdmu ma 'Ubuntu 16.04.
1. Installazzjoni tal-fluss ta 'l-arja
Ejja niċċekkjaw li għandna Python 3 u virtualenv.
$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0
Jekk xi ħaġa minn dan tkun nieqsa, imbagħad installah.
Issa ejja noħolqu direttorju li fih se nkomplu naħdmu bl-Airflow.
$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $
Installa Airflow:
(venv) $ pip install airflow
Il-verżjoni li ħdimna fuqha: 1.10.
Issa għandna bżonn noħolqu direttorju airflow_home
, fejn se jkunu jinsabu l-fajls DAG u l-plugins tal-Fluss tal-Ajru. Wara li toħloq id-direttorju, issettja l-varjabbli ambjentali AIRFLOW_HOME
.
(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>
Il-pass li jmiss huwa li tmexxi kmand li joħloq u jinizjalizza database tal-fluss tad-dejta f'SQLite:
(venv) $ airflow initdb
Id-database se tinħoloq fi airflow.db
default.
Ejja niċċekkjaw jekk Airflow huwiex installat:
$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ _ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ ____/____/|__/
v1.10.0
Jekk il-kmand ħadem, allura Airflow ħoloq il-fajl ta 'konfigurazzjoni tiegħu stess airflow.cfg
в AIRFLOW_HOME
:
$ tree
.
├── airflow.cfg
└── unittests.cfg
Airflow għandu interface tal-web. Jista 'jiġi mniedi billi tħaddem il-kmand:
(venv) $ airflow webserver --port 8081
Теперь вы можете попасть в веб-интерфейс в браузере на порту 8081 на хосте, где Airflow был запущен, например: <hostname:8081>
.
2. Ħidma ma 'API Sperimentali
F'dan il-punt, Airflow huwa kkonfigurat u lest biex imur. Madankollu, jeħtieġ ukoll li nħaddmu l-API Sperimentali. Il-kontrolluri tagħna huma miktuba f'Python, għalhekk it-talbiet kollha se jkunu fiha billi tuża l-librerija requests
.
На самом деле API уже работает для простых запросов. Например, такой запрос позволяет потестить его работу:
>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'
Jekk tirċievi messaġġ bħal dan bħala tweġiba, dan ifisser li kollox qed jaħdem.
Однако, когда мы захотим затригерить DAG, то столкнемся с тем, что этот вид запроса нельзя сделать без аутентификации.
Biex tagħmel dan, ser ikollok bżonn tagħmel numru ta 'passi aktar.
L-ewwel, trid iżżid dan mal-konfigurazzjoni:
[api]
auth_backend = airflow.contrib.auth.backends.password_auth
Imbagħad, trid toħloq l-utent tiegħek bi drittijiet ta' amministratur:
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
Sussegwentement, trid toħloq utent bi drittijiet normali li jitħallew iqabbdu d-DAG.
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
Issa kollox lest.
3. Tnedija talba POST
It-talba POST nnifisha se tidher bħal din:
>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'
It-talba ġiet ipproċessata b'suċċess.
Соответственно, далее мы даем какое-то время DAG’у на обработку и делаем запрос в таблицу ClickHouse, пытаясь поймать контрольный пакет данных.
Iċċekkja mitmuma.
Sors: www.habr.com