Kako napraviti DAG okidač u Airflow koristeći eksperimentalni API

Prilikom pripreme naših edukativnih programa povremeno se susrećemo sa poteškoćama u radu sa određenim alatima. A u trenutku kada ih naiđemo, nema uvijek dovoljno dokumentacije i članaka koji bi nam pomogli da se nosimo s ovim problemom.

Tako je bilo, na primjer, 2015. godine, a tokom programa “Big Data Specialist” koristili smo Hadoop klaster sa Sparkom za 35 istovremenih korisnika. Nije bilo jasno kako ga pripremiti za takav slučaj upotrebe koristeći YARN. Na kraju, nakon što smo to shvatili i prošetali stazom, jesmo objava na Habréu a također je nastupio na Moscow Spark Meetup.

prapovijest

Ovaj put ćemo pričati o drugačijem programu - Data Engineer. Naši učesnici na njemu grade dva tipa arhitekture: lambda i kapa. A u lamdba arhitekturi, kao dio batch obrade, Airflow se koristi za prijenos dnevnika iz HDFS-a u ClickHouse.

Sve je generalno dobro. Neka sami grade svoje cjevovode. Međutim, postoji „ali“: svi naši programi su tehnološki napredni sa stanovišta samog procesa učenja. Za provjeru laboratorije koristimo automatske provjere: učesnik treba da ode na svoj lični nalog, klikne dugme „Proveri“ i nakon nekog vremena vidi neku vrstu proširene povratne informacije o tome šta je uradio. I u ovom trenutku počinjemo da pristupamo našem problemu.

Verifikacija ove laboratorije je strukturirana ovako: šaljemo paket kontrolnih podataka na Kafku učesnika, zatim Gobblin prenosi ovaj paket podataka u HDFS, zatim Airflow uzima ovaj paket podataka i stavlja ga u ClickHouse. Trik je u tome što Airflow ne mora to da radi u realnom vremenu, već prema rasporedu: svakih 15 minuta uzima gomilu fajlova i postavlja ih.

Ispostavilo se da moramo nekako sami pokrenuti njihov DAG na naš zahtjev dok provjeravač radi ovdje i sada. Guglanjem smo saznali da za kasnije verzije Airflow-a postoji tzv Eksperimentalni API. Reč experimental, naravno, zvuči zastrašujuće, ali šta da se radi... Odjednom poleti.

Zatim ćemo opisati cijeli put: od instaliranja Airflow-a do generiranja POST zahtjeva koji pokreće DAG koristeći eksperimentalni API. Radićemo sa Ubuntu 16.04.

1. Instalacija protoka zraka

Provjerimo da imamo Python 3 i virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Ako nešto od ovoga nedostaje, instalirajte ga.

Sada napravimo direktorij u kojem ćemo nastaviti raditi sa Airflow-om.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Instalirajte protok zraka:

(venv) $ pip install airflow

Verzija na kojoj smo radili: 1.10.

Sada moramo kreirati direktorij airflow_home, gdje će se nalaziti DAG datoteke i dodaci Airflow. Nakon kreiranja direktorija, postavite varijablu okruženja AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Sljedeći korak je pokretanje naredbe koja će kreirati i inicijalizirati bazu podataka toka podataka u SQLite-u:

(venv) $ airflow initdb

Baza podataka će biti kreirana u airflow.db default.

Provjerimo da li je Airflow instaliran:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Ako je komanda uspjela, tada je Airflow kreirao vlastiti konfiguracijski fajl airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow ima web interfejs. Može se pokrenuti pokretanjem naredbe:

(venv) $ airflow webserver --port 8081

Sada možete pogoditi web sučelje u pretraživaču na portu 8081 na hostu gdje je Airflow bio pokrenut, na primjer: <hostname:8081>.

2. Rad s eksperimentalnim API-jem

U ovom trenutku, Airflow je konfigurisan i spreman za rad. Međutim, također moramo pokrenuti eksperimentalni API. Naši čekeri su napisani na Python-u, tako da će dalje svi zahtjevi biti u njemu koristeći biblioteku requests.

U stvari, API već radi za jednostavne zahtjeve. Na primjer, ovaj zahtjev vam omogućava da testirate njegov rad:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Ako dobijete takvu poruku kao odgovor, to znači da sve radi.

Međutim, kada želimo pokrenuti DAG, suočeni smo s činjenicom da se ova vrsta zahtjeva ne može napraviti bez autentifikacije.

Da biste to učinili, morat ćete napraviti još nekoliko koraka.

Prvo morate dodati ovo u konfiguraciju:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Zatim morate kreirati svog korisnika sa administratorskim pravima:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Zatim morate kreirati korisnika s normalnim pravima kojem će biti dozvoljeno da pokrene DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Sada je sve spremno.

3. Pokrenite POST zahtjev

Sam POST zahtjev će izgledati ovako:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Zahtjev je uspješno obrađen.

U skladu s tim, dajemo DAG-u malo vremena da obradi i uputi zahtjev tablici ClickHouse, pokušavajući uhvatiti paket kontrolnih podataka.

Provjera završena.

izvor: www.habr.com

Dodajte komentar