Nella preparazione dei nostri programmi educativi, incontriamo periodicamente difficoltà in termini di lavoro con alcuni strumenti. E nel momento in cui li incontriamo, non sempre c'è abbastanza documentazione e articoli che possano aiutare ad affrontare questo problema.
Così è stato, ad esempio, nel 2015, e abbiamo utilizzato il cluster Hadoop con Spark per 35 utenti simultanei nel programma Big Data Specialist. Non era chiaro come prepararlo per un caso utente di questo tipo utilizzando YARN. Di conseguenza, dopo aver capito e percorso il percorso da soli, lo hanno fatto
Sfondo
Questa volta parleremo di un programma diverso -
In generale va tutto bene. Lasciamo che costruiscano i loro oleodotti. Esiste però un “ma”: tutti i nostri programmi sono tecnologicamente avanzati in termini di processo di apprendimento stesso. Per controllare il laboratorio utilizziamo controllori automatici: il partecipante deve accedere al proprio account personale, fare clic sul pulsante "Verifica" e dopo un po' vede una sorta di feedback esteso su ciò che ha fatto. Ed è a questo punto che cominciamo ad affrontare il nostro problema.
Il controllo di questo laboratorio è organizzato come segue: inviamo un pacchetto di dati di controllo al Kafka del partecipante, quindi Gobblin trasferisce questo pacchetto di dati a HDFS, quindi Airflow prende questo pacchetto di dati e lo inserisce in ClickHouse. Il trucco è che Airflow non deve farlo in tempo reale, lo fa nei tempi previsti: una volta ogni 15 minuti prende un mucchio di file e li carica.
Si scopre che dobbiamo in qualche modo attivare il loro DAG da soli su nostra richiesta mentre il controllo è in esecuzione qui e ora. Cercando su Google, abbiamo scoperto che per le versioni successive di Airflow esiste un cosiddetto experimental
, certo, sembra spaventoso, ma cosa fare ... All'improvviso decolla.
Successivamente, descriveremo l'intero percorso: dall'installazione di Airflow alla generazione di una richiesta POST che attiva un DAG utilizzando l'API sperimentale. Lavoreremo con Ubuntu 16.04.
1. Installazione del flusso d'aria
Controlliamo di avere Python 3 e virtualenv.
$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0
Se manca uno di questi, installalo.
Ora creiamo una directory in cui continueremo a lavorare con Airflow.
$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $
Installa il flusso d'aria:
(venv) $ pip install airflow
Versione su cui abbiamo lavorato: 1.10.
Ora dobbiamo creare una directory airflow_home
, dove verranno posizionati i file DAG e i plugin Airflow. Dopo aver creato la directory, imposta la variabile di ambiente AIRFLOW_HOME
.
(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>
Il passaggio successivo è eseguire il comando che creerà e inizializzerà il database del flusso di dati in SQLite:
(venv) $ airflow initdb
Il database verrà creato in airflow.db
predefinito.
Controlla se Airflow è installato:
$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ _ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ ____/____/|__/
v1.10.0
Se il comando ha funzionato, Airflow ha creato il proprio file di configurazione airflow.cfg
в AIRFLOW_HOME
:
$ tree
.
├── airflow.cfg
└── unittests.cfg
Airflow ha un'interfaccia web. Può essere lanciato eseguendo il comando:
(venv) $ airflow webserver --port 8081
Ora puoi accedere all'interfaccia web in un browser sulla porta 8081 sull'host su cui era in esecuzione Airflow, in questo modo: <hostname:8081>
.
2. Lavorare con l'API sperimentale
Su questo Airflow è configurato e pronto all'uso. Tuttavia, dobbiamo anche eseguire l'API sperimentale. I nostri controllori sono scritti in Python, quindi tutte le richieste verranno inviate utilizzando la libreria requests
.
In realtà l'API sta già funzionando per richieste semplici. Ad esempio, tale richiesta consente di testarne il funzionamento:
>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'
Se hai ricevuto un messaggio del genere in risposta, significa che tutto funziona.
Tuttavia, quando vogliamo attivare un DAG, ci imbattiamo nel fatto che questo tipo di richiesta non può essere effettuata senza autenticazione.
Per fare ciò, dovrai eseguire una serie di azioni.
Per prima cosa devi aggiungere questo alla configurazione:
[api]
auth_backend = airflow.contrib.auth.backends.password_auth
Quindi, devi creare il tuo utente con diritti di amministratore:
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
Successivamente, è necessario creare un utente con diritti normali a cui sarà consentito eseguire un trigger DAG.
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
Adesso è tutto pronto.
3. Lancio di una richiesta POST
La richiesta POST stessa sarà simile alla seguente:
>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'
Richiesta elaborata con successo.
Di conseguenza, diamo al DAG un po' di tempo per elaborare e fare una richiesta alla tabella ClickHouse, cercando di catturare il pacchetto di dati di controllo.
Verifica completata.
Fonte: habr.com