Sådan laver du en DAG-trigger i Airflow ved hjælp af Experimental API

Ved udarbejdelsen af ​​vores uddannelsesprogrammer støder vi med jævne mellemrum på vanskeligheder i forhold til at arbejde med nogle værktøjer. Og i det øjeblik, vi støder på dem, er der ikke altid nok dokumentation og artikler, der kan hjælpe med at håndtere dette problem.

Sådan var det for eksempel i 2015, og vi brugte Hadoop-klyngen med Spark til 35 samtidige brugere på Big Data Specialist-programmet. Det var ikke klart, hvordan man forbereder det til en sådan brugertilfælde ved hjælp af YARN. Som et resultat, efter at have fundet ud af og gå stien på egen hånd, gjorde de det indlæg på Habré og også udført Moscow Spark Meetup.

forhistorie

Denne gang vil vi tale om et andet program - Data Engineer. På den bygger vores deltagere to typer arkitektur: lambda og kappa. Og i lamdba-arkitekturen bruges Airflow som en del af batchbehandling til at overføre logfiler fra HDFS til ClickHouse.

Alt er generelt godt. Lad dem bygge deres rørledninger. Der er dog et "men": alle vores programmer er teknologisk avancerede med hensyn til selve læreprocessen. For at tjekke laboratoriet bruger vi automatiske checkers: Deltageren skal gå til sin personlige konto, klikke på knappen "Check", og efter et stykke tid ser han en form for udvidet feedback på, hvad han gjorde. Og det er på dette tidspunkt, vi begynder at nærme os vores problem.

Kontrol af dette laboratorium er arrangeret som følger: vi sender en kontroldatapakke til deltagerens Kafka, derefter overfører Gobblin denne datapakke til HDFS, derefter tager Airflow denne datapakke og lægger den i ClickHouse. Tricket er, at Airflow ikke behøver at gøre dette i realtid, det gør det efter planen: en gang hvert 15. minut tager det en masse filer og uploader dem.

Det viser sig, at vi på en eller anden måde skal udløse deres DAG på vores egen anmodning, mens checkeren kører her og nu. Googling fandt vi ud af, at der til senere versioner af Airflow findes en såkaldt Eksperimentel API. Ordet experimental, selvfølgelig, det lyder skræmmende, men hvad skal man gøre ... Det tager pludselig fart.

Dernæst vil vi beskrive hele stien: fra installation af Airflow til generering af en POST-anmodning, der udløser en DAG ved hjælp af Experimental API. Vi vil arbejde med Ubuntu 16.04.

1. Installation af luftstrøm

Lad os tjekke, at vi har Python 3 og virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Hvis en af ​​disse mangler, så installer den.

Lad os nu oprette en mappe, hvor vi vil fortsætte med at arbejde med Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Installer luftstrøm:

(venv) $ pip install airflow

Version vi arbejdede på: 1.10.

Nu skal vi oprette en mappe airflow_home, hvor DAG-filerne og Airflow-plugins vil være placeret. Efter oprettelse af mappen skal du indstille miljøvariablen AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Det næste trin er at køre kommandoen, der vil oprette og initialisere dataflowdatabasen i SQLite:

(venv) $ airflow initdb

Databasen oprettes i airflow.db Standard.

Tjek om Airflow er installeret:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Hvis kommandoen virkede, oprettede Airflow sin egen konfigurationsfil airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow har en webgrænseflade. Det kan startes ved at køre kommandoen:

(venv) $ airflow webserver --port 8081

Du kan nu få adgang til webgrænsefladen i en browser på port 8081 på værten, hvor Airflow kørte, sådan her: <hostname:8081>.

2. Arbejde med den eksperimentelle API

På denne er Airflow konfigureret og klar til at gå. Vi skal dog også køre Experimental API. Vores brikker er skrevet i Python, så yderligere vil alle anmodninger være på det ved hjælp af biblioteket requests.

Faktisk arbejder API'en allerede for simple anmodninger. For eksempel giver en sådan anmodning dig mulighed for at teste dens arbejde:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Hvis du modtog en sådan besked som svar, betyder det, at alt fungerer.

Men når vi vil udløse en DAG, støder vi ind i det faktum, at denne form for anmodning ikke kan laves uden autentificering.

For at gøre dette skal du udføre en række handlinger.

Først skal du tilføje dette til konfigurationen:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Derefter skal du oprette din bruger med administratorrettigheder:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Dernæst skal du oprette en bruger med normale rettigheder, som får lov til at lave en DAG-trigger.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Nu er alt klar.

3. Lancering af en POST-anmodning

Selve POST-anmodningen vil se sådan ud:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Anmodningen blev behandlet.

I overensstemmelse hermed giver vi DAG'en lidt tid til at behandle og fremsætte en anmodning til ClickHouse-tabellen og forsøge at fange kontroldatapakken.

Bekræftelse afsluttet.

Kilde: www.habr.com

Tilføj en kommentar