๊ต์ก ํ๋ก๊ทธ๋จ์ ์ค๋นํ ๋ ์ผ๋ถ ๋๊ตฌ ์์ ๊ณผ ๊ด๋ จํ์ฌ ์ฃผ๊ธฐ์ ์ผ๋ก ์ด๋ ค์์ ๊ฒช์ต๋๋ค. ๊ทธ๋ฆฌ๊ณ ์ฐ๋ฆฌ๊ฐ ๊ทธ๊ฒ๋ค์ ๋ง๋๋ ์๊ฐ, ์ด ๋ฌธ์ ์ ๋์ฒํ๋ ๋ฐ ๋์์ด ๋ ์ถฉ๋ถํ ๋ฌธ์์ ๊ธฐ์ฌ๊ฐ ํญ์ ์๋ ๊ฒ์ ์๋๋๋ค.
์๋ฅผ ๋ค์ด 2015๋
์๋ Big Data Specialist ํ๋ก๊ทธ๋จ์์ 35๋ช
์ ๋์ ์ฌ์ฉ์๋ฅผ ์ํด Spark์ ํจ๊ป Hadoop ํด๋ฌ์คํฐ๋ฅผ ์ฌ์ฉํ์ต๋๋ค. YARN์ ์ฌ์ฉํ์ฌ ์ด๋ฌํ ์ฌ์ฉ์ ์ฌ๋ก๋ฅผ ์ค๋นํ๋ ๋ฐฉ๋ฒ์ด ๋ช
ํํ์ง ์์์ต๋๋ค. ๊ทธ ๊ฒฐ๊ณผ ๊ทธ๋ค์ ์ค์ค๋ก ๊ธธ์ ์ฐพ์ ๊ฑธ์ด๊ฐ๊ณ
์ ์ฌ ์๋
์ด๋ฒ์๋ ๋ค๋ฅธ ํ๋ก๊ทธ๋จ์ ๋ํด ์ด์ผ๊ธฐํ๊ฒ ์ต๋๋ค.
๋ชจ๋ ๊ฒ์ด ์ผ๋ฐ์ ์ผ๋ก ์ข์ต๋๋ค. ๊ทธ๋ค์ด ํ์ดํ๋ผ์ธ์ ๊ตฌ์ถํ๊ฒ ํ์ญ์์ค. ๊ทธ๋ฌ๋ "๊ทธ๋ฌ๋"๊ฐ ์์ต๋๋ค. ์ฐ๋ฆฌ์ ๋ชจ๋ ํ๋ก๊ทธ๋จ์ ํ์ต ๊ณผ์ ์์ฒด์์ ๊ธฐ์ ์ ์ผ๋ก ์ง๋ณดํฉ๋๋ค. ์คํ์ค์ ํ์ธํ๊ธฐ ์ํด ์๋ ๊ฒ์ฌ๊ธฐ๋ฅผ ์ฌ์ฉํฉ๋๋ค. ์ฐธ๊ฐ์๋ ๊ฐ์ธ ๊ณ์ ์ผ๋ก ์ด๋ํ์ฌ "ํ์ธ" ๋ฒํผ์ ํด๋ฆญํ๊ณ ์ ์ ํ ์์ ์ด ์ํํ ์์ ์ ๋ํ ํ์ฅ๋ ํผ๋๋ฐฑ์ ํ์ธํด์ผ ํฉ๋๋ค. ๊ทธ๋ฆฌ๊ณ ์ด ์์ ์์ ์ฐ๋ฆฌ๋ ๋ฌธ์ ์ ์ ๊ทผํ๊ธฐ ์์ํฉ๋๋ค.
์ด ๋ฉ์ ํ์ธํ๋ ๊ฒ์ ๋ค์๊ณผ ๊ฐ์ด ๊ตฌ์ฑ๋ฉ๋๋ค. ์ ์ด ๋ฐ์ดํฐ ํจํท์ ์ฐธ๊ฐ์์ Kafka๋ก ๋ณด๋ธ ๋ค์ Gobblin์ด ์ด ๋ฐ์ดํฐ ํจํท์ HDFS๋ก ์ ์กํ ๋ค์ Airflow๊ฐ ์ด ๋ฐ์ดํฐ ํจํท์ ๊ฐ์ ธ์ ClickHouse์ ๋ฃ์ต๋๋ค. ์๋ น์ Airflow๊ฐ ์ด ์์ ์ ์ค์๊ฐ์ผ๋ก ์ํํ ํ์๊ฐ ์๊ณ ์ผ์ ์ ๋ฐ๋ผ ์ํํ๋ค๋ ๊ฒ์ ๋๋ค. 15๋ถ์ ํ ๋ฒ์ฉ ๋ง์ ํ์ผ์ ๊ฐ์ ธ์ ์ ๋ก๋ํฉ๋๋ค.
์ฒด์ปค๊ฐ ์ง๊ธ ์ฌ๊ธฐ์์ ์คํ๋๋ ๋์ ์ฐ๋ฆฌ์ ์์ฒญ์ ๋ฐ๋ผ ์ด๋ป๊ฒ๋ DAG๋ฅผ ์์ฒด์ ์ผ๋ก ํธ๋ฆฌ๊ฑฐํด์ผ ํ๋ค๋ ๊ฒ์ด ๋ฐํ์ก์ต๋๋ค. ์ธํฐ๋ท ๊ฒ์์ ํตํด ์ต์ ๋ฒ์ ์ Airflow์๋ ์์ experimental
, ๋ฌผ๋ก ๋ฌด์ญ๊ฒ ๋ค๋ฆฌ์ง๋ง ์ด๋กํด ... ๊ฐ์๊ธฐ ์ด๋ฅํฉ๋๋ค.
๋ค์์ผ๋ก Airflow ์ค์น๋ถํฐ Experimental API๋ฅผ ์ฌ์ฉํ์ฌ DAG๋ฅผ ํธ๋ฆฌ๊ฑฐํ๋ POST ์์ฒญ ์์ฑ๊น์ง์ ์ ์ฒด ๊ฒฝ๋ก๋ฅผ ์ค๋ช ํฉ๋๋ค. ์ฐ๋ฆฌ๋ ์ฐ๋ถํฌ 16.04๋ก ์์ ํ ๊ฒ์ ๋๋ค.
1. ๊ธฐ๋ฅ ์ค์น
Python 3๊ณผ virtualenv๊ฐ ์๋์ง ํ์ธํฉ์๋ค.
$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0
์ด ์ค ํ๋๊ฐ ์์ผ๋ฉด ์ค์นํ์ญ์์ค.
์ด์ Airflow๋ก ๊ณ์ ์์ ํ ๋๋ ํฐ๋ฆฌ๋ฅผ ๋ง๋ค์ด ๋ณด๊ฒ ์ต๋๋ค.
$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $
๊ธฐ๋ฅ ์ค์น:
(venv) $ pip install airflow
์ฐ๋ฆฌ๊ฐ ์์ ํ ๋ฒ์ : 1.10.
์ด์ ๋๋ ํ ๋ฆฌ๋ฅผ ์์ฑํด์ผ ํฉ๋๋ค. airflow_home
, DAG ํ์ผ ๋ฐ Airflow ํ๋ฌ๊ทธ์ธ์ด ์์นํฉ๋๋ค. ๋๋ ํ ๋ฆฌ ์์ฑ ํ ํ๊ฒฝ๋ณ์ ์ค์ AIRFLOW_HOME
.
(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>
๋ค์ ๋จ๊ณ๋ SQLite์์ ๋ฐ์ดํฐ ํ๋ฆ ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ฅผ ๋ง๋ค๊ณ ์ด๊ธฐํํ๋ ๋ช ๋ น์ ์คํํ๋ ๊ฒ์ ๋๋ค.
(venv) $ airflow initdb
๋ฐ์ดํฐ๋ฒ ์ด์ค๋ ๋ค์ ์์น์ ์์ฑ๋ฉ๋๋ค. airflow.db
๊ธฐ๋ณธ.
Airflow๊ฐ ์ค์น๋์ด ์๋์ง ํ์ธํ์ญ์์ค.
$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ _ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ ____/____/|__/
v1.10.0
๋ช
๋ น์ด ์๋ํ๋ฉด Airflow๋ ์์ฒด ๊ตฌ์ฑ ํ์ผ์ ์์ฑํ์ต๋๋ค. airflow.cfg
ะฒ AIRFLOW_HOME
:
$ tree
.
โโโ airflow.cfg
โโโ unittests.cfg
Airflow์๋ ์น ์ธํฐํ์ด์ค๊ฐ ์์ต๋๋ค. ๋ค์ ๋ช ๋ น์ ์คํํ์ฌ ์์ํ ์ ์์ต๋๋ค.
(venv) $ airflow webserver --port 8081
์ด์ ๋ค์๊ณผ ๊ฐ์ด Airflow๊ฐ ์คํ ์ค์ธ ํธ์คํธ์ ํฌํธ 8081์ ์๋ ๋ธ๋ผ์ฐ์ ์์ ์น ์ธํฐํ์ด์ค์ ์ก์ธ์คํ ์ ์์ต๋๋ค. <hostname:8081>
.
2. ์คํ์ API ์์
์ด Airflow์์ ๊ตฌ์ฑ๋์ด ์ฌ์ฉํ ์ค๋น๊ฐ ๋์์ต๋๋ค. ๊ทธ๋ฌ๋ Experimental API๋ ์คํํด์ผ ํฉ๋๋ค. ์ฐ๋ฆฌ ์ฒด์ปค๋ Python์ผ๋ก ์์ฑ๋์์ผ๋ฏ๋ก ๋ชจ๋ ์์ฒญ์ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ฌ์ฉํ์ฌ ์ํ๋ฉ๋๋ค. requests
.
์ค์ ๋ก API๋ ์ด๋ฏธ ๊ฐ๋จํ ์์ฒญ์ ๋ํด ์๋ํ๊ณ ์์ต๋๋ค. ์๋ฅผ ๋ค์ด, ์ด๋ฌํ ์์ฒญ์ ํตํด ์์ ์ ํ ์คํธํ ์ ์์ต๋๋ค.
>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #ะฒ ะฝะฐัะตะผ ัะปััะฐะต ัะฐะบะพะน, ะฐ ะฟะพ ะดะตัะพะปัั 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'
์๋ต์ผ๋ก ์ด๋ฌํ ๋ฉ์์ง๋ฅผ ๋ฐ์๋ค๋ฉด ๋ชจ๋ ๊ฒ์ด ์๋ํ๊ณ ์์์ ์๋ฏธํฉ๋๋ค.
๊ทธ๋ฌ๋ DAG๋ฅผ ํธ๋ฆฌ๊ฑฐํ๋ ค๊ณ ํ ๋ ์ธ์ฆ ์์ด๋ ์ด๋ฌํ ์ข ๋ฅ์ ์์ฒญ์ ํ ์ ์๋ค๋ ์ฌ์ค์ ์ง๋ฉดํ๊ฒ ๋ฉ๋๋ค.
์ด๋ ๊ฒ ํ๋ ค๋ฉด ์ฌ๋ฌ ์์ ์ ์ํํด์ผ ํฉ๋๋ค.
๋จผ์ ๊ตฌ์ฑ์ ๋ค์์ ์ถ๊ฐํด์ผ ํฉ๋๋ค.
[api]
auth_backend = airflow.contrib.auth.backends.password_auth
๊ทธ๋ฐ ๋ค์ ๊ด๋ฆฌ์ ๊ถํ์ด ์๋ ์ฌ์ฉ์๋ฅผ ์์ฑํด์ผ ํฉ๋๋ค.
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
๋ค์์ผ๋ก DAG ํธ๋ฆฌ๊ฑฐ๋ฅผ ๋ง๋ค ์ ์๋ ์ผ๋ฐ ๊ถํ์ ๊ฐ์ง ์ฌ์ฉ์๋ฅผ ๋ง๋ค์ด์ผ ํฉ๋๋ค.
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
์ด์ ๋ชจ๋ ๊ฒ์ด ์ค๋น๋์์ต๋๋ค.
3. POST ์์ฒญ ์คํ
POST ์์ฒญ ์์ฒด๋ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'
์์ฒญ์ด ์ฑ๊ณต์ ์ผ๋ก ์ฒ๋ฆฌ๋์์ต๋๋ค.
๋ฐ๋ผ์ ์ฐ๋ฆฌ๋ DAG๊ฐ ClickHouse ํ ์ด๋ธ์ ์ฒ๋ฆฌํ๊ณ ์์ฒญํ์ฌ ์ ์ด ๋ฐ์ดํฐ ํจํท์ ํฌ์ฐฉํ๋ ค๊ณ ์๋ํ ์๊ฐ์ ์ค๋๋ค.
ํ์ธ์ด ์๋ฃ๋์์ต๋๋ค.
์ถ์ฒ : habr.com