実験用 API を使用して Airflow で DAG トリガーを作成する方法

教育プログラムを準備する際、いくつかのツールの使用に関して、定期的に困難に遭遇します。 そして、この問題に遭遇した時点では、この問題に対処するのに役立つ十分な文書や記事が常に存在するとは限りません。

たとえば、2015 年に、ビッグ データ スペシャリスト プログラムで 35 人の同時ユーザーに対して Spark を備えた Hadoop クラスターを使用しました。 YARN を使用してそのようなユーザーケースに備えて準備する方法は明確ではありませんでした。 その結果、彼らは自分たちで道を見つけて歩き出したのです。 ハブレに投稿する そして出演もしました モスクワスパークミートアップ.

背景

今回は別のプログラムについてお話します - データエンジニア。 その上で、参加者はラムダとカッパという XNUMX 種類のアーキテクチャを構築します。 また、lamdba アーキテクチャでは、HDFS から ClickHouse にログを転送するためのバッチ処理の一部として Airflow が使用されます。

すべてが概ね良好です。 彼らにパイプラインを構築させましょう。 ただし、「しかし」があります。私たちのプログラムはすべて、学習プロセス自体の点で技術的に進歩しています。 ラボをチェックするには、自動チェッカーを使用します。参加者は自分の個人アカウントに移動し、「チェック」ボタンをクリックする必要があります。すると、しばらくすると、自分が行ったことに関する何らかの拡張フィードバックが表示されます。 そしてこの時点で私たちは問題に取り組み始めます。

このラボの確認は次のように構成されています。制御データ パケットを参加者の Kafka に送信し、その後 Gobblin がこのデータ パケットを HDFS に転送し、Airflow がこのデータ パケットを取得して ClickHouse に置きます。 重要なのは、Airflow はこれをリアルタイムで実行する必要はなく、スケジュールに従って実行することです。つまり、15 分に XNUMX 回、大量のファイルを取得してアップロードします。

チェッカーが現在実行されている間に、リクエストに応じて何らかの方法で独自に DAG をトリガーする必要があることがわかりました。 グーグルで調べたところ、Airflow の新しいバージョンには、いわゆる 実験的API。 言葉 experimental、もちろん怖く聞こえますが、どうすればよいですか... それは突然離陸します。

次に、Airflow のインストールから、Experimental API を使用して DAG をトリガーする POST リクエストの生成までのパス全体を説明します。 Ubuntu 16.04で動作します。

1. エアフローの取り付け

Python 3 と virtualenv があることを確認してみましょう。

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

これらのいずれかが不足している場合は、インストールしてください。

次に、Airflow の操作を続けるディレクトリを作成しましょう。

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

エアフローをインストールします。

(venv) $ pip install airflow

私たちが取り組んだバージョン: 1.10。

次に、ディレクトリを作成する必要があります airflow_home, ここに、DAG ファイルと Airflow プラグインが配置されます。 ディレクトリを作成したら、環境変数を設定します AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

次のステップでは、SQLite でデータフロー データベースを作成して初期化するコマンドを実行します。

(venv) $ airflow initdb

データベースは次の場所に作成されます airflow.db ディフォルト。

Airflow がインストールされているかどうかを確認します。

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

コマンドが機能した場合、Airflow は独自の構成ファイルを作成しました。 airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow には Web インターフェイスがあります。 次のコマンドを実行して起動できます。

(venv) $ airflow webserver --port 8081

次のように、Airflow が実行されていたホストのポート 8081 でブラウザの Web インターフェイスにアクセスできるようになりました。 <hostname:8081>.

2. 実験的 API の使用

これで Airflow が設定され、すぐに使用できるようになります。 ただし、Experimental API も実行する必要があります。 私たちのチェッカーは Python で書かれているため、さらにすべてのリクエストはライブラリを使用してそのチェッカーに反映されます。 requests.

実際、API は単純なリクエストに対してはすでに機能しています。 たとえば、このようなリクエストを使用すると、その動作をテストできます。

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

応答としてこのようなメッセージを受け取った場合は、すべてが機能していることを意味します。

ただし、DAG をトリガーしたい場合、この種のリクエストは認証なしでは実行できないという事実に遭遇します。

これを行うには、いくつかのアクションを実行する必要があります。

まず、これを構成に追加する必要があります。

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

次に、管理者権限を持つユーザーを作成する必要があります。

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

次に、DAG トリガーの作成を許可される通常の権限を持つユーザーを作成する必要があります。

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

これですべての準備が整いました。

3. POSTリクエストの起動

POST リクエスト自体は次のようになります。

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

リクエストは正常に処理されました。

したがって、DAG に処理する時間を与えて、ClickHouse テーブルにリクエストを送信し、制御データ パケットをキャッチしようとします。

検証が完了しました。

出所: habr.com

コメントを追加します