Hoe maak je een DAG-trigger in Airflow met behulp van de Experimentele API

Bij het voorbereiden van onze onderwijsprogramma's lopen wij geregeld tegen moeilijkheden aan bij het werken met bepaalde hulpmiddelen. En op de momenten dat we ze tegenkomen, is er niet altijd voldoende documentatie en artikelen beschikbaar die ons kunnen helpen met dit probleem om te gaan.

Dat was bijvoorbeeld in 2015 het geval en in het programma “Big Data Specialist” gebruikten we een Hadoop-cluster met Spark voor 35 gelijktijdige gebruikers. Het was onduidelijk hoe het met YARN voorbereid moest worden op een dergelijk gebruiksscenario. Uiteindelijk, nadat we het hadden uitgevogeld en het proces zelf hadden doorlopen, hebben we het gedaan post op Habré en trad ook op bij Moskou Spark Meetup.

prehistorie

Deze keer zullen we het over een ander programma hebben - Data Engineer. Onze deelnemers bouwen hierop twee soorten architectuur: lambda en kappa. En in de lamdba-architectuur wordt Airflow binnen de batchverwerking gebruikt om logs van HDFS naar ClickHouse over te brengen.

Over het algemeen is alles goed. Laat ze hun eigen pijplijn bouwen. Er is echter een maar: al onze programma's zijn technologisch vanuit het oogpunt van het leerproces zelf. Om het lab te controleren, gebruiken we automatische checkers: de deelnemer moet naar zijn persoonlijke account gaan, op de knop 'Controleren' klikken en na enige tijd ziet hij uitgebreide feedback over wat hij heeft gedaan. En op dat punt beginnen we ons probleem te benaderen.

De test in dit lab is als volgt opgezet: we sturen een controle-datapakket naar de Kafka-deelnemer, Gobblin stuurt dit datapakket vervolgens naar HDFS, Airflow neemt dit datapakket en plaatst het in ClickHouse. Het trucje is dat Airflow dit niet in realtime hoeft te doen, maar volgens een schema: elke 15 minuten haalt het een aantal bestanden op en uploadt ze.

Het blijkt dat we op een of andere manier zelf hun DAG moeten activeren op ons verzoek, tijdens de werking van de checker hier en nu. Na wat googlen kwamen we erachter dat er voor latere versies van Airflow een zogenaamde Experimentele API. woord experimental, het klinkt natuurlijk eng, maar wat kun je doen... Wat als het lukt?

Vervolgens beschrijven we het hele proces: van het installeren van Airflow tot het maken van een POST-verzoek dat een DAG activeert met behulp van de experimentele API. We gaan aan de slag met... Ubuntu 16.04.

1. Airflow installeren

Laten we controleren of we Python 3 en virtualenv hebben.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Indien een van deze items ontbreekt, installeer deze dan.

Laten we nu een map maken waarin we verder gaan werken met Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Laten we Airflow installeren:

(venv) $ pip install airflow

De versie waar wij mee werkten: 1.10.

Nu moeten we een directory aanmaken airflow_home, waar de DAG-bestanden en Airflow-plugins worden geplaatst. Nadat u de directory hebt aangemaakt, stelt u de omgevingsvariabele in AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

De volgende stap is het uitvoeren van de opdracht om de gegevensstroomdatabase in SQLite te maken en te initialiseren:

(venv) $ airflow initdb

De database wordt aangemaakt in airflow.db standaard.

Laten we controleren of Airflow is geïnstalleerd:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Als de opdracht werkte, heeft Airflow zijn configuratiebestand aangemaakt airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow heeft een webinterface. U kunt het starten door de volgende opdracht uit te voeren:

(venv) $ airflow webserver --port 8081

U kunt nu toegang krijgen tot de webinterface in uw browser op poort 8081 op de host waarop Airflow draaide, bijvoorbeeld: <hostname:8081>.

2. Werken met de experimentele API

Nu is Airflow ingesteld en klaar voor gebruik. We moeten echter nog wel de experimentele API lanceren. Onze checkers zijn geschreven in Python, dus alle verdere verzoeken zullen hierin worden verwerkt met behulp van de bibliotheek requests.

De API werkt feitelijk al voor eenvoudige verzoeken. Met deze query kunt u bijvoorbeeld de werking ervan testen:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Als u een dergelijk bericht als antwoord krijgt, betekent dit dat alles werkt.

Wanneer we echter een DAG willen activeren, stuiten we op het feit dat dit type verzoek niet kan worden gedaan zonder authenticatie.

Om dit te doen, moet u een aantal andere stappen uitvoeren.

Eerst moet je het volgende toevoegen aan de configuratie:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Vervolgens moet u een eigen gebruiker met beheerdersrechten aanmaken:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Vervolgens moet u een gebruiker met normale rechten aanmaken die een DAG-trigger mag maken.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Nu is alles klaar.

3. Een POST-verzoek starten

Het POST-verzoek zelf ziet er als volgt uit:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Het verzoek is succesvol verwerkt.

Vervolgens geven we de DAG wat tijd om de gegevens te verwerken en een verzoek in te dienen bij de ClickHouse-tabel, in een poging het controlegegevenspakket op te vangen.

Verificatie voltooid.

Bron: www.habr.com

Koop betrouwbare hosting voor sites met DDoS-bescherming, VPS VDS-servers 🔥 Koop betrouwbare websitehosting met DDoS-bescherming, VPS- en VDS-servers | ProHoster