Hvordan lage en DAG-utløser i Airflow ved hjelp av Experimental API

Når vi utarbeider utdanningsprogrammene våre, møter vi med jevne mellomrom vanskeligheter med å jobbe med visse verktøy. Og i det øyeblikket vi møter dem, er det ikke alltid nok dokumentasjon og artikler som vil hjelpe oss med å takle dette problemet.

Dette var for eksempel tilfelle i 2015, og under «Big Data Specialist»-programmet brukte vi en Hadoop-klynge med Spark for 35 samtidige brukere. Det var ikke klart hvordan den skulle forberedes for en slik brukssituasjon ved å bruke GARN. Til slutt, etter å ha funnet ut av det og gått stien på egen hånd, gjorde vi det innlegg på Habré og opptrådte også kl Moskva Spark Meetup.

forhistorie

Denne gangen skal vi snakke om et annet program - Data Engineer. Våre deltakere bygger to typer arkitektur på den: lambda og kappa. Og i lamdba-arkitekturen, som en del av batchbehandlingen, brukes Airflow til å overføre logger fra HDFS til ClickHouse.

Alt er generelt bra. La dem bygge sine egne rørledninger. Imidlertid er det et "men": alle programmene våre er teknologisk avanserte fra synspunktet til selve læringsprosessen. For å sjekke laboratoriet bruker vi automatiske kontrollører: deltakeren må gå til sin personlige konto, klikke på "Sjekk"-knappen, og etter en tid ser han en slags utvidet tilbakemelding på hva han gjorde. Og det er i dette øyeblikket vi begynner å nærme oss problemet vårt.

Verifikasjonen av dette laboratoriet er strukturert slik: vi sender en kontrolldatapakke til deltakerens Kafka, deretter overfører Gobblin denne datapakken til HDFS, så tar Airflow denne datapakken og legger den i ClickHouse. Trikset er at Airflow ikke trenger å gjøre dette i sanntid, det gjør det i henhold til en tidsplan: hvert 15. minutt tar det en haug med filer og laster dem opp.

Det viser seg at vi på en eller annen måte må utløse deres DAG selv på vår forespørsel mens sjekken kjører her og nå. Etter googling fant vi ut at det for senere versjoner av Airflow finnes en såkalt Eksperimentell API. Ordet experimental, selvfølgelig, det høres skummelt ut, men hva skal jeg gjøre... Plutselig tar det av.

Deretter vil vi beskrive hele banen: fra installasjon av Airflow til å generere en POST-forespørsel som utløser DAG ved hjelp av Experimental API. Vi vil jobbe med Ubuntu 16.04.

1. Installasjon av luftstrøm

La oss sjekke at vi har Python 3 og virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Hvis noe av dette mangler, installer det.

La oss nå lage en katalog der vi vil fortsette å jobbe med Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Installer luftstrøm:

(venv) $ pip install airflow

Versjonen vi jobbet med: 1.10.

Nå må vi lage en katalog airflow_home, hvor DAG-filer og Airflow-plugins vil være plassert. Etter å ha opprettet katalogen, angi miljøvariabelen AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Det neste trinnet er å kjøre en kommando som vil opprette og initialisere en dataflytdatabase i SQLite:

(venv) $ airflow initdb

Databasen vil bli opprettet i airflow.db misligholde.

La oss sjekke om Airflow er installert:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Hvis kommandoen fungerte, opprettet Airflow sin egen konfigurasjonsfil airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow har et webgrensesnitt. Den kan startes ved å kjøre kommandoen:

(venv) $ airflow webserver --port 8081

Du kan nå trykke på nettgrensesnittet i en nettleser på port 8081 på verten der Airflow kjørte, for eksempel: <hostname:8081>.

2. Arbeide med Experimental API

På dette tidspunktet er Airflow konfigurert og klar til bruk. Vi må imidlertid også kjøre Experimental API. Checkerne våre er skrevet i Python, så videre vil alle forespørsler være i den ved å bruke biblioteket requests.

Faktisk fungerer API allerede for enkle forespørsler. For eksempel lar denne forespørselen deg teste funksjonen:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Hvis du får en slik melding som svar, betyr det at alt fungerer.

Men når vi ønsker å utløse en DAG, står vi overfor det faktum at denne typen forespørsel ikke kan gjøres uten autentisering.

For å gjøre dette, må du utføre flere trinn.

Først må du legge til dette i konfigurasjonen:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Deretter må du opprette brukeren din med administratorrettigheter:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Deretter må du opprette en bruker med normale rettigheter som vil få lov til å utløse DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Nå er alt klart.

3. Start en POST-forespørsel

Selve POST-forespørselen vil se slik ut:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Forespørselen ble behandlet.

Følgelig gir vi DAG litt tid til å behandle og sende en forespørsel til ClickHouse-tabellen, og prøve å fange kontrolldatapakken.

Kontroll fullført.

Kilde: www.habr.com

Legg til en kommentar