DAG trigger létrehozása az Airflow-ban a Kísérleti API használatával

Oktatási programjaink elkészítése során időszakonként nehézségekbe ütközünk az egyes eszközökkel való munkavégzés során. És abban a pillanatban, amikor találkozunk velük, nem mindig áll rendelkezésre elegendő dokumentáció és cikk, amely segítene megbirkózni ezzel a problémával.

Így volt ez például 2015-ben is, és a Hadoop-fürtöt a Spark-kal 35 egyidejű felhasználó számára használtuk a Big Data Specialist programban. Nem volt világos, hogyan kell felkészíteni egy ilyen felhasználói esetre a YARN használatával. Ennek eredményeként, miután kitalálták és maguktól járták az utat, megtették bejegyzés Habré-n és előadták is Moszkvai Spark Meetup.

őstörténet

Ezúttal egy másik programról fogunk beszélni - Az adatok Engineer. Kétféle építészetet építenek rá résztvevőink: lambdát és kappát. A lamdba architektúrában pedig az Airflow-t a kötegelt feldolgozás részeként használják a naplók HDFS-ről ClickHouse-ba való átvitelére.

Általában minden jó. Hadd építsék meg a csővezetékeiket. Van azonban egy „de”: minden programunk technológiailag fejlett magát a tanulási folyamatot tekintve. A labor ellenőrzéséhez automatikus ellenőrzőket használunk: a résztvevőnek be kell lépnie a személyes fiókjába, rá kell kattintania az „Ellenőrzés” gombra, és egy idő után megjelenik valamilyen kiterjesztett visszajelzés a tetteiről. És ezen a ponton kezdünk közelíteni a problémánkhoz.

Ennek a labornak az ellenőrzése a következőképpen történik: küldünk egy vezérlő adatcsomagot a résztvevő Kafkájának, majd Gobblin ezt az adatcsomagot továbbítja a HDFS-nek, majd az Airflow veszi ezt az adatcsomagot és elhelyezi a ClickHouse-ban. A trükk az, hogy az Airflow-nak ezt nem valós időben kell megtennie, hanem ütemterv szerint: 15 percenként egy rakás fájlt készít és feltölt.

Kiderült, hogy a DAG-jukat saját magunktól kell valahogy kiváltanunk kérésünkre, miközben az ellenőrző itt és most fut. Guglizva megtudtuk, hogy az Airflow későbbi verzióihoz létezik egy ún Kísérleti API. A szó experimental, persze, ijesztően hangzik, de mit tegyek... Hirtelen felszáll.

Ezután leírjuk a teljes utat: az Airflow telepítésétől a POST-kérés generálásáig, amely elindít egy DAG-t a Kísérleti API használatával. Ubuntu 16.04-el fogunk dolgozni.

1. Légáramlás telepítése

Ellenőrizzük, hogy van-e Python 3 és virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Ha ezek közül valamelyik hiányzik, telepítse.

Most hozzunk létre egy könyvtárat, amelyben folytatjuk az Airflow-val való munkát.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Légáramlás telepítése:

(venv) $ pip install airflow

Verzió, amelyen dolgoztunk: 1.10.

Most létre kell hoznunk egy könyvtárat airflow_home, ahol a DAG fájlok és az Airflow pluginek lesznek. A könyvtár létrehozása után állítsa be a környezeti változót AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

A következő lépés a parancs futtatása, amely létrehozza és inicializálja az adatfolyam-adatbázist az SQLite-ban:

(venv) $ airflow initdb

Az adatbázis ben jön létre airflow.db alapértelmezett.

Ellenőrizze, hogy az Airflow telepítve van-e:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Ha a parancs működött, az Airflow létrehozta a saját konfigurációs fájlját airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Az Airflow webes felülettel rendelkezik. A parancs futtatásával indítható:

(venv) $ airflow webserver --port 8081

Mostantól elérheti a webes felületet egy böngészőben a 8081-es porton azon a gazdagépen, amelyen az Airflow futott, így: <hostname:8081>.

2. Munka a kísérleti API-val

Ezen az Airflow konfigurálva van, és használatra kész. Futtatnunk kell azonban a Kísérleti API-t is. Ellenőrzőink Pythonban vannak írva, így a továbbiakban minden kérés rajta lesz a könyvtár használatával requests.

Valójában az API már működik egyszerű kérések esetén. Például egy ilyen kérés lehetővé teszi a munkájának tesztelését:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Ha ilyen üzenetet kapott válaszként, az azt jelenti, hogy minden működik.

Amikor azonban DAG-ot akarunk kiváltani, abba a ténybe ütközünk, hogy ezt a fajta kérést nem lehet hitelesítés nélkül benyújtani.

Ehhez számos műveletet kell végrehajtania.

Először is hozzá kell adnia ezt a konfigurációhoz:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Ezután létre kell hoznia a felhasználót rendszergazdai jogokkal:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Ezután létre kell hoznia egy normál jogosultságokkal rendelkező felhasználót, aki DAG-triggert készíthet.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Most minden készen áll.

3. POST kérés indítása

Maga a POST kérés így fog kinézni:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

A kérés feldolgozása sikeresen megtörtént.

Ennek megfelelően a DAG-nak adunk egy kis időt, hogy feldolgozza és kérést intézzen a ClickHouse táblához, megpróbálva elkapni a vezérlő adatcsomagot.

Az ellenőrzés befejeződött.

Forrás: will.com

Hozzászólás