Hur man gör en DAG-utlösare i Airflow med hjälp av Experimental API

När vi förbereder våra utbildningsprogram stöter vi periodvis på svårigheter när det gäller att arbeta med vissa verktyg. Och i det ögonblick när vi stöter på dem finns det inte alltid tillräckligt med dokumentation och artiklar som skulle hjälpa till att hantera detta problem.

Så var det till exempel 2015, och vi använde Hadoop-klustret med Spark för 35 samtidiga användare på Big Data Specialist-programmet. Det var inte klart hur man skulle förbereda det för ett sådant användarfall med YARN. Som ett resultat, efter att ha listat ut och gått vägen på egen hand, gjorde de det inlägg på Habré och uppträdde också Moscow Spark Meetup.

förhistoria

Den här gången ska vi prata om ett annat program - Datatekniker. På den bygger våra deltagare två typer av arkitektur: lambda och kappa. Och i lamdba-arkitekturen används Airflow som en del av batchbearbetning för att överföra loggar från HDFS till ClickHouse.

Allt är i allmänhet bra. Låt dem bygga sina pipelines. Men det finns ett "men": alla våra program är tekniskt avancerade när det gäller själva inlärningsprocessen. För att kontrollera labbet använder vi automatiska checkers: deltagaren måste gå till sitt personliga konto, klicka på knappen "Kontrollera" och efter ett tag ser han någon form av utökad feedback om vad han gjorde. Och det är vid denna tidpunkt som vi börjar närma oss vårt problem.

Kontroll av detta labb ordnas enligt följande: vi skickar ett kontrolldatapaket till deltagarens Kafka, sedan överför Gobblin detta datapaket till HDFS, sedan tar Airflow detta datapaket och lägger det i ClickHouse. Tricket är att Airflow inte behöver göra detta i realtid, det gör det enligt schemat: en gång var 15:e minut tar det en massa filer och laddar upp dem.

Det visar sig att vi på något sätt måste trigga deras DAG på egen hand på vår begäran medan checkern körs här och nu. Googlade fick vi reda på att det för senare versioner av Airflow finns en sk Experimentellt API. Ordet experimental, det låter förstås läskigt, men vad ska man göra ... Det tar plötsligt fart.

Därefter kommer vi att beskriva hela vägen: från installation av Airflow till att generera en POST-begäran som utlöser en DAG med hjälp av Experimental API. Vi kommer att arbeta med Ubuntu 16.04.

1. Luftflödesinstallation

Låt oss kontrollera att vi har Python 3 och virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Om någon av dessa saknas, installera den.

Låt oss nu skapa en katalog där vi kommer att fortsätta arbeta med Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Installera luftflöde:

(venv) $ pip install airflow

Version vi arbetade med: 1.10.

Nu måste vi skapa en katalog airflow_home, där DAG-filerna och Airflow-plugins kommer att finnas. När du har skapat katalogen, ställ in miljövariabeln AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Nästa steg är att köra kommandot som skapar och initierar dataflödesdatabasen i SQLite:

(venv) $ airflow initdb

Databasen kommer att skapas i airflow.db standard.

Kontrollera om Airflow är installerat:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Om kommandot fungerade skapade Airflow sin egen konfigurationsfil airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow har ett webbgränssnitt. Det kan startas genom att köra kommandot:

(venv) $ airflow webserver --port 8081

Du kan nu komma åt webbgränssnittet i en webbläsare på port 8081 på värden där Airflow kördes, så här: <hostname:8081>.

2. Arbeta med Experimentell API

På detta är Airflow konfigurerat och redo att gå. Men vi måste också köra Experimentell API. Våra pjäser är skrivna i Python, så vidare kommer alla förfrågningar att finnas på det med hjälp av biblioteket requests.

Egentligen fungerar API redan för enkla förfrågningar. Till exempel låter en sådan begäran dig testa dess arbete:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Om du fick ett sådant meddelande som svar betyder det att allt fungerar.

Men när vi vill utlösa en DAG stöter vi på det faktum att den här typen av begäran inte kan göras utan autentisering.

För att göra detta måste du göra ett antal åtgärder.

Först måste du lägga till detta i konfigurationen:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Sedan måste du skapa din användare med administratörsrättigheter:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Därefter måste du skapa en användare med normala rättigheter som kommer att tillåtas göra en DAG-utlösare.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Nu är allt klart.

3. Starta en POST-begäran

Själva POST-begäran kommer att se ut så här:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Begäran har bearbetats.

Följaktligen ger vi DAG lite tid att bearbeta och göra en begäran till ClickHouse-tabellen och försöka fånga kontrolldatapaketet.

Verifiering slutförd.

Källa: will.com

Lägg en kommentar