How to trigger a DAG in Airflow using the Experimental API

๊ต์œก ํ”„๋กœ๊ทธ๋žจ์„ ์ค€๋น„ํ•  ๋•Œ ์ผ๋ถ€ ๋„๊ตฌ ์ž‘์—…๊ณผ ๊ด€๋ จํ•˜์—ฌ ์ฃผ๊ธฐ์ ์œผ๋กœ ์–ด๋ ค์›€์„ ๊ฒช์Šต๋‹ˆ๋‹ค. ๊ทธ๋ฆฌ๊ณ  ์šฐ๋ฆฌ๊ฐ€ ๊ทธ๊ฒƒ๋“ค์„ ๋งŒ๋‚˜๋Š” ์ˆœ๊ฐ„, ์ด ๋ฌธ์ œ์— ๋Œ€์ฒ˜ํ•˜๋Š” ๋ฐ ๋„์›€์ด ๋  ์ถฉ๋ถ„ํ•œ ๋ฌธ์„œ์™€ ๊ธฐ์‚ฌ๊ฐ€ ํ•ญ์ƒ ์žˆ๋Š” ๊ฒƒ์€ ์•„๋‹™๋‹ˆ๋‹ค.

That was the case in 2015, for example, when in the Big Data Specialist program we used a Hadoop cluster with Spark for 35 concurrent users. It was not clear how to prepare it for such a use case with YARN. In the end, we worked it out on our own, published a post on Habr, and also gave a talk at the Moscow Spark Meetup.

Background

์ด๋ฒˆ์—๋Š” ๋‹ค๋ฅธ ํ”„๋กœ๊ทธ๋žจ์— ๋Œ€ํ•ด ์ด์•ผ๊ธฐํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค. ๋ฐ์ดํ„ฐ ์—”์ง€๋‹ˆ์–ด. ๊ทธ ์œ„์— ์ฐธ๊ฐ€์ž๋“ค์€ ๋žŒ๋‹ค์™€ ์นดํŒŒ๋ผ๋Š” ๋‘ ๊ฐ€์ง€ ์œ ํ˜•์˜ ์•„ํ‚คํ…์ฒ˜๋ฅผ ๊ตฌ์ถ•ํ•ฉ๋‹ˆ๋‹ค. ๊ทธ๋ฆฌ๊ณ  lamdba ์•„ํ‚คํ…์ฒ˜์—์„œ Airflow๋Š” ์ผ๊ด„ ์ฒ˜๋ฆฌ์˜ ์ผ๋ถ€๋กœ ์‚ฌ์šฉ๋˜์–ด HDFS์—์„œ ClickHouse๋กœ ๋กœ๊ทธ๋ฅผ ์ „์†กํ•ฉ๋‹ˆ๋‹ค.

In general, everything is fine. Let them build their pipelines. But there is a "but": all of our programs are technologically advanced in terms of the learning process itself. We use automatic checkers to verify the labs: the participant goes to their personal account, clicks the "Check" button, and after a while sees extended feedback on the work they did. And this is where we approach our problem.

์ด ๋žฉ์„ ํ™•์ธํ•˜๋Š” ๊ฒƒ์€ ๋‹ค์Œ๊ณผ ๊ฐ™์ด ๊ตฌ์„ฑ๋ฉ๋‹ˆ๋‹ค. ์ œ์–ด ๋ฐ์ดํ„ฐ ํŒจํ‚ท์„ ์ฐธ๊ฐ€์ž์˜ Kafka๋กœ ๋ณด๋‚ธ ๋‹ค์Œ Gobblin์ด ์ด ๋ฐ์ดํ„ฐ ํŒจํ‚ท์„ HDFS๋กœ ์ „์†กํ•œ ๋‹ค์Œ Airflow๊ฐ€ ์ด ๋ฐ์ดํ„ฐ ํŒจํ‚ท์„ ๊ฐ€์ ธ์™€ ClickHouse์— ๋„ฃ์Šต๋‹ˆ๋‹ค. ์š”๋ น์€ Airflow๊ฐ€ ์ด ์ž‘์—…์„ ์‹ค์‹œ๊ฐ„์œผ๋กœ ์ˆ˜ํ–‰ํ•  ํ•„์š”๊ฐ€ ์—†๊ณ  ์ผ์ •์— ๋”ฐ๋ผ ์ˆ˜ํ–‰ํ•œ๋‹ค๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค. 15๋ถ„์— ํ•œ ๋ฒˆ์”ฉ ๋งŽ์€ ํŒŒ์ผ์„ ๊ฐ€์ ธ์™€ ์—…๋กœ๋“œํ•ฉ๋‹ˆ๋‹ค.

It turned out that, while the checker is running here and now, we need to somehow trigger their DAG ourselves, on demand. After some googling, we found out that recent versions of Airflow have a so-called Experimental API. The word experimental sounds scary, of course, but what can you do... maybe it will take off.

๋‹ค์Œ์œผ๋กœ Airflow ์„ค์น˜๋ถ€ํ„ฐ Experimental API๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ DAG๋ฅผ ํŠธ๋ฆฌ๊ฑฐํ•˜๋Š” POST ์š”์ฒญ ์ƒ์„ฑ๊นŒ์ง€์˜ ์ „์ฒด ๊ฒฝ๋กœ๋ฅผ ์„ค๋ช…ํ•ฉ๋‹ˆ๋‹ค. ์šฐ๋ฆฌ๋Š” ์šฐ๋ถ„ํˆฌ 16.04๋กœ ์ž‘์—…ํ•  ๊ฒƒ์ž…๋‹ˆ๋‹ค.

1. Installing Airflow

Let's check that we have Python 3 and virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

If either of them is missing, install it.

Now let's create a directory where we will continue working with Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p $(which python3) venv
$ source venv/bin/activate
(venv) $

Install Airflow:

(venv) $ pip install apache-airflow

The version we worked with: 1.10.

Now we need to create the airflow_home directory, where the DAG files and Airflow plugins will live. After creating the directory, set the AIRFLOW_HOME environment variable.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

๋‹ค์Œ ๋‹จ๊ณ„๋Š” SQLite์—์„œ ๋ฐ์ดํ„ฐ ํ๋ฆ„ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค๋ฅผ ๋งŒ๋“ค๊ณ  ์ดˆ๊ธฐํ™”ํ•˜๋Š” ๋ช…๋ น์„ ์‹คํ–‰ํ•˜๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค.

(venv) $ airflow initdb

๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค๋Š” ๋‹ค์Œ ์œ„์น˜์— ์ƒ์„ฑ๋ฉ๋‹ˆ๋‹ค. airflow.db ๊ธฐ๋ณธ.

Check that Airflow is installed:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   v1.10.0

๋ช…๋ น์ด ์ž‘๋™ํ•˜๋ฉด Airflow๋Š” ์ž์ฒด ๊ตฌ์„ฑ ํŒŒ์ผ์„ ์ƒ์„ฑํ–ˆ์Šต๋‹ˆ๋‹ค. airflow.cfg ะฒ AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow์—๋Š” ์›น ์ธํ„ฐํŽ˜์ด์Šค๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค. ๋‹ค์Œ ๋ช…๋ น์„ ์‹คํ–‰ํ•˜์—ฌ ์‹œ์ž‘ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

(venv) $ airflow webserver --port 8081

์ด์ œ ๋‹ค์Œ๊ณผ ๊ฐ™์ด Airflow๊ฐ€ ์‹คํ–‰ ์ค‘์ธ ํ˜ธ์ŠคํŠธ์˜ ํฌํŠธ 8081์— ์žˆ๋Š” ๋ธŒ๋ผ์šฐ์ €์—์„œ ์›น ์ธํ„ฐํŽ˜์ด์Šค์— ์•ก์„ธ์Šคํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. <hostname:8081>.

2. Working with the Experimental API

์ด Airflow์—์„œ ๊ตฌ์„ฑ๋˜์–ด ์‚ฌ์šฉํ•  ์ค€๋น„๊ฐ€ ๋˜์—ˆ์Šต๋‹ˆ๋‹ค. ๊ทธ๋Ÿฌ๋‚˜ Experimental API๋„ ์‹คํ–‰ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. ์šฐ๋ฆฌ ์ฒด์ปค๋Š” Python์œผ๋กœ ์ž‘์„ฑ๋˜์—ˆ์œผ๋ฏ€๋กœ ๋ชจ๋“  ์š”์ฒญ์€ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ˆ˜ํ–‰๋ฉ๋‹ˆ๋‹ค. requests.

In fact, the API already serves simple requests. For example, this request lets you test that it is up:

>>> import requests
>>> host = '<your hostname>'
>>> airflow_port = 8081  # in our case; the default is 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test')).text
'OK'

If you received this message in response, everything is working.

However, as soon as we try to trigger a DAG, we run into the fact that this kind of request cannot be made without authentication.

To set that up, you need to do a few more things.

First, add the following to the config:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

๊ทธ๋Ÿฐ ๋‹ค์Œ ๊ด€๋ฆฌ์ž ๊ถŒํ•œ์ด ์žˆ๋Š” ์‚ฌ์šฉ์ž๋ฅผ ์ƒ์„ฑํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.superuser = True  # models.Admin does not exist in Airflow 1.10; the superuser flag grants admin rights
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

๋‹ค์Œ์œผ๋กœ DAG ํŠธ๋ฆฌ๊ฑฐ๋ฅผ ๋งŒ๋“ค ์ˆ˜ ์žˆ๋Š” ์ผ๋ฐ˜ ๊ถŒํ•œ์„ ๊ฐ€์ง„ ์‚ฌ์šฉ์ž๋ฅผ ๋งŒ๋“ค์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Now everything is ready.
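As a quick sanity check (our own addition, not part of the original walkthrough), you can hit the same test endpoint with basic auth to make sure the new credentials are accepted:

>>> import requests
>>> host = '<your hostname>'
>>> airflow_port = 8081
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test'),
...              auth=('newprolab', 'Newprolab2019!')).text
'OK'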

3. Making the POST request

The POST request itself looks like this:

>>> import json
>>> dag_id = 'newprolab'
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf": "{\"key\": \"value\"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{\n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n'

์š”์ฒญ์ด ์„ฑ๊ณต์ ์œผ๋กœ ์ฒ˜๋ฆฌ๋˜์—ˆ์Šต๋‹ˆ๋‹ค.

๋”ฐ๋ผ์„œ ์šฐ๋ฆฌ๋Š” DAG๊ฐ€ ClickHouse ํ…Œ์ด๋ธ”์„ ์ฒ˜๋ฆฌํ•˜๊ณ  ์š”์ฒญํ•˜์—ฌ ์ œ์–ด ๋ฐ์ดํ„ฐ ํŒจํ‚ท์„ ํฌ์ฐฉํ•˜๋ ค๊ณ  ์‹œ๋„ํ•  ์‹œ๊ฐ„์„ ์ค๋‹ˆ๋‹ค.

ํ™•์ธ์ด ์™„๋ฃŒ๋˜์—ˆ์Šต๋‹ˆ๋‹ค.

Source: habr.com
