在準備我們的教育計劃時,我們會定期遇到使用某些工具的困難。 當我們遇到它們時,並不總是有足夠的文檔和文章來幫助解決這個問題。
例如,在 2015 年,我們在大數據專家計劃中使用了 Hadoop 集群和 Spark,支持 35 個並髮用戶。 目前尚不清楚如何使用 YARN 為此類用戶案例做好準備。 結果,他們自己摸索並走上了這條路,
底
這次我們要講的是一個不同的節目——
總體來說一切都很好。 讓他們建造管道。 然而,有一個“但是”:我們所有的課程在學習過程本身方面都是技術先進的。 為了檢查實驗室,我們使用自動檢查器:參與者需要轉到他的個人帳戶,單擊“檢查”按鈕,過了一會兒,他會看到關於他所做的事情的某種擴展反饋。 正是在這一點上,我們開始解決我們的問題。
檢查這個實驗的安排如下:我們發送一個控制數據包到參與者的Kafka,然後Gobblin將該數據包傳輸到HDFS,然後Airflow將這個數據包放入ClickHouse中。 訣竅在於 Airflow 不必實時執行此操作,而是按計劃執行:每 15 分鐘一次,它會獲取一堆文件並上傳它們。
事實證明,當檢查器此時此刻運行時,我們需要根據我們的要求以某種方式自行觸發他們的 DAG。 谷歌搜索,我們發現對於 Airflow 的更高版本,有一個所謂的 experimental
當然,這聽起來很可怕,但是該怎麼辦……它突然起飛了。
接下來,我們將描述整個路徑:從安裝 Airflow 到使用實驗 API 生成觸發 DAG 的 POST 請求。 我們將使用 Ubuntu 16.04。
1.氣流安裝
讓我們檢查一下我們是否有 Python 3 和 virtualenv。
$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0
如果缺少其中之一,請安裝它。
現在讓我們創建一個目錄,我們將在其中繼續使用 Airflow。
$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $
安裝氣流:
(venv) $ pip install airflow
我們開發的版本:1.10。
現在我們需要創建一個目錄 airflow_home
,DAG 文件和 Airflow 插件所在的位置。 創建目錄後,設置環境變量 AIRFLOW_HOME
.
(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>
下一步是運行將在 SQLite 中創建並初始化數據流數據庫的命令:
(venv) $ airflow initdb
數據庫將創建於 airflow.db
默認。
檢查是否安裝了 Airflow:
$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ _ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ ____/____/|__/
v1.10.0
如果該命令有效,則 Airflow 創建了自己的配置文件 airflow.cfg
в AIRFLOW_HOME
:
$ tree
.
├── airflow.cfg
└── unittests.cfg
Airflow 有一個網絡界面。 可以通過運行以下命令啟動它:
(venv) $ airflow webserver --port 8081
現在,您可以在運行 Airflow 的主機上的端口 8081 上通過瀏覽器訪問 Web 界面,如下所示: <hostname:8081>
.
2. 使用實驗 API
此時氣流已配置完畢並準備就緒。 但是,我們還需要運行實驗 API。 我們的檢查器是用 Python 編寫的,因此所有請求都將使用該庫進行處理 requests
.
實際上,該 API 已經可以處理簡單的請求了。 例如,這樣的請求允許您測試其工作:
>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'
如果您收到這樣的回复消息,則意味著一切正常。
然而,當我們想要觸發 DAG 時,我們會遇到這樣的情況:未經身份驗證就無法發出此類請求。
為此,您需要執行許多操作。
首先,您需要將其添加到配置中:
[api]
auth_backend = airflow.contrib.auth.backends.password_auth
然後,您需要創建具有管理員權限的用戶:
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
接下來,您需要創建一個具有普通權限的用戶,該用戶將被允許進行 DAG 觸發器。
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
現在一切都準備好了。
3.發起POST請求
POST 請求本身將如下所示:
>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'
請求處理成功。
相應地,然後我們給DAG一些時間來處理並向ClickHouse表發出請求,嘗試捕獲控制數據包。
驗證完成。
來源: www.habr.com