Cum să declanșați un DAG în Airflow folosind API-ul experimental

În pregătirea programelor noastre educaționale, întâmpinăm periodic dificultăți în ceea ce privește lucrul cu unele instrumente. Și în momentul în care le întâlnim, nu există întotdeauna suficientă documentație și articole care să ajute să facem față acestei probleme.

Așa a fost, de exemplu, în 2015 și am folosit clusterul Hadoop cu Spark pentru 35 de utilizatori simultani în programul Big Data Specialist. Nu era clar cum să-l pregătiți pentru un astfel de caz de utilizator folosind YARN. Drept urmare, după ce și-au dat seama și mergând singuri pe cale, au făcut-o postați pe Habré și, de asemenea, executat Întâlnirea Moscova Spark.

preistorie

De data aceasta vom vorbi despre un alt program - Inginer de date. Pe el, participanții noștri construiesc două tipuri de arhitectură: lambda și kappa. Și în arhitectura lamdba, Airflow este folosit ca parte a procesării loturilor pentru a transfera jurnalele de la HDFS la ClickHouse.

Totul este în general bine. Lasă-i să-și construiască conductele. Cu toate acestea, există un „dar”: toate programele noastre sunt avansate tehnologic în ceea ce privește procesul de învățare în sine. Pentru a verifica laboratorul, folosim verificatoare automate: participantul trebuie să meargă la contul său personal, să facă clic pe butonul „Verifică” și, după un timp, vede un fel de feedback extins cu privire la ceea ce a făcut. Și tocmai în acest moment începem să ne abordăm problema.

Verificarea acestui laborator este aranjată după cum urmează: trimitem un pachet de date de control către Kafka participantului, apoi Gobblin transferă acest pachet de date la HDFS, apoi Airflow ia acest pachet de date și îl pune în ClickHouse. Trucul este că Airflow nu trebuie să facă acest lucru în timp real, o face conform programului: o dată la 15 minute, ia o grămadă de fișiere și le încarcă.

Se pare că trebuie să declanșăm cumva DAG-ul lor pe cont propriu, la cererea noastră, în timp ce verificatorul rulează aici și acum. Gândind pe Google, am aflat că pentru versiunile ulterioare de Airflow există un așa-numit API-ul experimental. Cuvântul experimental, desigur, sună înfricoșător, dar ce să faci... Decolează brusc.

În continuare, vom descrie întreaga cale: de la instalarea Airflow până la generarea unei solicitări POST care declanșează un DAG folosind API-ul experimental. Vom lucra cu Ubuntu 16.04.

1. Instalarea fluxului de aer

Să verificăm dacă avem Python 3 și virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Dacă unul dintre acestea lipsește, atunci instalați-l.

Acum să creăm un director în care vom continua să lucrăm cu Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Instalați fluxul de aer:

(venv) $ pip install airflow

Versiunea la care am lucrat: 1.10.

Acum trebuie să creăm un director airflow_home, unde vor fi localizate fișierele DAG și pluginurile Airflow. După crearea directorului, setați variabila de mediu AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Următorul pas este să rulați comanda care va crea și inițializa baza de date a fluxului de date în SQLite:

(venv) $ airflow initdb

Baza de date va fi creată în airflow.db Mod implicit.

Verificați dacă Airflow este instalat:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Dacă comanda a funcționat, atunci Airflow și-a creat propriul fișier de configurare airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow are o interfață web. Poate fi lansat rulând comanda:

(venv) $ airflow webserver --port 8081

Acum puteți accesa interfața web într-un browser pe portul 8081 de pe gazda pe care rula Airflow, astfel: <hostname:8081>.

2. Lucrul cu API-ul experimental

Pe acest flux de aer este configurat și gata de funcționare. Cu toate acestea, trebuie să rulăm și API-ul experimental. Verificatoarele noastre sunt scrise în Python, așa că în continuare toate solicitările vor fi pe el folosind biblioteca requests.

De fapt, API-ul funcționează deja pentru cereri simple. De exemplu, o astfel de solicitare vă permite să-i testați funcționarea:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Dacă ați primit un astfel de mesaj ca răspuns, înseamnă că totul funcționează.

Cu toate acestea, atunci când dorim să declanșăm un DAG, ne întâlnim cu faptul că acest tip de solicitare nu poate fi făcută fără autentificare.

Pentru a face acest lucru, va trebui să faceți o serie de acțiuni.

Mai întâi, trebuie să adăugați acest lucru la configurație:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Apoi, trebuie să vă creați utilizatorul cu drepturi de administrator:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

În continuare, trebuie să creați un utilizator cu drepturi normale, căruia i se va permite să declanșeze un DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Acum totul este gata.

3. Lansarea unei cereri POST

Cererea POST în sine va arăta astfel:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Solicitarea a fost procesată cu succes.

În consecință, acordăm DAG-ului ceva timp pentru a procesa și a face o cerere către tabelul ClickHouse, încercând să prindă pachetul de date de control.

Verificare finalizată.

Sursa: www.habr.com

Adauga un comentariu