Como activar un DAG en Airflow usando a API experimental

Na elaboración dos nosos programas educativos atopámonos periodicamente con dificultades para traballar con algunhas ferramentas. E no momento en que nos atopamos con eles, non sempre hai suficiente documentación e artigos que axuden a facer fronte a este problema.

Así foi, por exemplo, en 2015, e utilizamos o clúster Hadoop con Spark para 35 usuarios simultáneos no programa Big Data Specialist. Non estaba claro como preparalo para tal caso de usuario usando YARN. Como resultado, despois de descubrir e percorrer o camiño por conta propia, fixérono publicar en Habré e tamén se realizou Encontro de Moscow Spark.

prehistoria

Esta vez falaremos dun programa diferente - Enxeñeiro de datos. Sobre ela, os nosos participantes constrúen dous tipos de arquitectura: lambda e kappa. E na arquitectura lamdba, Airflow úsase como parte do procesamento por lotes para transferir rexistros de HDFS a ClickHouse.

En xeral, todo é bo. Que constrúan os seus oleodutos. Non obstante, hai un "pero": todos os nosos programas están tecnoloxicamente avanzados en canto ao propio proceso de aprendizaxe. Para comprobar o laboratorio, usamos verificadores automáticos: o participante ten que ir á súa conta persoal, facer clic no botón "Comprobar" e despois dun tempo verá algún tipo de comentarios estendidos sobre o que fixo. E é neste momento cando comezamos a abordar o noso problema.

A verificación deste laboratorio organízase do seguinte xeito: enviamos un paquete de datos de control ao Kafka do participante, despois Gobblin transfire este paquete de datos a HDFS, despois Airflow toma este paquete de datos e colócao en ClickHouse. O truco é que Airflow non ten que facelo en tempo real, faino a tempo: unha vez cada 15 minutos leva un montón de ficheiros e cárgaos.

Resulta que, dalgún xeito, necesitamos activar o seu DAG pola nosa conta mentres o comprobador funciona aquí e agora. Buscando en Google, descubrimos que para as versións posteriores de Airflow hai un chamado API experimental. A palabra experimental, por suposto, soa asustado, pero que facer... De súpeto despega.

A continuación, describiremos todo o camiño: desde a instalación de Airflow ata a xeración dunha solicitude POST que desencadea un DAG mediante a API Experimental. Traballaremos con Ubuntu 16.04.

1. Instalación de fluxo de aire

Comprobamos que temos Python 3 e virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Se falta algún destes, instáleo.

Agora imos crear un directorio no que seguiremos traballando con Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Instalar Airflow:

(venv) $ pip install airflow

Versión na que traballamos: 1.10.

Agora necesitamos crear un directorio airflow_home, onde se localizarán os ficheiros DAG e os complementos de Airflow. Despois de crear o directorio, configure a variable de ambiente AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

O seguinte paso é executar o comando que creará e inicializará a base de datos de fluxo de datos en SQLite:

(venv) $ airflow initdb

A base de datos crearase en airflow.db por defecto.

Comproba se Airflow está instalado:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Se o comando funcionou, Airflow creou o seu propio ficheiro de configuración airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow ten unha interface web. Pódese iniciar executando o comando:

(venv) $ airflow webserver --port 8081

Agora podes acceder á interface web nun navegador no porto 8081 do host onde se estaba executando Airflow, así: <hostname:8081>.

2. Traballando coa API Experimental

Neste Airflow está configurado e listo para funcionar. Non obstante, tamén necesitamos executar a API experimental. Os nosos verificadores están escritos en Python, polo que todas as solicitudes estarán nel usando a biblioteca requests.

En realidade, a API xa está funcionando para solicitudes sinxelas. Por exemplo, tal solicitude permítelle probar o seu traballo:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Se recibiches tal mensaxe como resposta, significa que todo está funcionando.

Non obstante, cando queremos activar un DAG, atopámonos co feito de que este tipo de solicitude non se pode facer sen autenticación.

Para iso, terás que facer unha serie de accións.

Primeiro, cómpre engadir isto á configuración:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Despois, debes crear o teu usuario con dereitos de administrador:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

A continuación, cómpre crear un usuario con dereitos normais ao que se lle permitirá facer un disparador DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Agora todo está listo.

3. Iniciando unha solicitude POST

A propia solicitude POST terá o seguinte aspecto:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

A solicitude procesouse correctamente.

En consecuencia, dámoslle tempo ao DAG para procesar e facer unha solicitude á táboa ClickHouse, tentando capturar o paquete de datos de control.

Verificación completada.

Fonte: www.habr.com

Engadir un comentario