在准备我们的教育计划时,我们会定期遇到使用某些工具的困难。 当我们遇到它们时,并不总是有足够的文档和文章来帮助解决这个问题。
例如,在 2015 年,我们在大数据专家计划中使用了 Hadoop 集群和 Spark,支持 35 个并发用户。 目前尚不清楚如何使用 YARN 为此类用户案例做好准备。 结果,他们自己摸索并走上了这条路,
史前
这次我们要讲的是一个不同的节目——
总体来说一切都很好。 让他们建造管道。 然而,有一个“但是”:我们所有的课程在学习过程本身方面都是技术先进的。 为了检查实验室,我们使用自动检查器:参与者需要转到他的个人帐户,单击“检查”按钮,过了一会儿,他会看到关于他所做的事情的某种扩展反馈。 正是在这一点上,我们开始解决我们的问题。
检查这个实验安排如下:我们发送一个控制数据包到参与者的Kafka,然后Gobblin将这个数据包传输到HDFS,然后Airflow将这个数据包放入ClickHouse。 诀窍在于 Airflow 不必实时执行此操作,而是按计划执行:每 15 分钟一次,它会获取一堆文件并上传它们。
事实证明,当检查器此时此刻运行时,我们需要根据我们的要求以某种方式自行触发他们的 DAG。 谷歌搜索,我们发现对于 Airflow 的更高版本,有一个所谓的 experimental
当然,这听起来很可怕,但是该怎么办……它突然起飞了。
接下来,我们将描述整个路径:从安装 Airflow 到使用实验 API 生成触发 DAG 的 POST 请求。 我们将使用 Ubuntu 16.04。
1.气流安装
让我们检查一下我们是否有 Python 3 和 virtualenv。
$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0
如果缺少其中之一,请安装它。
现在让我们创建一个目录,我们将在其中继续使用 Airflow。
$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $
安装气流:
(venv) $ pip install airflow
我们开发的版本:1.10。
现在我们需要创建一个目录 airflow_home
,DAG 文件和 Airflow 插件所在的位置。 创建目录后,设置环境变量 AIRFLOW_HOME
.
(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>
下一步是运行将在 SQLite 中创建并初始化数据流数据库的命令:
(venv) $ airflow initdb
数据库将创建于 airflow.db
默认。
检查是否安装了 Airflow:
$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ _ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ ____/____/|__/
v1.10.0
如果该命令有效,则 Airflow 创建了自己的配置文件 airflow.cfg
в AIRFLOW_HOME
:
$ tree
.
├── airflow.cfg
└── unittests.cfg
Airflow 有一个网络界面。 可以通过运行以下命令启动它:
(venv) $ airflow webserver --port 8081
现在,您可以在运行 Airflow 的主机上的端口 8081 上通过浏览器访问 Web 界面,如下所示: <hostname:8081>
.
2. 使用实验 API
此时气流已配置完毕并准备就绪。 但是,我们还需要运行实验 API。 我们的检查器是用 Python 编写的,因此所有请求都将使用该库进行处理 requests
.
实际上,该 API 已经可以处理简单的请求了。 例如,这样的请求允许您测试其工作:
>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'
如果您收到这样的回复消息,则意味着一切正常。
然而,当我们想要触发 DAG 时,我们会遇到这样的情况:未经身份验证就无法发出此类请求。
为此,您需要执行许多操作。
首先,您需要将其添加到配置中:
[api]
auth_backend = airflow.contrib.auth.backends.password_auth
然后,您需要创建具有管理员权限的用户:
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
接下来,您需要创建一个具有普通权限的用户,该用户将被允许进行 DAG 触发器。
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
现在一切都准备好了。
3.发起POST请求
POST 请求本身将如下所示:
>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'
请求处理成功。
相应地,然后我们给DAG一些时间来处理并向ClickHouse表发出请求,尝试捕获控制数据包。
验证完成。
来源: habr.com