Kako narediti sprožilec DAG v Airflow z uporabo eksperimentalnega API-ja

Pri pripravi izobraževalnih programov občasno naletimo na težave pri delu z nekaterimi orodji. In v trenutku, ko se z njimi srečamo, ni vedno dovolj dokumentacije in člankov, ki bi pomagali pri obvladovanju te težave.

Tako je bilo na primer leta 2015, ko smo uporabili gručo Hadoop s Sparkom za 35 hkratnih uporabnikov na programu Big Data Specialist. Ni bilo jasno, kako ga pripraviti za tak uporabniški primer z uporabo YARN. Posledično so ugotovili in hodili po poti sami objava na Habréju in tudi izvajal Moscow Spark Meetup.

prazgodovina

Tokrat bomo govorili o drugačnem programu - Podatkovni inženir. Na njej naši udeleženci gradijo dve vrsti arhitekture: lambda in kapa. In v arhitekturi lamdba se Airflow uporablja kot del paketne obdelave za prenos dnevnikov iz HDFS v ClickHouse.

Na splošno je vse dobro. Naj gradijo svoje plinovode. Vendar obstaja en »ampak«: vsi naši programi so tehnološko napredni v smislu samega učnega procesa. Za preverjanje laboratorija uporabljamo samodejne pregledovalnike: udeleženec mora iti v svoj osebni račun, klikniti gumb »Preveri« in čez nekaj časa vidi nekakšno razširjeno povratno informacijo o tem, kaj je naredil. In na tej točki se začnemo približevati našemu problemu.

Preverjanje tega laboratorija je urejeno na naslednji način: udeleženčevemu Kafki pošljemo kontrolni podatkovni paket, nato Gobblin ta podatkovni paket prenese v HDFS, nato Airflow prevzame ta podatkovni paket in ga postavi v ClickHouse. Trik je v tem, da Airflowu tega ni treba storiti v realnem času, ampak po urniku: vsakih 15 minut vzame kup datotek in jih naloži.

Izkazalo se je, da moramo nekako sami sprožiti njihov DAG na našo zahtevo, medtem ko preverjalnik teče tukaj in zdaj. Z googlanjem smo ugotovili, da za novejše različice Airflow obstaja t.i Eksperimentalni API. beseda experimental, seveda se sliši strašljivo, ampak kaj storiti ... Nenadoma vzleti.

Nato bomo opisali celotno pot: od namestitve Airflow do generiranja zahteve POST, ki sproži DAG z uporabo Experimental API. Delali bomo z Ubuntu 16.04.

1. Namestitev pretoka zraka

Preverimo, ali imamo Python 3 in virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Če kateri od teh manjka, ga namestite.

Zdaj pa ustvarimo imenik, v katerem bomo nadaljevali delo z Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Namestite Airflow:

(venv) $ pip install airflow

Različica, na kateri smo delali: 1.10.

Sedaj moramo ustvariti imenik airflow_home, kjer se bodo nahajale datoteke DAG in vtičniki Airflow. Ko ustvarite imenik, nastavite spremenljivko okolja AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Naslednji korak je zagon ukaza, ki bo ustvaril in inicializiral bazo podatkovnega toka v SQLite:

(venv) $ airflow initdb

Baza podatkov bo ustvarjena v airflow.db privzeto.

Preverite, ali je nameščen Airflow:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Če je ukaz deloval, je Airflow ustvaril lastno konfiguracijsko datoteko airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow ima spletni vmesnik. Zaženete ga lahko z ukazom:

(venv) $ airflow webserver --port 8081

Zdaj lahko dostopate do spletnega vmesnika v brskalniku na vratih 8081 na gostitelju, kjer se izvaja Airflow, takole: <hostname:8081>.

2. Delo z eksperimentalnim API-jem

Na tem je Airflow konfiguriran in pripravljen za uporabo. Vendar pa moramo zagnati tudi eksperimentalni API. Naši damalniki so napisani v Pythonu, tako da bodo nadaljnje vse zahteve na njem z uporabo knjižnice requests.

Pravzaprav API že deluje za preproste zahteve. Na primer, taka zahteva vam omogoča, da preizkusite njeno delo:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Če ste v odgovor prejeli takšno sporočilo, to pomeni, da vse deluje.

Ko pa želimo sprožiti DAG, naletimo na dejstvo, da tovrstne zahteve ni mogoče narediti brez avtentikacije.

Če želite to narediti, boste morali narediti več dejanj.

Najprej morate to dodati v konfiguracijo:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Nato morate ustvariti svojega uporabnika s skrbniškimi pravicami:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Nato morate ustvariti uporabnika z običajnimi pravicami, ki mu bo dovoljeno sprožiti DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Zdaj je vse pripravljeno.

3. Zagon zahteve POST

Sama zahteva POST bo videti takole:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Zahteva uspešno obdelana.

V skladu s tem damo DAG-u nekaj časa za obdelavo in vložitev zahteve za tabelo ClickHouse ter poskuša ujeti kontrolni podatkovni paket.

Preverjanje končano.

Vir: www.habr.com

Dodaj komentar