Ved udarbejdelsen af vores uddannelsesprogrammer støder vi med jævne mellemrum på vanskeligheder i forhold til at arbejde med nogle værktøjer. Og i det øjeblik, vi støder på dem, er der ikke altid nok dokumentation og artikler, der kan hjælpe med at håndtere dette problem.
Sådan var det for eksempel i 2015, og vi brugte Hadoop-klyngen med Spark til 35 samtidige brugere på Big Data Specialist-programmet. Det var ikke klart, hvordan man forbereder det til en sådan brugertilfælde ved hjælp af YARN. Som et resultat, efter at have fundet ud af og gå stien på egen hånd, gjorde de det
forhistorie
Denne gang vil vi tale om et andet program -
Alt er generelt godt. Lad dem bygge deres rørledninger. Der er dog et "men": alle vores programmer er teknologisk avancerede med hensyn til selve læreprocessen. For at tjekke laboratoriet bruger vi automatiske checkers: Deltageren skal gå til sin personlige konto, klikke på knappen "Check", og efter et stykke tid ser han en form for udvidet feedback på, hvad han gjorde. Og det er på dette tidspunkt, vi begynder at nærme os vores problem.
Kontrol af dette laboratorium er arrangeret som følger: vi sender en kontroldatapakke til deltagerens Kafka, derefter overfører Gobblin denne datapakke til HDFS, derefter tager Airflow denne datapakke og lægger den i ClickHouse. Tricket er, at Airflow ikke behøver at gøre dette i realtid, det gør det efter planen: en gang hvert 15. minut tager det en masse filer og uploader dem.
Det viser sig, at vi på en eller anden måde skal udløse deres DAG på vores egen anmodning, mens checkeren kører her og nu. Googling fandt vi ud af, at der til senere versioner af Airflow findes en såkaldt experimental
, selvfølgelig, det lyder skræmmende, men hvad skal man gøre ... Det tager pludselig fart.
Dernæst vil vi beskrive hele stien: fra installation af Airflow til generering af en POST-anmodning, der udløser en DAG ved hjælp af Experimental API. Vi vil arbejde med Ubuntu 16.04.
1. Installation af luftstrøm
Lad os tjekke, at vi har Python 3 og virtualenv.
$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0
Hvis en af disse mangler, så installer den.
Lad os nu oprette en mappe, hvor vi vil fortsætte med at arbejde med Airflow.
$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $
Installer luftstrøm:
(venv) $ pip install airflow
Version vi arbejdede på: 1.10.
Nu skal vi oprette en mappe airflow_home
, hvor DAG-filerne og Airflow-plugins vil være placeret. Efter oprettelse af mappen skal du indstille miljøvariablen AIRFLOW_HOME
.
(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>
Det næste trin er at køre kommandoen, der vil oprette og initialisere dataflowdatabasen i SQLite:
(venv) $ airflow initdb
Databasen oprettes i airflow.db
Standard.
Tjek om Airflow er installeret:
$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ _ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ ____/____/|__/
v1.10.0
Hvis kommandoen virkede, oprettede Airflow sin egen konfigurationsfil airflow.cfg
в AIRFLOW_HOME
:
$ tree
.
├── airflow.cfg
└── unittests.cfg
Airflow har en webgrænseflade. Det kan startes ved at køre kommandoen:
(venv) $ airflow webserver --port 8081
Du kan nu få adgang til webgrænsefladen i en browser på port 8081 på værten, hvor Airflow kørte, sådan her: <hostname:8081>
.
2. Arbejde med den eksperimentelle API
På denne er Airflow konfigureret og klar til at gå. Vi skal dog også køre Experimental API. Vores brikker er skrevet i Python, så yderligere vil alle anmodninger være på det ved hjælp af biblioteket requests
.
Faktisk arbejder API'en allerede for simple anmodninger. For eksempel giver en sådan anmodning dig mulighed for at teste dens arbejde:
>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'
Hvis du modtog en sådan besked som svar, betyder det, at alt fungerer.
Men når vi vil udløse en DAG, støder vi ind i det faktum, at denne form for anmodning ikke kan laves uden autentificering.
For at gøre dette skal du udføre en række handlinger.
Først skal du tilføje dette til konfigurationen:
[api]
auth_backend = airflow.contrib.auth.backends.password_auth
Derefter skal du oprette din bruger med administratorrettigheder:
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
Dernæst skal du oprette en bruger med normale rettigheder, som får lov til at lave en DAG-trigger.
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
Nu er alt klar.
3. Lancering af en POST-anmodning
Selve POST-anmodningen vil se sådan ud:
>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'
Anmodningen blev behandlet.
I overensstemmelse hermed giver vi DAG'en lidt tid til at behandle og fremsætte en anmodning til ClickHouse-tabellen og forsøge at fange kontroldatapakken.
Bekræftelse afsluttet.
Kilde: www.habr.com