Kako napraviti DAG okidač u Airflowu pomoću eksperimentalnog API-ja

U pripremi naših obrazovnih programa povremeno se susrećemo s poteškoćama u radu s nekim alatima. A u trenutku kada se s njima susrećemo, nema uvijek dovoljno dokumentacije i članaka koji bi pomogli u rješavanju ovog problema.

Tako je bilo, primjerice, 2015. godine, a koristili smo Hadoop klaster sa Sparkom za 35 istovremenih korisnika na programu Big Data Specialist. Nije bilo jasno kako ga pripremiti za takav korisnički slučaj koristeći YARN. Kao rezultat toga, nakon što su sami shvatili i krenuli putem, to su i učinili objaviti na Habréu a također i izvedena Moscow Spark Meetup.

prapovijest

Ovaj put ćemo govoriti o drugačijem programu - Inženjer podataka. Na njemu naši polaznici grade dvije vrste arhitekture: lambda i kappa. A u arhitekturi lamdba, Airflow se koristi kao dio skupne obrade za prijenos zapisa iz HDFS-a u ClickHouse.

Općenito je sve dobro. Neka grade svoje cjevovode. Međutim, postoji jedno „ali“: svi naši programi su tehnološki napredni u smislu samog procesa učenja. Za provjeru laboratorija koristimo se automatskim provjerivačima: sudionik treba otići na svoj osobni račun, kliknuti gumb "Provjeri" i nakon nekog vremena vidi neku vrstu proširene povratne informacije o tome što je napravio. I upravo u ovoj točki počinjemo pristupati našem problemu.

Provjera ovog laboratorija organizirana je na sljedeći način: šaljemo kontrolni paket podataka sudionikovom Kafki, zatim Gobblin prenosi ovaj paket podataka u HDFS, zatim Airflow uzima ovaj paket podataka i stavlja ga u ClickHouse. Trik je u tome što Airflow to ne mora raditi u stvarnom vremenu, on to radi prema rasporedu: jednom svakih 15 minuta preuzima hrpu datoteka i prenosi ih.

Ispada da trebamo nekako sami pokrenuti njihov DAG na naš zahtjev dok checker radi ovdje i sada. Guglajući saznali smo da za kasnije verzije Airflowa postoji tzv Eksperimentalni API, Riječ experimental, naravno, zvuči zastrašujuće, ali što učiniti ... Odjednom poleti.

Zatim ćemo opisati cijeli put: od instaliranja Airflowa do generiranja POST zahtjeva koji pokreće DAG pomoću eksperimentalnog API-ja. Radit ćemo s Ubuntu 16.04.

1. Instalacija protoka zraka

Provjerimo imamo li Python 3 i virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Ako jedan od njih nedostaje, instalirajte ga.

Sada stvorimo direktorij u kojem ćemo nastaviti raditi s Airflowom.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Instalirajte protok zraka:

(venv) $ pip install airflow

Verzija na kojoj smo radili: 1.10.

Sada moramo stvoriti imenik airflow_home, gdje će se nalaziti DAG datoteke i Airflow dodaci. Nakon kreiranja direktorija postavite varijablu okruženja AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Sljedeći korak je pokretanje naredbe koja će stvoriti i inicijalizirati bazu podataka protoka podataka u SQLite-u:

(venv) $ airflow initdb

Baza će biti kreirana u airflow.db zadano.

Provjerite je li instaliran Airflow:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Ako je naredba uspjela, Airflow je stvorio vlastitu konfiguracijsku datoteku airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow ima web sučelje. Može se pokrenuti izvršavanjem naredbe:

(venv) $ airflow webserver --port 8081

Sada možete pristupiti web sučelju u pregledniku na priključku 8081 na glavnom računalu na kojem se izvodio Airflow, ovako: <hostname:8081>.

2. Rad s eksperimentalnim API-jem

Na ovom je protok zraka konfiguriran i spreman za rad. Međutim, također moramo pokrenuti Eksperimentalni API. Naše dame su napisane u Pythonu, tako da će dalje svi zahtjevi biti na njemu koristeći biblioteku requests.

Zapravo API već radi za jednostavne zahtjeve. Na primjer, takav zahtjev vam omogućuje testiranje njegovog rada:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Ako ste kao odgovor dobili takvu poruku, to znači da sve radi.

Međutim, kada želimo pokrenuti DAG, nailazimo na činjenicu da se ova vrsta zahtjeva ne može napraviti bez autentifikacije.

Da biste to učinili, morat ćete učiniti niz radnji.

Prvo morate dodati ovo u konfiguraciju:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Zatim trebate kreirati svog korisnika s administratorskim pravima:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Zatim morate stvoriti korisnika s normalnim pravima kojem će biti dopušteno napraviti DAG okidač.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Sada je sve spremno.

3. Pokretanje POST zahtjeva

Sam POST zahtjev će izgledati ovako:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Zahtjev je uspješno obrađen.

U skladu s tim, DAG-u dajemo malo vremena da obradi i podnese zahtjev tablici ClickHouse, pokušavajući uhvatiti kontrolni paket podataka.

Provjera dovršena.

Izvor: www.habr.com

Dodajte komentar