如何使用实验 API 在 Airflow 中创建 DAG 触发器

在准备我们的教育计划时,我们会定期遇到使用某些工具的困难。 当我们遇到它们时,并不总是有足够的文档和文章来帮助解决这个问题。

例如,在 2015 年,我们在大数据专家计划中使用了 Hadoop 集群和 Spark,支持 35 个并发用户。 目前尚不清楚如何使用 YARN 为此类用户案例做好准备。 结果,他们自己摸索并走上了这条路, 发表在哈布雷 并且还表演了 莫斯科 Spark 聚会.

史前

这次我们要讲的是一个不同的节目—— 数据工程师。 在其上,我们的参与者构建了两种类型的架构:lambda 和 kappa。 并且在lamdba架构中,Airflow被用作批处理的一部分,将日志从HDFS传输到ClickHouse。

总体来说一切都很好。 让他们建造管道。 然而,有一个“但是”:我们所有的课程在学习过程本身方面都是技术先进的。 为了检查实验室,我们使用自动检查器:参与者需要转到他的个人帐户,单击“检查”按钮,过了一会儿,他会看到关于他所做的事情的某种扩展反馈。 正是在这一点上,我们开始解决我们的问题。

检查这个实验安排如下:我们发送一个控制数据包到参与者的Kafka,然后Gobblin将这个数据包传输到HDFS,然后Airflow将这个数据包放入ClickHouse。 诀窍在于 Airflow 不必实时执行此操作,而是按计划执行:每 15 分钟一次,它会获取一堆文件并上传它们。

事实证明,当检查器此时此刻运行时,我们需要根据我们的要求以某种方式自行触发他们的 DAG。 谷歌搜索,我们发现对于 Airflow 的更高版本,有一个所谓的 实验性API。 这个词 experimental当然,这听起来很可怕,但是该怎么办……它突然起飞了。

接下来,我们将描述整个路径:从安装 Airflow 到使用实验 API 生成触发 DAG 的 POST 请求。 我们将使用 Ubuntu 16.04。

1.气流安装

让我们检查一下我们是否有 Python 3 和 virtualenv。

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

如果缺少其中之一,请安装它。

现在让我们创建一个目录,我们将在其中继续使用 Airflow。

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

安装气流:

(venv) $ pip install airflow

我们开发的版本:1.10。

现在我们需要创建一个目录 airflow_home,DAG 文件和 Airflow 插件所在的位置。 创建目录后,设置环境变量 AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

下一步是运行将在 SQLite 中创建并初始化数据流数据库的命令:

(venv) $ airflow initdb

数据库将创建于 airflow.db 默认。

检查是否安装了 Airflow:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

如果该命令有效,则 Airflow 创建了自己的配置文件 airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow 有一个网络界面。 可以通过运行以下命令启动它:

(venv) $ airflow webserver --port 8081

现在,您可以在运行 Airflow 的主机上的端口 8081 上通过浏览器访问 Web 界面,如下所示: <hostname:8081>.

2. 使用实验 API

此时气流已配置完毕并准备就绪。 但是,我们还需要运行实验 API。 我们的检查器是用 Python 编写的,因此所有请求都将使用该库进行处理 requests.

实际上,该 API 已经可以处理简单的请求了。 例如,这样的请求允许您测试其工作:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

如果您收到这样的回复消息,则意味着一切正常。

然而,当我们想要触发 DAG 时,我们会遇到这样的情况:未经身份验证就无法发出此类请求。

为此,您需要执行许多操作。

首先,您需要将其添加到配置中:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

然后,您需要创建具有管理员权限的用户:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

接下来,您需要创建一个具有普通权限的用户,该用户将被允许进行 DAG 触发器。

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

现在一切都准备好了。

3.发起POST请求

POST 请求本身将如下所示:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

请求处理成功。

相应地,然后我们给DAG一些时间来处理并向ClickHouse表发出请求,尝试捕获控制数据包。

验证完成。

来源: habr.com

添加评论