U pripremi naših obrazovnih programa povremeno se susrećemo s poteškoćama u radu s nekim alatima. A u trenutku kada se s njima susrećemo, nema uvijek dovoljno dokumentacije i članaka koji bi pomogli u rješavanju ovog problema.
Tako je bilo, primjerice, 2015. godine, a koristili smo Hadoop klaster sa Sparkom za 35 istovremenih korisnika na programu Big Data Specialist. Nije bilo jasno kako ga pripremiti za takav korisnički slučaj koristeći YARN. Kao rezultat toga, nakon što su sami shvatili i krenuli putem, to su i učinili
prapovijest
Ovaj put ćemo govoriti o drugačijem programu -
Općenito je sve dobro. Neka grade svoje cjevovode. Međutim, postoji jedno „ali“: svi naši programi su tehnološki napredni u smislu samog procesa učenja. Za provjeru laboratorija koristimo se automatskim provjerivačima: sudionik treba otići na svoj osobni račun, kliknuti gumb "Provjeri" i nakon nekog vremena vidi neku vrstu proširene povratne informacije o tome što je napravio. I upravo u ovoj točki počinjemo pristupati našem problemu.
Provjera ovog laboratorija organizirana je na sljedeći način: šaljemo kontrolni paket podataka sudionikovom Kafki, zatim Gobblin prenosi ovaj paket podataka u HDFS, zatim Airflow uzima ovaj paket podataka i stavlja ga u ClickHouse. Trik je u tome što Airflow to ne mora raditi u stvarnom vremenu, on to radi prema rasporedu: jednom svakih 15 minuta preuzima hrpu datoteka i prenosi ih.
Ispada da trebamo nekako sami pokrenuti njihov DAG na naš zahtjev dok checker radi ovdje i sada. Guglajući saznali smo da za kasnije verzije Airflowa postoji tzv experimental
, naravno, zvuči zastrašujuće, ali što učiniti ... Odjednom poleti.
Zatim ćemo opisati cijeli put: od instaliranja Airflowa do generiranja POST zahtjeva koji pokreće DAG pomoću eksperimentalnog API-ja. Radit ćemo s Ubuntu 16.04.
1. Instalacija protoka zraka
Provjerimo imamo li Python 3 i virtualenv.
$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0
Ako jedan od njih nedostaje, instalirajte ga.
Sada stvorimo direktorij u kojem ćemo nastaviti raditi s Airflowom.
$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $
Instalirajte protok zraka:
(venv) $ pip install airflow
Verzija na kojoj smo radili: 1.10.
Sada moramo stvoriti imenik airflow_home
, gdje će se nalaziti DAG datoteke i Airflow dodaci. Nakon kreiranja direktorija postavite varijablu okruženja AIRFLOW_HOME
.
(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>
Sljedeći korak je pokretanje naredbe koja će stvoriti i inicijalizirati bazu podataka protoka podataka u SQLite-u:
(venv) $ airflow initdb
Baza će biti kreirana u airflow.db
zadano.
Provjerite je li instaliran Airflow:
$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ _ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ ____/____/|__/
v1.10.0
Ako je naredba uspjela, Airflow je stvorio vlastitu konfiguracijsku datoteku airflow.cfg
в AIRFLOW_HOME
:
$ tree
.
├── airflow.cfg
└── unittests.cfg
Airflow ima web sučelje. Može se pokrenuti izvršavanjem naredbe:
(venv) $ airflow webserver --port 8081
Sada možete pristupiti web sučelju u pregledniku na priključku 8081 na glavnom računalu na kojem se izvodio Airflow, ovako: <hostname:8081>
.
2. Rad s eksperimentalnim API-jem
Na ovom je protok zraka konfiguriran i spreman za rad. Međutim, također moramo pokrenuti Eksperimentalni API. Naše dame su napisane u Pythonu, tako da će dalje svi zahtjevi biti na njemu koristeći biblioteku requests
.
Zapravo API već radi za jednostavne zahtjeve. Na primjer, takav zahtjev vam omogućuje testiranje njegovog rada:
>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'
Ako ste kao odgovor dobili takvu poruku, to znači da sve radi.
Međutim, kada želimo pokrenuti DAG, nailazimo na činjenicu da se ova vrsta zahtjeva ne može napraviti bez autentifikacije.
Da biste to učinili, morat ćete učiniti niz radnji.
Prvo morate dodati ovo u konfiguraciju:
[api]
auth_backend = airflow.contrib.auth.backends.password_auth
Zatim trebate kreirati svog korisnika s administratorskim pravima:
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
Zatim morate stvoriti korisnika s normalnim pravima kojem će biti dopušteno napraviti DAG okidač.
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
Sada je sve spremno.
3. Pokretanje POST zahtjeva
Sam POST zahtjev će izgledati ovako:
>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'
Zahtjev je uspješno obrađen.
U skladu s tim, DAG-u dajemo malo vremena da obradi i podnese zahtjev tablici ClickHouse, pokušavajući uhvatiti kontrolni paket podataka.
Provjera dovršena.
Izvor: www.habr.com