Hoe om 'n DAG-sneller in Airflow te maak met behulp van die eksperimentele API

Met die voorbereiding van ons opvoedkundige programme, ondervind ons van tyd tot tyd probleme in terme van werk met sommige gereedskap. En op die oomblik wanneer ons hulle teëkom, is daar nie altyd genoeg dokumentasie en artikels wat sal help om hierdie probleem die hoof te bied nie.

So was dit byvoorbeeld in 2015, en ons het die Hadoop-groepering met Spark vir 35 gelyktydige gebruikers op die Big Data Specialist-program gebruik. Dit was nie duidelik hoe om dit met YARN vir so 'n gebruikersgeval voor te berei nie. Gevolglik het hulle die pad op hul eie uitgepluis en gestap plaas op Habré en ook opgetree Moskou Spark Meetup.

voorgeskiedenis

Hierdie keer sal ons praat oor 'n ander program - Data-ingenieur. Daarop bou ons deelnemers twee tipes argitektuur: lambda en kappa. En in die lamdba-argitektuur word Airflow as deel van bondelverwerking gebruik om logs van HDFS na ClickHouse oor te dra.

Alles is oor die algemeen goed. Laat hulle hul pypleidings bou. Daar is egter 'n "maar": al ons programme is tegnologies gevorderd wat die leerproses self betref. Om die laboratorium na te gaan, gebruik ons ​​outomatiese kontroleerders: die deelnemer moet na sy persoonlike rekening gaan, op die "Check"-knoppie klik, en na 'n rukkie sien hy 'n soort uitgebreide terugvoer oor wat hy gedoen het. En dit is op hierdie punt dat ons ons probleem begin benader.

Die kontrolering van hierdie laboratorium word soos volg gereël: ons stuur 'n beheerdatapakkie na die deelnemer se Kafka, dan dra Gobblin hierdie datapakkie oor na HDFS, dan neem Airflow hierdie datapakkie en plaas dit in ClickHouse. Die truuk is dat Airflow dit nie intyds hoef te doen nie, dit doen dit volgens skedule: een keer elke 15 minute neem dit 'n klomp lêers en laai dit op.

Dit blyk dat ons op een of ander manier hul DAG op ons eie moet aktiveer op ons versoek terwyl die checker hier en nou loop. Google het uitgevind dat daar vir latere weergawes van Airflow 'n sg Eksperimentele API. Die woord experimental, natuurlik, dit klink skrikwekkend, maar wat om te doen ... Dit neem skielik af.

Vervolgens sal ons die hele pad beskryf: van die installering van Airflow tot die generering van 'n POST-versoek wat 'n DAG aktiveer deur die eksperimentele API te gebruik. Ons sal met Ubuntu 16.04 werk.

1. Lugvloei installasie

Kom ons kyk of ons Python 3 en virtualenv het.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

As een van hierdie ontbreek, installeer dit dan.

Kom ons skep nou 'n gids waarin ons sal voortgaan om met Airflow te werk.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Installeer Airflow:

(venv) $ pip install airflow

Weergawe waaraan ons gewerk het: 1.10.

Nou moet ons 'n gids skep airflow_home, waar die DAG-lêers en Airflow-inproppe geleë sal wees. Nadat u die gids geskep het, stel die omgewingsveranderlike in AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Die volgende stap is om die opdrag uit te voer wat die datavloeidatabasis in SQLite sal skep en inisialiseer:

(venv) $ airflow initdb

Die databasis sal geskep word in airflow.db verstek.

Kyk of Airflow geïnstalleer is:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

As die opdrag gewerk het, het Airflow sy eie konfigurasielêer geskep airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow het 'n webkoppelvlak. Dit kan geloods word deur die opdrag uit te voer:

(venv) $ airflow webserver --port 8081

Jy kan nou toegang tot die webkoppelvlak kry in 'n blaaier op poort 8081 op die gasheer waar Airflow loop, soos volg: <hostname:8081>.

2. Werk met die eksperimentele API

Op hierdie Airflow is gekonfigureer en gereed om te gaan. Ons moet egter ook die eksperimentele API laat loop. Ons checkers is in Python geskryf, so verder sal alle versoeke daarop wees deur die biblioteek te gebruik requests.

Eintlik werk die API reeds vir eenvoudige versoeke. So 'n versoek laat jou byvoorbeeld toe om sy werk te toets:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

As jy so 'n boodskap in reaksie ontvang het, beteken dit alles werk.

Wanneer ons egter 'n DAG wil aktiveer, loop ons in die feit dat hierdie soort versoek nie sonder verifikasie gedoen kan word nie.

Om dit te doen, sal jy 'n aantal aksies moet doen.

Eerstens moet jy dit by die konfigurasie voeg:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Dan moet jy jou gebruiker met admin regte skep:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Vervolgens moet jy 'n gebruiker met normale regte skep wat toegelaat sal word om 'n DAG-sneller te maak.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Nou is alles gereed.

3. Begin 'n POST-versoek

Die POST-versoek self sal soos volg lyk:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Versoek suksesvol verwerk.

Gevolglik gee ons die DAG 'n bietjie tyd om te verwerk en 'n versoek aan die ClickHouse-tabel te rig, om die beheerdatapakket te probeer vang.

Verifikasie voltooi.

Bron: will.com

Voeg 'n opmerking