How to trigger a DAG in Airflow using the experimental API

While preparing our educational programs, we periodically run into difficulties working with certain tools. And at the moment we hit them, there is not always enough documentation or articles around to help us deal with the problem.

That was the case, for example, in 2015, when during the "Big Data Specialist" program we used a Hadoop cluster with Spark for 35 simultaneous users. It was not clear how to set it up for that kind of use case with YARN. In the end, having figured it out and walked the path ourselves, we wrote a post about it on Habr and also presented at the Moscow Spark meetup.

Background

ื”ืคืขื ื ื“ื‘ืจ ืขืœ ืชื•ื›ื ื™ืช ืื—ืจืช - ืžื”ื ื“ืก ื ืชื•ื ื™ื. ื”ืžืฉืชืชืคื™ื ืฉืœื ื• ื‘ื•ื ื™ื ืขืœื™ื• ืฉื ื™ ืกื•ื’ื™ ืืจื›ื™ื˜ืงื˜ื•ืจื”: ืœืžื‘ื“ื” ื•ืงืืคื”. ื•ื‘ืืจื›ื™ื˜ืงื˜ื•ืจืช lamdba, ื›ื—ืœืง ืžืขื™ื‘ื•ื“ ืืฆื•ื•ื”, ื ืขืฉื” ืฉื™ืžื•ืฉ ื‘-Airflow ืœื”ืขื‘ืจืช ื™ื•ืžื ื™ื ืž-HDFS ืœ-ClickHouse.

In general, all is well. Let them build their own pipelines. There is a "but", however: all our programs are technologically advanced in terms of the learning process itself. To check the labs we use automatic checkers: the participant goes to their personal account, clicks the "Check" button, and after a while sees some kind of extended feedback on what they did. And this is where we start approaching our problem.

Checking this lab works as follows: we send a control data packet to the participant's Kafka, then Gobblin moves this packet to HDFS, then Airflow picks the packet up and loads it into ClickHouse. The trick is that Airflow does not have to do this in real time; it does it on a schedule: every 15 minutes it takes a batch of files and uploads them.
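For reference, here is a minimal sketch of what a DAG on such a 15-minute schedule might look like in Airflow 1.10 (the DAG id, path, and loading command are hypothetical stand-ins, not the participants' actual pipeline):

# A sketch of a batch DAG that runs every 15 minutes (hypothetical names).
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id='newprolab',                # hypothetical DAG id
    start_date=datetime(2019, 1, 1),
    schedule_interval='*/15 * * * *',  # every 15 minutes
    catchup=False,
)

# Hypothetical step: pick up a batch of files from HDFS and load it into ClickHouse.
load_batch = BashOperator(
    task_id='hdfs_to_clickhouse',
    bash_command='python /opt/pipeline/hdfs_to_clickhouse.py',  # hypothetical script
    dag=dag,
)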

ืžืกืชื‘ืจ ืฉืื ื—ื ื• ืฆืจื™ื›ื™ื ืื™ื›ืฉื”ื• ืœื”ืคืขื™ืœ ืืช ื”-DAG ืฉืœื”ื ื‘ืขืฆืžื ื• ืœื‘ืงืฉืชื ื• ื‘ื–ืžืŸ ืฉื”ื‘ื•ื“ืง ืคื•ืขืœ ื›ืืŸ ื•ืขื›ืฉื™ื•. ืœืื—ืจ ื—ื™ืคื•ืฉ ื‘ื’ื•ื’ืœ, ื’ื™ืœื™ื ื• ืฉืขื‘ื•ืจ ื’ืจืกืื•ืช ืžืื•ื—ืจื•ืช ื™ื•ืชืจ ืฉืœ Airflow ื™ืฉ ืžื” ืฉื ืงืจื API ื ื™ืกื™ื•ื ื™. ืžื™ืœื” experimental, ื›ืžื•ื‘ืŸ, ื–ื” ื ืฉืžืข ืžืคื—ื™ื“, ืื‘ืœ ืžื” ืœืขืฉื•ืช... ืคืชืื•ื ื–ื” ืžืžืจื™ื.

ืœืื—ืจ ืžื›ืŸ, ื ืชืืจ ืืช ื›ืœ ื”ื ืชื™ื‘: ืžื”ืชืงื ืช Airflow ื•ืขื“ ืœื™ืฆื™ืจืช ื‘ืงืฉืช POST ืฉืžืคืขื™ืœื” ืืช ื”-DAG ื‘ืืžืฆืขื•ืช ื”-API ื”ื ื™ืกื™ื•ื ื™. ืื ื• ื ืขื‘ื•ื“ ืขื ืื•ื‘ื•ื ื˜ื• 16.04.

1. Installing Airflow

Let's check that we have Python 3 and virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

If any of these are missing, install them.
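On Ubuntu 16.04, one way to get them (assuming the standard repositories) is:

$ sudo apt-get install python3 python3-pip
$ sudo pip3 install virtualenv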

Now let's create a directory in which we will continue working with Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p $(which python3) venv
$ source venv/bin/activate
(venv) $

Install Airflow:

(venv) $ pip install apache-airflow

The version we worked with: 1.10.

ืขื›ืฉื™ื• ืื ื—ื ื• ืฆืจื™ื›ื™ื ืœื™ืฆื•ืจ ืกืคืจื™ื™ื” airflow_home, ื”ื™ื›ืŸ ื™ืžื•ืงืžื• ืงื‘ืฆื™ DAG ื•ืชื•ืกืคื™ Airflow. ืœืื—ืจ ื™ืฆื™ืจืช ื”ืกืคืจื™ื™ื”, ื”ื’ื“ืจ ืืช ืžืฉืชื ื” ื”ืกื‘ื™ื‘ื” AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

The next step is to run the command that creates and initializes the Airflow database in SQLite:

(venv) $ airflow initdb

By default, the database will be created as airflow.db.
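If you want to double-check, the connection string sits in the sql_alchemy_conn setting of airflow.cfg (the path below is illustrative):

$ grep sql_alchemy_conn $AIRFLOW_HOME/airflow.cfg
sql_alchemy_conn = sqlite:////path/to/my/airflow/workspace/airflow_home/airflow.db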

ื‘ื•ื ื ื‘ื“ื•ืง ืื Airflow ืžื•ืชืงืŸ:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   v1.10.0

If the command worked, then Airflow created its own configuration file airflow.cfg in AIRFLOW_HOME:

$ tree
.
โ”œโ”€โ”€ airflow.cfg
โ””โ”€โ”€ unittests.cfg

ืœื–ืจื™ืžืช ื”ืื•ื•ื™ืจ ื™ืฉ ืžืžืฉืง ืื™ื ื˜ืจื ื˜. ื ื™ืชืŸ ืœื”ืคืขื™ืœ ืื•ืชื• ืขืœ ื™ื“ื™ ื”ืคืขืœืช ื”ืคืงื•ื“ื”:

(venv) $ airflow webserver --port 8081

Now you can open the web interface in a browser on port 8081 on the host where Airflow is running, like this: <hostname:8081>.

2. Working with the experimental API

At this point, Airflow is configured and ready to go. However, we also need to get the experimental API running. Our checkers are written in Python, so all further requests will be made from it, using the requests library.

In fact, the API already works for simple requests. For example, this request lets you test that it is up:

>>> import requests
>>> host = '<your hostname>'
>>> airflow_port = 8081  # in our case; the default is 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test')).text
'OK'

If you get this message in response, it means everything is working.

ืขื ื–ืืช, ื›ืืฉืจ ืื ื• ืจื•ืฆื™ื ืœื”ืคืขื™ืœ DAG, ืื ื• ืขื•ืžื“ื™ื ื‘ืคื ื™ ื”ืขื•ื‘ื“ื” ืฉืœื ื ื™ืชืŸ ืœื”ื’ื™ืฉ ื‘ืงืฉื” ืžืกื•ื’ ื–ื” ืœืœื ืื™ืžื•ืช.

ืœืฉื ื›ืš, ืชืฆื˜ืจืš ืœื‘ืฆืข ืžืกืคืจ ืฉืœื‘ื™ื ื ื•ืกืคื™ื.

ืจืืฉื™ืช, ืขืœื™ืš ืœื”ื•ืกื™ืฃ ืืช ื–ื” ืœืชืฆื•ืจื”:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth
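Keep in mind that in Airflow 1.10 the password backend relies on the password extra (it pulls in flask-bcrypt), and the webserver only reads airflow.cfg at startup, so it needs a restart:

(venv) $ pip install 'apache-airflow[password]'
(venv) $ airflow webserver --port 8081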

ืœืื—ืจ ืžื›ืŸ, ืขืœื™ืš ืœื™ืฆื•ืจ ืืช ื”ืžืฉืชืžืฉ ืฉืœืš ืขื ื–ื›ื•ื™ื•ืช ืื“ืžื™ืŸ:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> user.superuser = True  # admin rights
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

ืœืื—ืจ ืžื›ืŸ, ืขืœื™ืš ืœื™ืฆื•ืจ ืžืฉืชืžืฉ ืขื ื–ื›ื•ื™ื•ืช ืจื’ื™ืœื•ืช ืฉื™ื•ืจืฉื” ืœื”ืคืขื™ืœ ืืช ื”-DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
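As a quick sanity check, you can open the Python shell again and list the users that ended up in the database:

>>> from airflow import models, settings
>>> session = settings.Session()
>>> [u.username for u in session.query(models.User).all()]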

Now everything is ready.

3. Sending the POST request

The POST request itself looks like this:

>>> import json
>>> dag_id = 'newprolab'
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf": "{\"key\":\"value\"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{\n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n'

The request was processed successfully.
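For completeness: the conf passed in the request body becomes available inside the triggered DAG run. In Airflow 1.10 a PythonOperator could read it roughly like this (the task and function names are hypothetical):

# Sketch: reading the conf dict passed by the POST request (hypothetical names).
from airflow.operators.python_operator import PythonOperator

def handle_conf(**context):
    conf = context['dag_run'].conf or {}  # {"key": "value"} from our request
    print(conf.get('key'))

read_conf = PythonOperator(
    task_id='read_conf',
    python_callable=handle_conf,
    provide_context=True,  # needed in Airflow 1.10 for the context kwargs
    dag=dag,               # the DAG object from the sketch above
)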

ื‘ื”ืชืื ืœื›ืš, ืื ื• ื ื•ืชื ื™ื ืœ-DAG ืงืฆืช ื–ืžืŸ ืœืขื‘ื“ ื•ืœื”ื’ื™ืฉ ื‘ืงืฉื” ืœื˜ื‘ืœืช ClickHouse, ื‘ื ื™ืกื™ื•ืŸ ืœืชืคื•ืก ืืช ื—ื‘ื™ืœืช ื ืชื•ื ื™ ื”ื‘ืงืจื”.

The check is complete.

Source: www.habr.com
