Nella preparazione dei nostri programmi educativi, incontriamo periodicamente difficoltà in termini di lavoro con alcuni strumenti. E nel momento in cui li incontriamo, non sempre c'è abbastanza documentazione e articoli che possano aiutare ad affrontare questo problema.
Così è stato, ad esempio, nel 2015, e abbiamo utilizzato il cluster Hadoop con Spark per 35 utenti simultanei nel programma Big Data Specialist. Non era chiaro come prepararlo per un caso utente di questo tipo utilizzando YARN. Di conseguenza, dopo aver capito e percorso il percorso da soli, lo hanno fatto e anche eseguito .
Sfondo
Questa volta parleremo di un programma diverso - . Su di esso i nostri partecipanti costruiscono due tipi di architettura: lambda e kappa. Inoltre, nell'architettura lambdba, Airflow viene utilizzato come parte dell'elaborazione batch per trasferire i log da HDFS a ClickHouse.
Tutto sommato va bene. Lasciamo che costruiscano le loro pipeline. Tuttavia, c'è un ma: tutti i nostri programmi sono tecnologici dal punto di vista del processo di apprendimento stesso. Per verificare il laboratorio, utilizziamo dei verificatori automatici: il partecipante deve accedere al suo account personale, cliccare sul pulsante "Verifica" e, dopo un po' di tempo, riceve un feedback dettagliato su ciò che ha fatto. Ed è a questo punto che iniziamo ad affrontare il nostro problema.
Il controllo di questo laboratorio è organizzato come segue: inviamo un pacchetto di dati di controllo al Kafka del partecipante, quindi Gobblin trasferisce questo pacchetto di dati a HDFS, quindi Airflow prende questo pacchetto di dati e lo inserisce in ClickHouse. Il trucco è che Airflow non deve farlo in tempo reale, lo fa nei tempi previsti: una volta ogni 15 minuti prende un mucchio di file e li carica.
Si scopre che dobbiamo in qualche modo attivare il loro DAG da soli su nostra richiesta mentre il controllo è in esecuzione qui e ora. Cercando su Google, abbiamo scoperto che per le versioni successive di Airflow esiste un cosiddetto . La parola experimental, certo, sembra spaventoso, ma cosa fare ... All'improvviso decolla.
Successivamente, descriveremo l'intero processo: dall'installazione di Airflow alla creazione di una richiesta POST che attiva un DAG utilizzando l'API sperimentale. Lavoreremo con Ubuntu 16.04
1. Installazione del flusso d'aria
Controlliamo di avere Python 3 e virtualenv.
$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0Se manca uno di questi, installalo.
Ora creiamo una directory in cui continueremo a lavorare con Airflow.
$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $Installa il flusso d'aria:
(venv) $ pip install airflowVersione su cui abbiamo lavorato: 1.10.
Ora dobbiamo creare una directory airflow_home, dove verranno posizionati i file DAG e i plugin Airflow. Dopo aver creato la directory, imposta la variabile di ambiente AIRFLOW_HOME.
(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>Il passaggio successivo è eseguire il comando che creerà e inizializzerà il database del flusso di dati in SQLite:
(venv) $ airflow initdbIl database verrà creato in airflow.db predefinito.
Controlla se Airflow è installato:
$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ _ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ ____/____/|__/
v1.10.0Se il comando ha funzionato, Airflow ha creato il proprio file di configurazione airflow.cfg в AIRFLOW_HOME:
$ tree
.
├── airflow.cfg
└── unittests.cfgAirflow ha un'interfaccia web. Può essere lanciato eseguendo il comando:
(venv) $ airflow webserver --port 8081Ora puoi accedere all'interfaccia web in un browser sulla porta 8081 sull'host su cui era in esecuzione Airflow, in questo modo: <hostname:8081>.
2. Lavorare con l'API sperimentale
Su questo Airflow è configurato e pronto all'uso. Tuttavia, dobbiamo anche eseguire l'API sperimentale. I nostri controllori sono scritti in Python, quindi tutte le richieste verranno inviate utilizzando la libreria requests.
In realtà l'API sta già funzionando per richieste semplici. Ad esempio, tale richiesta consente di testarne il funzionamento:
>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'Se hai ricevuto un messaggio del genere in risposta, significa che tutto funziona.
Tuttavia, quando vogliamo attivare un DAG, ci imbattiamo nel fatto che questo tipo di richiesta non può essere effettuata senza autenticazione.
Per fare ciò, dovrai eseguire una serie di azioni.
Per prima cosa devi aggiungere questo alla configurazione:
[api]
auth_backend = airflow.contrib.auth.backends.password_authQuindi, devi creare il tuo utente con diritti di amministratore:
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()Successivamente, è necessario creare un utente con diritti normali a cui sarà consentito eseguire un trigger DAG.
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()Adesso è tutto pronto.
3. Lancio di una richiesta POST
La richiesta POST stessa sarà simile alla seguente:
>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'Richiesta elaborata con successo.
Di conseguenza, diamo al DAG un po' di tempo per elaborare e fare una richiesta alla tabella ClickHouse, cercando di catturare il pacchetto di dati di controllo.
Verifica completata.
Fonte: habr.com
