Kaip sukurti DAG aktyviklį „Airflow“ naudojant eksperimentinę API

Rengdami savo edukacines programas, periodiškai susiduriame su sunkumais dirbant su kai kuriomis priemonėmis. Ir tuo metu, kai susiduriame su jais, ne visada yra pakankamai dokumentacijos ir straipsnių, kurie padėtų susidoroti su šia problema.

Taip buvo, pavyzdžiui, 2015 m., o „Hadoop“ klasterį su „Spark“ naudojome 35 „Big Data Specialist“ programos naudotojams vienu metu. Nebuvo aišku, kaip jį paruošti tokiam vartotojo atvejui naudojant YARN. Dėl to, patys išsiaiškinę ir nuėję kelią, jie tai padarė įrašas apie Habré ir taip pat atliko Maskvos kibirkšties susitikimas.

priešistorė

Šį kartą kalbėsime apie kitokią programą – Duomenų inžinierius. Ant jo mūsų dalyviai stato dviejų tipų architektūrą: lambda ir kappa. O lamdba architektūroje „Airflow“ naudojama kaip paketinio apdorojimo dalis, norint perkelti žurnalus iš HDFS į „ClickHouse“.

Apskritai viskas gerai. Leisk jiems nutiesti savo vamzdynus. Tačiau yra „bet“: visos mūsų programos yra technologiškai pažangios paties mokymosi proceso atžvilgiu. Norėdami patikrinti laboratoriją, naudojame automatines tikrintuvas: dalyvis turi eiti į savo asmeninę paskyrą, paspausti mygtuką „Tikrinti“ ir po kurio laiko jis mato kažkokį išplėstinį atsiliepimą apie tai, ką padarė. Ir būtent šiuo metu mes pradedame artėti prie savo problemos.

Šios laboratorijos tikrinimas yra išdėstytas taip: mes išsiunčiame kontrolinį duomenų paketą dalyvio Kafka, tada Gobblin perduoda šį duomenų paketą į HDFS, tada Airflow paima šį duomenų paketą ir įdeda jį į ClickHouse. Apgaulė ta, kad „Airflow“ neprivalo to daryti realiu laiku, ji tai daro pagal grafiką: kartą per 15 minučių paima krūvą failų ir juos įkelia.

Pasirodo, mums reikia kažkaip savo prašymu suaktyvinti jų DAG, kol tikrintuvas veikia čia ir dabar. Googluodami išsiaiškinome, kad vėlesnėms Airflow versijoms yra vadinamasis Eksperimentinė API. Žodis experimental, žinoma, skamba baisiai, bet ką daryti... Staiga ima.

Toliau apibūdinsime visą kelią: nuo „Airflow“ diegimo iki POST užklausos, suaktyvinančios DAG, sugeneravimo naudojant eksperimentinę API. Dirbsime su Ubuntu 16.04.

1. Oro srauto įrengimas

Patikrinkime, ar turime Python 3 ir virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Jei vieno iš jų trūksta, įdiekite jį.

Dabar sukurkime katalogą, kuriame ir toliau dirbsime su „Airflow“.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Įdiekite oro srautą:

(venv) $ pip install airflow

Versija, prie kurios dirbome: 1.10.

Dabar turime sukurti katalogą airflow_home, kur bus DAG failai ir Airflow įskiepiai. Sukūrę katalogą, nustatykite aplinkos kintamąjį AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Kitas žingsnis yra paleisti komandą, kuri sukurs ir inicijuos duomenų srauto duomenų bazę SQLite:

(venv) $ airflow initdb

Duomenų bazė bus sukurta airflow.db numatytas.

Patikrinkite, ar sumontuotas oro srautas:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Jei komanda veikė, tada Airflow sukūrė savo konfigūracijos failą airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

„Airflow“ turi žiniatinklio sąsają. Jį galima paleisti vykdant komandą:

(venv) $ airflow webserver --port 8081

Dabar žiniatinklio sąsają galite pasiekti naršyklėje, esančiame pagrindinio kompiuterio, kuriame veikė „Airflow“, 8081 prievadas, kaip nurodyta toliau: <hostname:8081>.

2. Darbas su eksperimentine API

Šiuo atveju oro srautas yra sukonfigūruotas ir paruoštas darbui. Tačiau taip pat turime paleisti eksperimentinę API. Mūsų šaškės yra parašytos Python, todėl toliau visos užklausos bus pateikiamos naudojant biblioteką requests.

Tiesą sakant, API jau veikia paprastoms užklausoms. Pavyzdžiui, tokia užklausa leidžia išbandyti jos darbą:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Jei gavote tokį pranešimą atsakydami, tai reiškia, kad viskas veikia.

Tačiau kai norime suaktyvinti DAG, susiduriame su faktu, kad tokio tipo užklausos negalima pateikti be autentifikavimo.

Norėdami tai padaryti, turėsite atlikti keletą veiksmų.

Pirmiausia turite pridėti tai prie konfigūracijos:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Tada turite sukurti savo vartotoją su administratoriaus teisėmis:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Tada turite sukurti vartotoją su įprastomis teisėmis, kuriam bus leista padaryti DAG aktyviklį.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Dabar viskas paruošta.

3. POST užklausos paleidimas

Pati POST užklausa atrodys taip:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Užklausa sėkmingai apdorota.

Atitinkamai, tada mes suteikiame DAG šiek tiek laiko apdoroti ir pateikti užklausą „ClickHouse“ lentelei, bandydami sugauti valdymo duomenų paketą.

Patikrinimas baigtas.

Šaltinis: www.habr.com

Добавить комментарий