Come creare un trigger DAG in Airflow utilizzando l'API sperimentale

Nella preparazione dei nostri programmi educativi, incontriamo periodicamente difficoltà in termini di lavoro con alcuni strumenti. E nel momento in cui li incontriamo, non sempre c'è abbastanza documentazione e articoli che possano aiutare ad affrontare questo problema.

Così è stato, ad esempio, nel 2015, e abbiamo utilizzato il cluster Hadoop con Spark per 35 utenti simultanei nel programma Big Data Specialist. Non era chiaro come prepararlo per un caso utente di questo tipo utilizzando YARN. Di conseguenza, dopo aver capito e percorso il percorso da soli, lo hanno fatto post su Habré e anche eseguito Incontro Spark di Mosca.

Sfondo

Questa volta parleremo di un programma diverso - Ingegnere dati. Su di esso i nostri partecipanti costruiscono due tipi di architettura: lambda e kappa. Inoltre, nell'architettura lambdba, Airflow viene utilizzato come parte dell'elaborazione batch per trasferire i log da HDFS a ClickHouse.

In generale va tutto bene. Lasciamo che costruiscano i loro oleodotti. Esiste però un “ma”: tutti i nostri programmi sono tecnologicamente avanzati in termini di processo di apprendimento stesso. Per controllare il laboratorio utilizziamo controllori automatici: il partecipante deve accedere al proprio account personale, fare clic sul pulsante "Verifica" e dopo un po' vede una sorta di feedback esteso su ciò che ha fatto. Ed è a questo punto che cominciamo ad affrontare il nostro problema.

Il controllo di questo laboratorio è organizzato come segue: inviamo un pacchetto di dati di controllo al Kafka del partecipante, quindi Gobblin trasferisce questo pacchetto di dati a HDFS, quindi Airflow prende questo pacchetto di dati e lo inserisce in ClickHouse. Il trucco è che Airflow non deve farlo in tempo reale, lo fa nei tempi previsti: una volta ogni 15 minuti prende un mucchio di file e li carica.

Si scopre che dobbiamo in qualche modo attivare il loro DAG da soli su nostra richiesta mentre il controllo è in esecuzione qui e ora. Cercando su Google, abbiamo scoperto che per le versioni successive di Airflow esiste un cosiddetto API sperimentale. La parola experimental, certo, sembra spaventoso, ma cosa fare ... All'improvviso decolla.

Successivamente, descriveremo l'intero percorso: dall'installazione di Airflow alla generazione di una richiesta POST che attiva un DAG utilizzando l'API sperimentale. Lavoreremo con Ubuntu 16.04.

1. Installazione del flusso d'aria

Controlliamo di avere Python 3 e virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Se manca uno di questi, installalo.

Ora creiamo una directory in cui continueremo a lavorare con Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Installa il flusso d'aria:

(venv) $ pip install airflow

Versione su cui abbiamo lavorato: 1.10.

Ora dobbiamo creare una directory airflow_home, dove verranno posizionati i file DAG e i plugin Airflow. Dopo aver creato la directory, imposta la variabile di ambiente AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Il passaggio successivo è eseguire il comando che creerà e inizializzerà il database del flusso di dati in SQLite:

(venv) $ airflow initdb

Il database verrà creato in airflow.db predefinito.

Controlla se Airflow è installato:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Se il comando ha funzionato, Airflow ha creato il proprio file di configurazione airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow ha un'interfaccia web. Può essere lanciato eseguendo il comando:

(venv) $ airflow webserver --port 8081

Ora puoi accedere all'interfaccia web in un browser sulla porta 8081 sull'host su cui era in esecuzione Airflow, in questo modo: <hostname:8081>.

2. Lavorare con l'API sperimentale

Su questo Airflow è configurato e pronto all'uso. Tuttavia, dobbiamo anche eseguire l'API sperimentale. I nostri controllori sono scritti in Python, quindi tutte le richieste verranno inviate utilizzando la libreria requests.

In realtà l'API sta già funzionando per richieste semplici. Ad esempio, tale richiesta consente di testarne il funzionamento:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Se hai ricevuto un messaggio del genere in risposta, significa che tutto funziona.

Tuttavia, quando vogliamo attivare un DAG, ci imbattiamo nel fatto che questo tipo di richiesta non può essere effettuata senza autenticazione.

Per fare ciò, dovrai eseguire una serie di azioni.

Per prima cosa devi aggiungere questo alla configurazione:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Quindi, devi creare il tuo utente con diritti di amministratore:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Successivamente, è necessario creare un utente con diritti normali a cui sarà consentito eseguire un trigger DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Adesso è tutto pronto.

3. Lancio di una richiesta POST

La richiesta POST stessa sarà simile alla seguente:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Richiesta elaborata con successo.

Di conseguenza, diamo al DAG un po' di tempo per elaborare e fare una richiesta alla tabella ClickHouse, cercando di catturare il pacchetto di dati di controllo.

Verifica completata.

Fonte: habr.com

Aggiungi un commento