Nola egin DAG abiarazlea Airflow-en API esperimentala erabiliz

Gure hezkuntza programak prestatzerakoan, aldian-aldian zailtasunak aurkitzen ditugu tresna batzuekin lan egiteko. Eta haiekin topo egiten dugun momentuan, ez dago beti arazo honi aurre egiten lagunduko luketen dokumentazio eta artikulu nahikorik.

Hala izan zen, adibidez, 2015ean, eta Spark-ekin Hadoop klusterra erabili genuen aldi berean 35 erabiltzailerentzat Big Data Specialist programan. Ez zegoen argi nola prestatu erabiltzaile kasu baterako YARN erabiliz. Ondorioz, bidea asmatu eta euren kabuz ibilita, egin zuten argitaratu Habré-n eta antzeztu ere bai Moskuko Spark Topaketa.

historiaurrea

Oraingoan beste programa bati buruz hitz egingo dugu - Datu Ingeniaria. Bertan, gure partaideek bi arkitektura mota eraikitzen dituzte: lambda eta kappa. Eta lamdba arkitekturan, Airflow batch prozesatzeko zati gisa erabiltzen da erregistroak HDFStik ClickHousera transferitzeko.

Orokorrean dena ona da. Utzi beren hodiak eraikitzen. Hala ere, badago “baina” bat: gure programa guztiak teknologikoki aurreratuak dira ikaskuntza prozesuari berari dagokionez. Laborategia egiaztatzeko, egiaztatzaile automatikoak erabiltzen ditugu: parte-hartzaileak bere kontu pertsonalera joan behar du, "Egiaztatu" botoia sakatu, eta pixka bat igaro ondoren, egindakoaren inguruko feedback hedatu bat ikusiko du. Eta une honetan hasten gara gure arazoari heltzen.

Laborategi hau egiaztatzea honela antolatuta dago: kontrol-datuen pakete bat bidaltzen dugu parte-hartzailearen Kafka-ra, gero Gobblin-ek datu-pakete hau HDFSra transferitzen du, gero Airflow-ek datu-pakete hau hartzen du eta ClickHouse-n jartzen du. Trikimailua da Airflow-ek ez duela hori denbora errealean egin behar, programazioan egiten duela: 15 minuturo behin fitxategi mordoa hartu eta igotzen ditu.

Ematen du nolabait gure DAG abiarazi behar dugula gure eskariz, egiaztapena hemen eta orain exekutatzen ari den bitartean. Googlen, jakin dugu Airflow-en geroagoko bertsioetarako deituriko bat dagoela API esperimentala. Hitza experimental, noski, beldurgarria dirudi, baina zer egin... Bat-batean aireratzen da.

Jarraian, bide osoa deskribatuko dugu: Airflow instalatzetik API Experimentala erabiliz DAG bat abiarazten duen POST eskaera bat sortzera arte. Ubuntu 16.04rekin lan egingo dugu.

1. Aire-fluxuaren instalazioa

Egiaztatu Python 3 eta virtualenv ditugula.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Hauetako bat falta bada, instalatu.

Orain sor dezagun direktorio bat Airflow-ekin lanean jarraituko dugun.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Instalatu Airflow:

(venv) $ pip install airflow

Landu genuen bertsioa: 1.10.

Orain direktorio bat sortu behar dugu airflow_home, non DAG fitxategiak eta Airflow pluginak kokatuko dira. Direktorioa sortu ondoren, ezarri ingurune-aldagaia AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Hurrengo urratsa SQLite-n datu-fluxuaren datu-basea sortu eta hasieratuko duen komandoa exekutatu da:

(venv) $ airflow initdb

urtean sortuko da datu-basea airflow.db lehenetsia.

Egiaztatu Airflow instalatuta dagoen:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Komandoak funtzionatu bazuen, orduan Airflow-ek bere konfigurazio fitxategia sortu zuen airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow-ek web interfazea du. Komandoa exekutatuz abiarazi daiteke:

(venv) $ airflow webserver --port 8081

Orain web-interfazea atzi dezakezu Airflow exekutatzen ari zen ostalariko 8081 atakako arakatzaile batean, honela: <hostname:8081>.

2. API esperimentalarekin lan egitea

Honetan Airflow konfiguratuta dago eta prest dago. Hala ere, Experimental APIa ere exekutatu behar dugu. Gure zuzentzaileak Python-en idatzita daude, beraz, eskaera guztiak liburutegia erabiliz bertan egongo dira requests.

Egia esan APIa dagoeneko lan egiten ari da eskaera errazetarako. Adibidez, eskaera horrek bere lana probatzeko aukera ematen du:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Erantzun gisa horrelako mezu bat jaso baduzu, dena funtzionatzen ari dela esan nahi du.

Hala ere, DAG bat abiarazi nahi dugunean, mota honetako eskaerak autentifikaziorik gabe ezin direla egin topo egiten dugu.

Horretarako, hainbat ekintza egin beharko dituzu.

Lehenik eta behin, hau gehitu behar duzu konfigurazioan:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Ondoren, zure erabiltzailea sortu behar duzu administratzaile eskubideekin:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Ondoren, DAG abiarazte bat egiteko baimenduko duen eskubide arruntak dituen erabiltzaile bat sortu behar duzu.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Orain dena prest dago.

3. POST eskaera bat abiarazi

POST eskaerak berak itxura hau izango du:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Eskaera behar bezala prozesatu da.

Horren arabera, orduan DAGari denbora pixka bat ematen diogu ClickHouse taulari eskaera bat egiteko eta prozesatzeko, kontrol-datuen paketea harrapatu nahian.

Egiaztapena amaitu da.

Iturria: www.habr.com

Gehitu iruzkin berria