Jak zrobić wyzwalacz DAG w Airflow za pomocą Experimental API

Przygotowując nasze programy edukacyjne, okresowo napotykamy trudności w pracy z niektórymi narzędziami. A w momencie, gdy je napotykamy, nie zawsze jest wystarczająca dokumentacja i artykuły, które pomogłyby poradzić sobie z tym problemem.

Tak było na przykład w 2015 roku i korzystaliśmy z klastra Hadoop ze Spark dla 35 jednoczesnych użytkowników w programie Big Data Specialist. Nie było jasne, jak przygotować go do takiego przypadku użytkownika za pomocą YARN. W rezultacie, wymyśliwszy i idąc ścieżką na własną rękę, zrobili to post na Habré a także występował Moskiewskie spotkanie Spark.

prehistoria

Tym razem porozmawiamy o innym programie - Data Engineer. Na niej nasi uczestnicy budują dwa rodzaje architektury: lambda i kappa. A w architekturze lambdba Airflow jest używany jako część przetwarzania wsadowego do przesyłania logów z HDFS do ClickHouse.

Wszystko jest generalnie dobre. Niech budują rurociągi. Jest jednak jedno „ale”: wszystkie nasze programy są zaawansowane technologicznie pod względem samego procesu nauki. Aby sprawdzić laboratorium, używamy automatycznych sprawdzarek: uczestnik musi przejść do swojego konta osobistego, kliknąć przycisk „Sprawdź”, a po chwili widzi jakąś rozszerzoną informację zwrotną na temat tego, co zrobił. I w tym momencie zaczynamy zbliżać się do naszego problemu.

Sprawdzenie tego laboratorium jest zorganizowane w następujący sposób: wysyłamy pakiet danych kontrolnych do Kafki uczestnika, następnie Gobblin przesyła ten pakiet danych do HDFS, następnie Airflow pobiera ten pakiet danych i umieszcza go w ClickHouse. Sztuczka polega na tym, że Airflow nie musi tego robić w czasie rzeczywistym, robi to zgodnie z harmonogramem: raz na 15 minut pobiera kilka plików i przesyła je.

Okazuje się, że musimy jakoś samodzielnie uruchomić ich DAG na naszą prośbę, podczas gdy sprawdzanie działa tu i teraz. Googlując dowiedzieliśmy się, że dla późniejszych wersji Airflow istnieje tzw Eksperymentalny interfejs API. Słowo experimental, oczywiście brzmi to strasznie, ale co robić... Nagle startuje.

Następnie opiszemy całą ścieżkę: od instalacji Airflow do wygenerowania żądania POST, które uruchamia DAG za pomocą Experimental API. Będziemy pracować z Ubuntu 16.04.

1. Instalacja przepływu powietrza

Sprawdźmy, czy mamy Pythona 3 i virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Jeśli brakuje jednego z nich, zainstaluj go.

Teraz utwórzmy katalog, w którym będziemy kontynuować pracę z Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Zainstaluj przepływ powietrza:

(venv) $ pip install airflow

Wersja na której pracowaliśmy: 1.10.

Teraz musimy utworzyć katalog airflow_home, gdzie będą znajdować się pliki DAG i wtyczki Airflow. Po utworzeniu katalogu ustaw zmienną środowiskową AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Kolejnym krokiem jest uruchomienie polecenia, które utworzy i zainicjuje bazę danych przepływu danych w SQLite:

(venv) $ airflow initdb

Baza danych zostanie utworzona w airflow.db domyślny.

Sprawdź, czy zainstalowany jest przepływ powietrza:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Jeśli polecenie zadziałało, Airflow utworzył własny plik konfiguracyjny airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow ma interfejs sieciowy. Można go uruchomić wydając polecenie:

(venv) $ airflow webserver --port 8081

Możesz teraz uzyskać dostęp do interfejsu sieciowego w przeglądarce na porcie 8081 na hoście, na którym działał Airflow, w ten sposób: <hostname:8081>.

2. Praca z eksperymentalnym API

Na tym Airflow jest skonfigurowany i gotowy do pracy. Jednak musimy również uruchomić Experimental API. Nasze warcaby są napisane w Pythonie, więc dalej wszystkie żądania będą na nim za pomocą biblioteki requests.

W rzeczywistości interfejs API działa już w przypadku prostych żądań. Na przykład takie żądanie pozwala przetestować jego działanie:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Jeśli otrzymałeś taką wiadomość w odpowiedzi, oznacza to, że wszystko działa.

Jednak gdy chcemy uruchomić DAG, napotykamy na fakt, że tego rodzaju żądania nie można wykonać bez uwierzytelnienia.

Aby to zrobić, będziesz musiał wykonać szereg czynności.

Najpierw musisz dodać to do konfiguracji:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Następnie musisz utworzyć użytkownika z uprawnieniami administratora:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Następnie musisz utworzyć użytkownika z normalnymi uprawnieniami, który będzie mógł tworzyć wyzwalacz DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Teraz wszystko jest gotowe.

3. Uruchomienie żądania POST

Samo żądanie POST będzie wyglądać następująco:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Żądanie przetworzone pomyślnie.

W związku z tym dajemy DAG trochę czasu na przetworzenie i wysłanie żądania do tabeli ClickHouse, próbując złapać pakiet danych kontrolnych.

Weryfikacja zakończona.

Źródło: www.habr.com

Dodaj komentarz