Ako vytvoriť spúšť DAG v Airflow pomocou Experimental API

Pri príprave našich vzdelávacích programov sa pravidelne stretávame s ťažkosťami pri práci s určitými nástrojmi. A v momente, keď sa s nimi stretneme, nie je vždy dostatok dokumentácie a článkov, ktoré by nám pomohli tento problém zvládnuť.

Tak to bolo napríklad v roku 2015 a počas programu „Big Data Specialist“ sme použili klaster Hadoop so Sparkom pre 35 súčasných používateľov. Nebolo jasné, ako ho pripraviť na takýto prípad použitia pomocou PRIadze. Nakoniec, keď sme na to prišli a kráčali po ceste sami, sme to urobili príspevok na Habré a vystupoval aj na Moskovské stretnutie Spark.

pravek

Tentoraz budeme hovoriť o inom programe - Dátový inžinier. Naši účastníci na ňom stavajú dva typy architektúry: lambda a kappa. A v architektúre lamdba sa ako súčasť dávkového spracovania používa Airflow na prenos protokolov z HDFS do ClickHouse.

Všetko je vo všeobecnosti dobré. Nech si postavia vlastné potrubia. Je tu však jedno „ale“: všetky naše programy sú technologicky vyspelé z hľadiska samotného vzdelávacieho procesu. Na kontrolu laboratória používame automatickú kontrolu: účastník musí prejsť na svoj osobný účet, kliknúť na tlačidlo „Skontrolovať“ a po určitom čase uvidí nejakú rozšírenú spätnú väzbu o tom, čo urobil. A práve v tejto chvíli začíname pristupovať k nášmu problému.

Overenie tohto laboratória je štruktúrované takto: pošleme kontrolný dátový paket do Kafky účastníka, potom Gobblin prenesie tento dátový paket do HDFS, potom Airflow vezme tento dátový paket a vloží ho do ClickHouse. Trik je v tom, že Airflow to nemusí robiť v reálnom čase, robí to podľa plánu: každých 15 minút odoberie veľa súborov a nahrá ich.

Ukazuje sa, že ich DAG musíme nejako spustiť sami na našu žiadosť, zatiaľ čo kontrola beží tu a teraz. Po vygooglovaní sme zistili, že pre neskoršie verzie Airflow existuje tzv Experimentálne API, Slovo experimental, samozrejme, znie to strašidelne, ale čo robiť... Zrazu sa to rozbehne.

Ďalej opíšeme celú cestu: od inštalácie Airflow až po vygenerovanie požiadavky POST, ktorá spustí DAG pomocou Experimental API. Budeme pracovať s Ubuntu 16.04.

1. Inštalácia prúdenia vzduchu

Skontrolujeme, či máme Python 3 a virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Ak niečo z toho chýba, nainštalujte ho.

Teraz si vytvoríme adresár, v ktorom budeme pokračovať v práci s Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Nainštalujte prúdenie vzduchu:

(venv) $ pip install airflow

Verzia, na ktorej sme pracovali: 1.10.

Teraz musíme vytvoriť adresár airflow_home, kde sa budú nachádzať súbory DAG a zásuvné moduly Airflow. Po vytvorení adresára nastavte premennú prostredia AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Ďalším krokom je spustenie príkazu, ktorý vytvorí a inicializuje databázu toku údajov v SQLite:

(venv) $ airflow initdb

Databáza bude vytvorená v airflow.db predvolené.

Skontrolujeme, či je nainštalovaný Airflow:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Ak príkaz fungoval, Airflow vytvoril svoj vlastný konfiguračný súbor airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow má webové rozhranie. Dá sa spustiť spustením príkazu:

(venv) $ airflow webserver --port 8081

Teraz môžete prejsť na webové rozhranie v prehliadači na porte 8081 na hostiteľovi, kde bežal Airflow, napríklad: <hostname:8081>.

2. Práca s Experimental API

V tomto bode je prúdenie vzduchu nakonfigurované a pripravené na použitie. Musíme však spustiť aj Experimental API. Naše kontroly sú napísané v Pythone, takže všetky požiadavky budú v ňom ďalej pomocou knižnice requests.

V skutočnosti API už funguje pre jednoduché požiadavky. Táto požiadavka vám napríklad umožňuje otestovať jej fungovanie:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Ak dostanete takúto správu ako odpoveď, znamená to, že všetko funguje.

Keď však chceme spustiť DAG, stretávame sa so skutočnosťou, že tento typ požiadavky nemožno vykonať bez autentifikácie.

Ak to chcete urobiť, budete musieť vykonať niekoľko ďalších krokov.

Najprv musíte do konfigurácie pridať toto:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Potom musíte vytvoriť používateľa s právami správcu:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Ďalej musíte vytvoriť používateľa s normálnymi právami, ktorý bude môcť spustiť DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Teraz je všetko pripravené.

3. Spustite požiadavku POST

Samotná požiadavka POST bude vyzerať takto:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Žiadosť bola úspešne spracovaná.

V súlade s tým dáme DAG nejaký čas na spracovanie a odoslanie požiadavky na tabuľku ClickHouse, pričom sa pokúsime zachytiť paket riadiacich údajov.

Kontrola dokončená.

Zdroj: hab.com

Pridať komentár