Jak vytvořit spouštěč DAG v Airflow pomocí Experimental API

Při přípravě našich vzdělávacích programů se pravidelně setkáváme s obtížemi při práci s některými nástroji. A v okamžiku, kdy se s nimi setkáme, není vždy dostatek dokumentace a článků, které by pomohly tento problém zvládnout.

Tak to bylo například v roce 2015 a my jsme použili cluster Hadoop se Sparkem pro 35 současných uživatelů v programu Big Data Specialist. Nebylo jasné, jak jej připravit pro takový uživatelský případ pomocí YARN. V důsledku toho, když přišli na cestu a šli po cestě sami, udělali to příspěvek na Habré a také vystupoval Moskevské setkání Spark.

pravěk

Tentokrát budeme mluvit o jiném programu - Datový inženýr. Na něm naši účastníci staví dva typy architektury: lambda a kappa. A v architektuře lamdba se Airflow používá jako součást dávkového zpracování k přenosu protokolů z HDFS do ClickHouse.

Vše je obecně dobré. Ať si postaví potrubí. Existuje však jedno „ale“: všechny naše programy jsou technologicky pokročilé, pokud jde o samotný proces učení. Ke kontrole laboratoře používáme automatické kontroly: účastník musí přejít na svůj osobní účet, kliknout na tlačítko „Zkontrolovat“ a po chvíli uvidí nějakou rozšířenou zpětnou vazbu o tom, co udělal. A právě v tomto bodě začínáme přistupovat k našemu problému.

Kontrola této laboratoře je uspořádána následovně: pošleme kontrolní datový paket do Kafky účastníka, poté Gobblin přenese tento datový paket do HDFS, poté Airflow vezme tento datový paket a vloží jej do ClickHouse. Trik je v tom, že Airflow to nemusí dělat v reálném čase, dělá to podle plánu: jednou za 15 minut vezme spoustu souborů a nahraje je.

Ukazuje se, že musíme na naši žádost nějak sami spustit jejich DAG, zatímco kontrola běží tady a teď. Googlem jsme zjistili, že pro pozdější verze Airflow existuje tzv Experimentální API. Slovo experimental, samozřejmě, zní to děsivě, ale co dělat... Najednou to vzlétne.

Dále popíšeme celou cestu: od instalace Airflow až po vygenerování požadavku POST, který spouští DAG pomocí Experimental API. Budeme pracovat s Ubuntu 16.04.

1. Instalace proudění vzduchu

Zkontrolujeme, že máme Python 3 a virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Pokud jeden z nich chybí, nainstalujte jej.

Nyní si vytvoříme adresář, ve kterém budeme s Airflow dále pracovat.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Instalace Airflow:

(venv) $ pip install airflow

Verze, na které jsme pracovali: 1.10.

Nyní musíme vytvořit adresář airflow_home, kde budou umístěny soubory DAG a pluginy Airflow. Po vytvoření adresáře nastavte proměnnou prostředí AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Dalším krokem je spuštění příkazu, který vytvoří a inicializuje databázi toku dat v SQLite:

(venv) $ airflow initdb

Databáze bude vytvořena v airflow.db výchozí.

Zkontrolujte, zda je nainstalován Airflow:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Pokud příkaz fungoval, Airflow vytvořil svůj vlastní konfigurační soubor airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow má webové rozhraní. Lze jej spustit spuštěním příkazu:

(venv) $ airflow webserver --port 8081

Nyní můžete přistupovat k webovému rozhraní v prohlížeči na portu 8081 na hostiteli, kde běžel Airflow, takto: <hostname:8081>.

2. Práce s Experimental API

V tomto je proudění vzduchu nakonfigurováno a připraveno k použití. Musíme však také spustit Experimental API. Naše dáma je napsána v Pythonu, takže další požadavky budou na ní pomocí knihovny requests.

Ve skutečnosti API pro jednoduché požadavky již funguje. Takový požadavek vám například umožňuje otestovat jeho práci:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Pokud jste obdrželi takovou zprávu jako odpověď, znamená to, že vše funguje.

Když však chceme spustit DAG, narazíme na skutečnost, že tento druh požadavku nelze provést bez ověření.

Chcete-li to provést, budete muset provést řadu akcí.

Nejprve musíte do konfigurace přidat toto:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Poté musíte vytvořit uživatele s právy správce:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Dále musíte vytvořit uživatele s normálními právy, který bude moci vytvořit spouštěč DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Nyní je vše připraveno.

3. Spuštění požadavku POST

Samotný požadavek POST bude vypadat takto:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Žádost byla úspěšně zpracována.

V souladu s tím pak dáme DAG nějaký čas na zpracování a odeslání požadavku do tabulky ClickHouse, ve snaze zachytit paket řídicích dat.

Ověření dokončeno.

Zdroj: www.habr.com

Přidat komentář