如何使用實驗 API 在 Airflow 中創建 DAG 觸發器

在準備我們的教育計劃時,我們會定期遇到使用某些工具的困難。 當我們遇到它們時,並不總是有足夠的文檔和文章來幫助解決這個問題。

例如,在 2015 年,我們在大數據專家計劃中使用了 Hadoop 集群和 Spark,支持 35 個並髮用戶。 目前尚不清楚如何使用 YARN 為此類用戶案例做好準備。 結果,他們自己摸索並走上了這條路, 發表在哈布雷 並且還表演了 莫斯科 Spark 聚會.

這次我們要講的是一個不同的節目—— 數據工程師。 在其上,我們的參與者構建了兩種類型的架構:lambda 和 kappa。 並且在lamdba架構中,Airflow被用作批處理的一部分,將日誌從HDFS傳輸到ClickHouse。

總體來說一切都很好。 讓他們建造管道。 然而,有一個“但是”:我們所有的課程在學習過程本身方面都是技術先進的。 為了檢查實驗室,我們使用自動檢查器:參與者需要轉到他的個人帳戶,單擊“檢查”按鈕,過了一會兒,他會看到關於他所做的事情的某種擴展反饋。 正是在這一點上,我們開始解決我們的問題。

檢查這個實驗的安排如下:我們發送一個控制數據包到參與者的Kafka,然後Gobblin將該數據包傳輸到HDFS,然後Airflow將這個數據包放入ClickHouse中。 訣竅在於 Airflow 不必實時執行此操作,而是按計劃執行:每 15 分鐘一次,它會獲取一堆文件並上傳它們。

事實證明,當檢查器此時此刻運行時,我們需要根據我們的要求以某種方式自行觸發他們的 DAG。 谷歌搜索,我們發現對於 Airflow 的更高版本,有一個所謂的 實驗性API。 單詞 experimental當然,這聽起來很可怕,但是該怎麼辦……它突然起飛了。

接下來,我們將描述整個路徑:從安裝 Airflow 到使用實驗 API 生成觸發 DAG 的 POST 請求。 我們將使用 Ubuntu 16.04。

1.氣流安裝

讓我們檢查一下我們是否有 Python 3 和 virtualenv。

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

如果缺少其中之一,請安裝它。

現在讓我們創建一個目錄,我們將在其中繼續使用 Airflow。

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

安裝氣流:

(venv) $ pip install airflow

我們開發的版本:1.10。

現在我們需要創建一個目錄 airflow_home,DAG 文件和 Airflow 插件所在的位置。 創建目錄後,設置環境變量 AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

下一步是運行將在 SQLite 中創建並初始化數據流數據庫的命令:

(venv) $ airflow initdb

數據庫將創建於 airflow.db 默認。

檢查是否安裝了 Airflow:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

如果該命令有效,則 Airflow 創建了自己的配置文件 airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow 有一個網絡界面。 可以通過運行以下命令啟動它:

(venv) $ airflow webserver --port 8081

現在,您可以在運行 Airflow 的主機上的端口 8081 上通過瀏覽器訪問 Web 界面,如下所示: <hostname:8081>.

2. 使用實驗 API

此時氣流已配置完畢並準備就緒。 但是,我們還需要運行實驗 API。 我們的檢查器是用 Python 編寫的,因此所有請求都將使用該庫進行處理 requests.

實際上,該 API 已經可以處理簡單的請求了。 例如,這樣的請求允許您測試其工作:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

如果您收到這樣的回复消息,則意味著一切正常。

然而,當我們想要觸發 DAG 時,我們會遇到這樣的情況:未經身份驗證就無法發出此類請求。

為此,您需要執行許多操作。

首先,您需要將其添加到配置中:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

然後,您需要創建具有管理員權限的用戶:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

接下來,您需要創建一個具有普通權限的用戶,該用戶將被允許進行 DAG 觸發器。

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

現在一切都準備好了。

3.發起POST請求

POST 請求本身將如下所示:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

請求處理成功。

相應地,然後我們給DAG一些時間來處理並向ClickHouse表發出請求,嘗試捕獲控制數據包。

驗證完成。

來源: www.habr.com

添加評論