Hoe maak je een DAG-trigger in Airflow met behulp van de Experimentele API

Bij het voorbereiden van onze educatieve programma's ondervinden we regelmatig moeilijkheden bij het werken met sommige tools. En op het moment dat we ze tegenkomen, is er niet altijd voldoende documentatie en artikelen die kunnen helpen om dit probleem op te lossen.

Zo was het bijvoorbeeld in 2015 en gebruikten we het Hadoop-cluster met Spark voor 35 gelijktijdige gebruikers in het Big Data Specialist-programma. Het was niet duidelijk hoe het met YARN voor zo'n gebruikerscase moest worden voorbereid. Als resultaat, nadat ze het pad zelf hadden bedacht en bewandeld, deden ze dat post op Habré en ook uitgevoerd Moskou Spark Meetup.

prehistorie

Deze keer zullen we het hebben over een ander programma - Data Engineer. Daarop bouwen onze deelnemers twee soorten architectuur: lambda en kappa. En in de lamdba-architectuur wordt Airflow gebruikt als onderdeel van batchverwerking om logboeken over te dragen van HDFS naar ClickHouse.

Alles is over het algemeen goed. Laat ze hun pijpleidingen bouwen. Er is echter een "maar": al onze programma's zijn technologisch geavanceerd in termen van het leerproces zelf. Om het lab te controleren, gebruiken we automatische checkers: de deelnemer moet naar zijn persoonlijke account gaan, op de knop "Controleren" klikken en na een tijdje ziet hij een soort uitgebreide feedback over wat hij heeft gedaan. En het is op dit punt dat we ons probleem beginnen te benaderen.

Het controleren van dit lab is als volgt geregeld: we sturen een besturingsdatapakket naar de Kafka van de deelnemer, dan zet Gobblin dit datapakket over naar HDFS, dan neemt Airflow dit datapakket en stopt het in ClickHouse. De truc is dat Airflow dit niet in realtime hoeft te doen, maar op schema: eens in de 15 minuten neemt het een aantal bestanden en uploadt deze.

Het blijkt dat we op de een of andere manier hun DAG op ons verzoek moeten activeren terwijl de checker hier en nu wordt uitgevoerd. Googelend kwamen we erachter dat er voor latere versies van Airflow een zgn Experimentele API. woord experimental, het klinkt natuurlijk eng, maar wat te doen ... Het gaat plotseling van start.

Vervolgens beschrijven we het hele pad: van het installeren van Airflow tot het genereren van een POST-verzoek dat een DAG activeert met behulp van de Experimentele API. We zullen werken met Ubuntu 16.04.

1. Luchtstroominstallatie

Laten we eens kijken of we Python 3 en virtualenv hebben.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Als een van deze ontbreekt, installeer deze dan.

Laten we nu een map maken waarin we zullen blijven werken met Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Luchtstroom installeren:

(venv) $ pip install airflow

Versie waaraan we hebben gewerkt: 1.10.

Nu moeten we een map maken airflow_home, waar de DAG-bestanden en Airflow-plug-ins zich bevinden. Stel na het maken van de map de omgevingsvariabele in AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

De volgende stap is het uitvoeren van de opdracht die de gegevensstroomdatabase in SQLite zal maken en initialiseren:

(venv) $ airflow initdb

De database wordt aangemaakt in airflow.db standaard.

Controleer of Airflow is geïnstalleerd:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Als de opdracht werkte, heeft Airflow zijn eigen configuratiebestand gemaakt airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow heeft een webinterface. Het kan worden gestart door de opdracht uit te voeren:

(venv) $ airflow webserver --port 8081

Je hebt nu toegang tot de webinterface in een browser op poort 8081 op de host waarop Airflow draaide, zoals dit: <hostname:8081>.

2. Werken met de Experimentele API

Hierop is Airflow geconfigureerd en klaar voor gebruik. We moeten echter ook de experimentele API uitvoeren. Onze checkers zijn geschreven in Python, dus verder zullen alle verzoeken erop staan ​​​​met behulp van de bibliotheek requests.

Eigenlijk werkt de API al voor eenvoudige verzoeken. Met een dergelijk verzoek kunt u bijvoorbeeld zijn werk testen:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Als je zo'n bericht als reactie hebt ontvangen, betekent dit dat alles werkt.

Wanneer we echter een DAG willen triggeren, lopen we tegen het feit aan dat dit soort verzoeken niet kunnen worden gedaan zonder authenticatie.

Om dit te doen, moet u een aantal acties uitvoeren.

Eerst moet je dit toevoegen aan de configuratie:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Vervolgens moet u uw gebruiker met beheerdersrechten maken:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Vervolgens moet u een gebruiker maken met normale rechten die een DAG-trigger mag maken.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Nu is alles klaar.

3. Een POST-verzoek lanceren

Het POST-verzoek zelf ziet er als volgt uit:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Verzoek succesvol verwerkt.

Dienovereenkomstig geven we de DAG wat tijd om te verwerken en een verzoek in te dienen bij de ClickHouse-tabel, in een poging het besturingsgegevenspakket te vangen.

Verificatie voltooid.

Bron: www.habr.com

Voeg een reactie