Pri príprave našich vzdelávacích programov sa pravidelne stretávame s ťažkosťami pri práci s určitými nástrojmi. A v momente, keď sa s nimi stretneme, nie je vždy dostatok dokumentácie a článkov, ktoré by nám pomohli tento problém zvládnuť.
Tak to bolo napríklad v roku 2015 a počas programu „Big Data Specialist“ sme použili klaster Hadoop so Sparkom pre 35 súčasných používateľov. Nebolo jasné, ako ho pripraviť na takýto prípad použitia pomocou PRIadze. Nakoniec, keď sme na to prišli a kráčali po ceste sami, sme to urobili
pravek
Tentoraz budeme hovoriť o inom programe -
Všetko je vo všeobecnosti dobré. Nech si postavia vlastné potrubia. Je tu však jedno „ale“: všetky naše programy sú technologicky vyspelé z hľadiska samotného vzdelávacieho procesu. Na kontrolu laboratória používame automatickú kontrolu: účastník musí prejsť na svoj osobný účet, kliknúť na tlačidlo „Skontrolovať“ a po určitom čase uvidí nejakú rozšírenú spätnú väzbu o tom, čo urobil. A práve v tejto chvíli začíname pristupovať k nášmu problému.
Overenie tohto laboratória je štruktúrované takto: pošleme kontrolný dátový paket do Kafky účastníka, potom Gobblin prenesie tento dátový paket do HDFS, potom Airflow vezme tento dátový paket a vloží ho do ClickHouse. Trik je v tom, že Airflow to nemusí robiť v reálnom čase, robí to podľa plánu: každých 15 minút odoberie veľa súborov a nahrá ich.
Ukazuje sa, že ich DAG musíme nejako spustiť sami na našu žiadosť, zatiaľ čo kontrola beží tu a teraz. Po vygooglovaní sme zistili, že pre neskoršie verzie Airflow existuje tzv experimental
, samozrejme, znie to strašidelne, ale čo robiť... Zrazu sa to rozbehne.
Ďalej opíšeme celú cestu: od inštalácie Airflow až po vygenerovanie požiadavky POST, ktorá spustí DAG pomocou Experimental API. Budeme pracovať s Ubuntu 16.04.
1. Inštalácia prúdenia vzduchu
Skontrolujeme, či máme Python 3 a virtualenv.
$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0
Ak niečo z toho chýba, nainštalujte ho.
Teraz si vytvoríme adresár, v ktorom budeme pokračovať v práci s Airflow.
$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $
Nainštalujte prúdenie vzduchu:
(venv) $ pip install airflow
Verzia, na ktorej sme pracovali: 1.10.
Teraz musíme vytvoriť adresár airflow_home
, kde sa budú nachádzať súbory DAG a zásuvné moduly Airflow. Po vytvorení adresára nastavte premennú prostredia AIRFLOW_HOME
.
(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>
Ďalším krokom je spustenie príkazu, ktorý vytvorí a inicializuje databázu toku údajov v SQLite:
(venv) $ airflow initdb
Databáza bude vytvorená v airflow.db
predvolené.
Skontrolujeme, či je nainštalovaný Airflow:
$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ _ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ ____/____/|__/
v1.10.0
Ak príkaz fungoval, Airflow vytvoril svoj vlastný konfiguračný súbor airflow.cfg
в AIRFLOW_HOME
:
$ tree
.
├── airflow.cfg
└── unittests.cfg
Airflow má webové rozhranie. Dá sa spustiť spustením príkazu:
(venv) $ airflow webserver --port 8081
Teraz môžete prejsť na webové rozhranie v prehliadači na porte 8081 na hostiteľovi, kde bežal Airflow, napríklad: <hostname:8081>
.
2. Práca s Experimental API
V tomto bode je prúdenie vzduchu nakonfigurované a pripravené na použitie. Musíme však spustiť aj Experimental API. Naše kontroly sú napísané v Pythone, takže všetky požiadavky budú v ňom ďalej pomocou knižnice requests
.
V skutočnosti API už funguje pre jednoduché požiadavky. Táto požiadavka vám napríklad umožňuje otestovať jej fungovanie:
>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'
Ak dostanete takúto správu ako odpoveď, znamená to, že všetko funguje.
Keď však chceme spustiť DAG, stretávame sa so skutočnosťou, že tento typ požiadavky nemožno vykonať bez autentifikácie.
Ak to chcete urobiť, budete musieť vykonať niekoľko ďalších krokov.
Najprv musíte do konfigurácie pridať toto:
[api]
auth_backend = airflow.contrib.auth.backends.password_auth
Potom musíte vytvoriť používateľa s právami správcu:
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
Ďalej musíte vytvoriť používateľa s normálnymi právami, ktorý bude môcť spustiť DAG.
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
Teraz je všetko pripravené.
3. Spustite požiadavku POST
Samotná požiadavka POST bude vyzerať takto:
>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'
Žiadosť bola úspešne spracovaná.
V súlade s tým dáme DAG nejaký čas na spracovanie a odoslanie požiadavky na tabuľku ClickHouse, pričom sa pokúsime zachytiť paket riadiacich údajov.
Kontrola dokončená.
Zdroj: hab.com