์๋ ํ์ธ์, ์ ๋ Vezet ๊ทธ๋ฃน ํ์ฌ ๋ถ์ ๋ถ์์ ๋ฐ์ดํฐ ์์ง๋์ด์ธ Dmitry Logvinenko์ ๋๋ค.
ETL ํ๋ก์ธ์ค ๊ฐ๋ฐ์ ์ํ ํ๋ฅญํ ๋๊ตฌ์ธ Apache Airflow์ ๋ํด ๋ง์๋๋ฆฌ๊ฒ ์ต๋๋ค. ๊ทธ๋ฌ๋ Airflow๋ ๋งค์ฐ ๋ค์ฌ๋ค๋ฅํ๊ณ ๋ค๋ฉด์ ์ด๋ฏ๋ก ๋ฐ์ดํฐ ํ๋ฆ์ ๊ด์ฌํ์ง ์๋๋ผ๋ ์ฃผ๊ธฐ์ ์ผ๋ก ํ๋ก์ธ์ค๋ฅผ ์์ํ๊ณ ์คํ์ ๋ชจ๋ํฐ๋งํด์ผ ํ๋ ๊ฒฝ์ฐ์๋ ์์ธํ ์ดํด๋ด์ผ ํฉ๋๋ค.
๊ทธ๋ฆฌ๊ณ ์, ๋งํ ๋ฟ๋ง ์๋๋ผ ๋ณด์ฌ์ค ๊ฒ์ ๋๋ค. ํ๋ก๊ทธ๋จ์๋ ๋ง์ ์ฝ๋, ์คํฌ๋ฆฐ ์ท ๋ฐ ๊ถ์ฅ ์ฌํญ์ด ์์ต๋๋ค.
Google์์ Airflow๋ผ๋ ๋จ์ด๋ฅผ ๊ฒ์ํ๋ฉด ์ผ๋ฐ์ ์ผ๋ก ํ์๋๋ ๋ด์ฉ / Wikimedia Commons
์ฐจ๋ก
์๊ฐ ์ฃผ์ ๋ถ๋ถ, ์ค์ฉ์ ์ธ (๊ทธ๋ฆฌ๊ณ ์ฝ๊ฐ์ ์ด๋ก ์ ์ธ) ํํธ ์ต์ข , ์ฐธ์กฐ ๋ฐ ์ ๋ณด ์ฐธ์กฐ
์๊ฐ
Apache Airflow๋ Django์ ๊ฐ์ต๋๋ค.
- ํ์ด์ฌ์ผ๋ก ์์ฑ
- ํ๋ฅญํ ๊ด๋ฆฌ์ ํจ๋์ด ์์ต๋๋ค.
- ๋ฌดํ ํ์ฅ ๊ฐ๋ฅ
- ๋จ์ง ๋ ๋ซ๊ณ ์์ ํ ๋ค๋ฅธ ๋ชฉ์ ์ ์ํด ๋ง๋ค์ด์ก์ต๋๋ค.
- ๋ฌด์ ํ ์์คํ ์์ ์์ ์คํ ๋ฐ ๋ชจ๋ํฐ๋ง(Celery/Kubernetes ๋ฐ ์์ฌ์ด ํ์ฉํ๋ ๋งํผ)
- ๋งค์ฐ ์ฝ๊ฒ ์์ฑํ๊ณ ์ดํดํ ์ ์๋ Python ์ฝ๋์์ ๋์ ์ํฌํ๋ก ์์ฑ
- ๊ธฐ์ฑํ ๊ตฌ์ฑ ์์์ ์ง์์ ๋ง๋ ํ๋ฌ๊ทธ์ธ์ ๋ชจ๋ ์ฌ์ฉํ์ฌ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ API๋ฅผ ์๋ก ์ฐ๊ฒฐํ๋ ๊ธฐ๋ฅ(๋งค์ฐ ๊ฐ๋จํจ).
๋ค์๊ณผ ๊ฐ์ด Apache Airflow๋ฅผ ์ฌ์ฉํฉ๋๋ค.
- ์ฐ๋ฆฌ๋ DWH ๋ฐ ODS(Vertica ๋ฐ Clickhouse๊ฐ ์์)์ ๋ค์ํ ์์ค(๋ง์ SQL Server ๋ฐ PostgreSQL ์ธ์คํด์ค, ์ ํ๋ฆฌ์ผ์ด์ ๋ฉํธ๋ฆญ์ด ์๋ ๋ค์ํ API, ์ฌ์ง์ด 1C)์์ ๋ฐ์ดํฐ๋ฅผ ์์งํฉ๋๋ค.
- ์ผ๋ง๋ ๋ฐ์ ํ๋์ง
cron
, ODS์์ ๋ฐ์ดํฐ ํตํฉ โโํ๋ก์ธ์ค๋ฅผ ์์ํ๊ณ ์ ์ง ๊ด๋ฆฌ๋ ๋ชจ๋ํฐ๋งํฉ๋๋ค.
์ต๊ทผ๊น์ง ์ฐ๋ฆฌ์ ์๊ตฌ ์ฌํญ์ 32์ฝ์ด์ 50GB RAM์ด ์๋ ํ๋์ ์์ ์๋ฒ๋ก ์ฒ๋ฆฌ๋์์ต๋๋ค. Airflow์์๋ ๋ค์๊ณผ ๊ฐ์ด ์๋ํฉ๋๋ค.
- ๋ 200 ๊ฐ (์ค์ ๋ก ์ฐ๋ฆฌ๊ฐ ์์ ์ ์ฑ์ฐ๋ ์ํฌํ๋ก),
- ๊ฐ๊ฐ ํ๊ท ์ ์ผ๋ก 70๊ฐ์ ์์ ,
- ์ด ์ ํจ์ ์์๋ฉ๋๋ค (๋ํ ํ๊ท ์ ์ผ๋ก) ํ ์๊ฐ์ ํ ๋ฒ.
๊ทธ๋ฆฌ๊ณ ์ฐ๋ฆฌ๊ฐ ์ด๋ป๊ฒ ํ์ฅํ๋์ง์ ๋ํด์๋ ์๋์ ์ฐ๊ฒ ์ง๋ง ์ด์ ์ฐ๋ฆฌ๊ฐ ํด๊ฒฐํ รผber-problem์ ์ ์ํด ๋ณด๊ฒ ์ต๋๋ค.
๊ฐ๊ฐ 50๊ฐ์ ๋ฐ์ดํฐ๋ฒ ์ด์ค๊ฐ ์๋ XNUMX๊ฐ์ ์์ค SQL Server๊ฐ ์์ต๋๋ค. ๊ฐ๊ฐ ํ๋์ ํ๋ก์ ํธ ์ธ์คํด์ค์ด๋ฉฐ ๊ตฌ์กฐ๊ฐ ๋์ผํฉ๋๋ค(๊ฑฐ์ ๋ชจ๋ ๊ณณ, mua-ha-ha). ์ฆ, ๊ฐ๊ฐ Orders ํ ์ด๋ธ(๋คํํ๋ ํด๋น ํ ์ด๋ธ ์ด๋ฆ์ ๋ชจ๋ ๋น์ฆ๋์ค์ ์ ์ฉํ ์ ์์ต๋๋ค). ์ฐ๋ฆฌ๋ ์๋น์ค ํ๋(์์ค ์๋ฒ, ์์ค ๋ฐ์ดํฐ๋ฒ ์ด์ค, ETL ์์ ID)๋ฅผ ์ถ๊ฐํ์ฌ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ค๊ณ ์์งํ๊ฒ Vertica์ ๋ฃ์ต๋๋ค.
๊ฐ์!
์ฃผ์ ๋ถ๋ถ, ์ค์ฉ์ ์ธ (๊ทธ๋ฆฌ๊ณ ์ฝ๊ฐ์ ์ด๋ก ์ ์ธ)
์ ์ฐ๋ฆฌ(๊ทธ๋ฆฌ๊ณ ๋น์ )๋
๋๋ฌด๊ฐ ํฌ๊ณ ๋ด๊ฐ ๋จ์ํ์ ๋ SQL
-schik, ํ ๋ฌ์์ ์๋งค์ ์์ ์ฐ๋ฆฌ๋ ์ฌ์ฉํ ์ ์๋ ๋ ๊ฐ์ง ๋๊ตฌ๋ฅผ ์ฌ์ฉํ์ฌ ETL ํ๋ก์ธ์ค, ์ฆ ๋ฐ์ดํฐ ํ๋ฆ์ ์ฌ๊ธฐ์ณค์ต๋๋ค.
- ์ธํฌ๋งคํฐ์นด ํ์ ์ผํฐ - ์์ฒด ํ๋์จ์ด, ์์ฒด ๋ฒ์ ๊ด๋ฆฌ ๊ธฐ๋ฅ์ ๊ฐ์ถ ๊ทน๋๋ก ํ์ฐ๋๊ณ ๋งค์ฐ ์์ฐ์ ์ธ ์์คํ
. ๋๋ ๊ทธ ๋ฅ๋ ฅ์ 1%๋ฅผ ๊ธ์งํ๋ ์ ์ ์ฌ์ฉํ์ต๋๋ค. ์? ๊ธ์์, ์ฐ์ 380 ๋
๋ ์ด๋๊ฐ์์๋์ด ์ธํฐํ์ด์ค๋ ์ ์ ์ ์ผ๋ก ์ฐ๋ฆฌ์๊ฒ ์๋ ฅ์๊ฐํฉ๋๋ค. ๋์งธ, ์ด ์ฅ์น๋ ๋งค์ฐ ๋ฉ์ง ํ๋ก์ธ์ค, ๊ฒฉ๋ ฌํ ๊ตฌ์ฑ ์์ ์ฌ์ฌ์ฉ ๋ฐ ๊ธฐํ ๋งค์ฐ ์ค์ํ ์ํฐํ๋ผ์ด์ฆ ํธ๋ฆญ์ ์ํด ์ค๊ณ๋์์ต๋๋ค. Airbus AXNUMX / year์ ๋ ๊ฐ์ ๊ฐ์ด ๋น์ฉ์ด ๋ ๋ค๋ ์ฌ์ค์ ๋ํด์๋ ์๋ฌด ๋ง๋ํ์ง ์๊ฒ ์ต๋๋ค.
์ฃผ์, ์คํฌ๋ฆฐ์ท์ 30์ธ ๋ฏธ๋ง์ ์ฌ๋๋ค์๊ฒ ์ฝ๊ฐ์ ์์ฒ๋ฅผ ์ค ์ ์์ต๋๋ค.
- SQL ์๋ฒ ํตํฉ ์๋ฒ - ์ฐ๋ฆฌ๋ ํ๋ก์ ํธ ๋ด ํ๋ฆ์์ ์ด ๋์ง๋ฅผ ์ฌ์ฉํ์ต๋๋ค. ์ฌ์ค, ์ฐ๋ฆฌ๋ ์ด๋ฏธ SQL Server๋ฅผ ์ฌ์ฉํ๊ณ ์์ผ๋ฉฐ ETL ๋๊ตฌ๋ฅผ ์ฌ์ฉํ์ง ์๋ ๊ฒ์ด ์ด๋ป๊ฒ๋ ๋นํฉ๋ฆฌ์ ์ผ ๊ฒ์
๋๋ค. ๊ทธ๊ฒ์ ๋ชจ๋ ๊ฒ์ด ์ข์ต๋๋ค : ์ธํฐํ์ด์ค๊ฐ ์๋ฆ๋ต๊ณ ์งํ๋ฅ ๋ณด๊ณ ์ ... ํ์ง๋ง ์ด๊ฒ์ด ์ฐ๋ฆฌ๊ฐ ์ํํธ์จ์ด ์ ํ์ ์ข์ํ๋ ์ด์ ๊ฐ ์๋๋๋ค. ๊ทธ๊ฒ์ ๋ฒ์
dtsx
(์ ์ฅํ ๋ ๋ ธ๋๊ฐ ์์ธ XML) ํ ์ ์์ง๋ง ์์ ์ด ๋ฌด์์ ๋๊น? ํ ์๋ฒ์์ ๋ค๋ฅธ ์๋ฒ๋ก ์๋ฐฑ ๊ฐ์ ํ ์ด๋ธ์ ๋๋๊ทธํ๋ ์์ ํจํค์ง๋ฅผ ๋ง๋๋ ๊ฒ์ ์ด๋ป์ต๋๊น? ์, ๋ฐฑ, ๋ง์ฐ์ค ๋ฒํผ์ ํด๋ฆญํ๋ฉด ์ง๊ฒ ์๊ฐ๋ฝ์ด ์ค๋ฌด ์กฐ๊ฐ์์ ๋จ์ด์ง๋๋ค. ํ์ง๋ง ํ์คํ ๋ ํจ์ ๋๋ธํด ๋ณด์ ๋๋ค.
์ฐ๋ฆฌ๋ ํ์คํ ํ์ถ๊ตฌ๋ฅผ ์ฐพ์์ต๋๋ค. ์ผ์ด์ค ์ง์ ๊ฑฐ์ ์์ฒด ์์ฑ SSIS ํจํค์ง ์์ฑ๊ธฐ์ ์์ต๋๋ค ...
... ๊ทธ๋ฆฌ๊ณ ์๋ก์ด ์ง์ ์ด ์ ๋ฅผ ์ฐพ์์ต๋๋ค. ๊ทธ๋ฆฌ๊ณ Apache Airflow๊ฐ ์ ๋ฅผ ์ถ์ํ์ต๋๋ค.
ETL ํ๋ก์ธ์ค ์ค๋ช ์ด ๊ฐ๋จํ Python ์ฝ๋๋ผ๋ ๊ฒ์ ์์์ ๋ ๋๋ ๊ธฐ๋ป์ ์ถค์ถ์ง ์์์ต๋๋ค. ์ด๊ฒ์ด ๋ฐ์ดํฐ ์คํธ๋ฆผ์ ๋ฒ์ ์ด ์ง์ ๋๊ณ ๊ตฌ๋ถ๋๋ ๋ฐฉ์์ด๋ฉฐ, ์๋ฐฑ ๊ฐ์ ๋ฐ์ดํฐ๋ฒ ์ด์ค์์ ๋จ์ผ ๊ตฌ์กฐ์ ํ ์ด๋ธ์ ํ๋์ ๋์์ผ๋ก ์์ ๋ถ๋ ๊ฒ์ 13 ๋๋ XNUMX๊ฐ์ XNUMX์ธ์น ํ๋ฉด์์ Python ์ฝ๋์ ๋ฌธ์ ๊ฐ ๋์์ต๋๋ค.
ํด๋ฌ์คํฐ ์กฐ๋ฆฝ
์์ ํ ์ ์น์์ ์ค๋นํ์ง ๋ง๊ณ ์ฌ๊ธฐ์์ Airflow, ์ ํํ ๋ฐ์ดํฐ๋ฒ ์ด์ค, Celery ๋ฐ ๋ํฌ์ ์ค๋ช ๋ ๊ธฐํ ์ฌ๋ก ์ค์น์ ๊ฐ์ด ์์ ํ ๋ช ๋ฐฑํ ๊ฒ์ ๋ํด ์ด์ผ๊ธฐํ์ง ๋ง์ญ์์ค.
์ฆ์ ์คํ์ ์์ํ ์ ์๋๋ก ์ค์ผ์นํ์ต๋๋ค. docker-compose.yml
์ฌ๊ธฐ์:
- ์ค์ ๋ก ์ฌ๋ฆฌ์ ๊ธฐ๋ฅ: ์ค์ผ์ค๋ฌ, ์น์๋ฒ. ๊ฝ์ ์
๋ฌ๋ฆฌ ์์
์ ๋ชจ๋ํฐ๋งํ๊ธฐ ์ํด ๊ทธ๊ณณ์์ ํ์ ํ ๊ฒ์
๋๋ค(์ด๋ฏธ
apache/airflow:1.10.10-python3.7
, ๊ทธ๋ฌ๋ ์ฐ๋ฆฌ๋ ์๊ดํ์ง ์์ต๋๋ค) - PostgreSQL, Airflow๋ ์๋น์ค ์ ๋ณด(์ค์ผ์ค๋ฌ ๋ฐ์ดํฐ, ์คํ ํต๊ณ ๋ฑ)๋ฅผ ์์ฑํ๊ณ Celery๋ ์๋ฃ๋ ์์ ์ ํ์ํฉ๋๋ค.
- Redis, Celery์ ํ์คํฌ ๋ธ๋ก์ปค ์ญํ ์ ํฉ๋๋ค.
- ์ ๋ฌ๋ฆฌ ์ผ๊พผ, ์์ ์ ์ง์ ์คํํฉ๋๋ค.
- ํด๋๋ก
./dags
dags์ ๋ํ ์ค๋ช ๊ณผ ํจ๊ป ํ์ผ์ ์ถ๊ฐํฉ๋๋ค. ์ฆ์์์ ํฝ์ ๋๋ฏ๋ก ์ฌ์ฑ๊ธฐ๋ฅผ ํ ๋๋ง๋ค ์ ์ฒด ์คํ์ ์ ๊ธ๋งํ ํ์๊ฐ ์์ต๋๋ค.
์ด๋ค ๊ณณ์์๋ ์์ ์ ์ฝ๋๊ฐ ์์ ํ ํ์๋์ง ์์ง๋ง(ํ ์คํธ๋ฅผ ์ด์ง๋ฝํ์ง ์๋๋ก) ์ด๋๊ฐ์์ ํ๋ก์ธ์ค์์ ์์ ๋ฉ๋๋ค. ์ ์ฒด ์์ ์ฝ๋ ์์ ๋ ์ ์ฅ์์์ ์ฐพ์ ์ ์์ต๋๋ค.
https://github.com/dm-logv/airflow-tutorial .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- broker
์ฐธ๊ณ ์ฌํญ :
- ์๊ณก์ ์กฐ๋ฆฝ์์ ๋๋ ์ ์๋ ค์ง ์ด๋ฏธ์ง์ ํฌ๊ฒ ์์กดํ์ต๋๋ค.
ํผํด/๋์ปค-๊ธฐ๋ฅ - ๊ผญ ํ์ธํ์ธ์. ์๋ง๋ ๋น์ ์ ๋น์ ์ ์ถ์์ ๋ค๋ฅธ ๊ฒ์ด ํ์ํ์ง ์์ ๊ฒ์ ๋๋ค. - ๋ชจ๋ ๊ธฐ๋ฅ ์ค์ ์
airflow.cfg
, ๋ฟ๋ง ์๋๋ผ (๊ฐ๋ฐ์ ๋๋ถ์) ํ๊ฒฝ ๋ณ์๋ฅผ ํตํด ์ ์์ ์ผ๋ก ์ด์ฉํ์ต๋๋ค. - ๋น์ฐํ ํ๋ก๋์ ์ค๋น๊ฐ ๋์ด ์์ง ์์ต๋๋ค. ์ผ๋ถ๋ฌ ์ปจํ ์ด๋์ ํํธ๋นํธ๋ฅผ ๋ฃ์ง ์์๊ณ ๋ณด์์ ์ ๊ฒฝ ์ฐ์ง ์์์ต๋๋ค. ๊ทธ๋ฌ๋ ๋๋ ์คํ์๋ค์๊ฒ ์ ํฉํ ์ต์ํ์ ์์ ์ ์ํํ์ต๋๋ค.
- ์ฐธ๊ณ :
- dag ํด๋๋ ์ค์ผ์ค๋ฌ์ ์์ ์ ๋ชจ๋์ ์ก์ธ์คํ ์ ์์ด์ผ ํฉ๋๋ค.
- ๋ชจ๋ ํ์ฌ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์๋ ๋์ผํ๊ฒ ์ ์ฉ๋ฉ๋๋ค. ์ค์ผ์ค๋ฌ์ ์์ ์๊ฐ ์๋ ์์คํ ์ ๋ชจ๋ ์ค์นํด์ผ ํฉ๋๋ค.
์ด์ ๊ฐ๋จํฉ๋๋ค.
$ docker-compose up --scale worker=3
๋ชจ๋ ๊ฒ์ด ์์นํ๋ฉด ์น ์ธํฐํ์ด์ค๋ฅผ ๋ณผ ์ ์์ต๋๋ค.
- ๊ธฐ๋ฅ :
http://127.0.0.1:8080/admin/ - ๊ฝ:
http://127.0.0.1:5555/dashboard
๊ธฐ๋ณธ ๊ฐ๋
์ด ๋ชจ๋ "dags"์์ ์๋ฌด๊ฒ๋ ์ดํดํ์ง ๋ชปํ๋ค๋ฉด ์ฌ๊ธฐ์ ์งง์ ์ฌ์ ์ด ์์ต๋๋ค.
- ์ค์ผ์ค๋ฌ -Airflow์์ ๊ฐ์ฅ ์ค์ํ ์ผ์ด, ๋ก๋ด์ด ์ฌ๋์ด ์๋ ์ด์ฌํ ์๋ํ๋๋ก ์ ์ด: ์ผ์ ๋ชจ๋ํฐ๋ง, dags ์
๋ฐ์ดํธ, ์์
์์.
์ผ๋ฐ์ ์ผ๋ก ์ด์ ๋ฒ์ ์์๋ ๋ฉ๋ชจ๋ฆฌ์ ๋ฌธ์ ๊ฐ ์์๊ณ (๊ธฐ์ต ์์ค์ด ์๋๋ผ ๋์) ๋ ๊ฑฐ์ ๋งค๊ฐ๋ณ์๊ฐ ๊ตฌ์ฑ์ ๋จ์ ์์์ต๋๋ค.
run_duration
- ์ฌ์์ ๊ฐ๊ฒฉ. ๊ทธ๋ฌ๋ ์ด์ ๋ชจ๋ ๊ฒ์ด ๊ด์ฐฎ์ต๋๋ค. - DAG (์ผ๋ช
"dag") - "๋ฐฉํฅ์ฑ ๋น์ํ ๊ทธ๋ํ", ๊ทธ๋ฌ๋ ๊ทธ๋ฌํ ์ ์๋ ์์์ ์ฌ๋๋ค์๊ฒ๋ง ์๋ ค์ค ๊ฒ์
๋๋ค. ๊ทธ๋ฌ๋ ์ค์ ๋ก๋ ์๋ก ์ํธ ์์ฉํ๋ ์์
์ ์ํ ์ปจํ
์ด๋(์๋ ์ฐธ์กฐ)์ด๊ฑฐ๋ SSIS์ ํจํค์ง ๋ฐ Informatica์ ์ํฌํ๋ก์ ์ ์ฌํฉ๋๋ค. .
dags ์ธ์๋ ์ฌ์ ํ ํ์ dag๊ฐ ์์ ์ ์์ง๋ง ๋๋ถ๋ถ ๋๋ฌํ์ง ๋ชปํ ๊ฒ์ ๋๋ค.
- DAG ์คํ - ์์ฒด์ ์ผ๋ก ํ ๋น๋ ์ด๊ธฐํ๋ dag
execution_date
. ๊ฐ์ dag์ Dagrans๋ ๋ณ๋ ฌ๋ก ์์ ํ ์ ์์ต๋๋ค(๋ฌผ๋ก ์์ ์ ๋ฉฑ๋ฑ์ฑ์ผ๋ก ๋ง๋ ๊ฒฝ์ฐ). - ์ด์์ ํน์ ์์
์ ์ํํ๋ ์ฝ๋ ์กฐ๊ฐ์
๋๋ค. ์ธ ๊ฐ์ง ์ ํ์ ์ฐ์ฐ์๊ฐ ์์ต๋๋ค.
- ๋์์ฐ๋ฆฌ๊ฐ ๊ฐ์ฅ ์ข์ํ๋ ๊ฒ์ฒ๋ผ
PythonOperator
๋ชจ๋ (์ ํจํ) Python ์ฝ๋๋ฅผ ์คํํ ์ ์์ต๋๋ค. - ์ด์ , ์๋ฅผ ๋ค์ด ์ฅ์์์ ์ฅ์๋ก ๋ฐ์ดํฐ๋ฅผ ์ ์กํ๋
MsSqlToHiveTransfer
; - ๊ฐ์ง๊ธฐ ๋ฐ๋ฉด์ ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ ๋๊น์ง dag์ ์ถ๊ฐ ์คํ์ ๋ฐ์ํ๊ฑฐ๋ ์๋๋ฅผ ๋ฆ์ถ ์ ์์ต๋๋ค.
HttpSensor
์ง์ ๋ ๋์ ์ ๊ฐ์ ธ์ฌ ์ ์์ผ๋ฉฐ ์ํ๋ ์๋ต์ด ๋๊ธฐ ์ค์ผ ๋ ์ ์ก์ ์์ํฉ๋๋ค.GoogleCloudStorageToS3Operator
. ํธ๊ธฐ์ฌ ๋ง์ ๋ง์์ "์? ๊ฒฐ๊ตญ ์ฐ์ฐ์์์ ๋ฐ๋ก ๋ฐ๋ณต์ ์ํํ ์ ์์ต๋๋ค!โ ๊ทธ๋ฐ ๋ค์ ์ ์ง๋ ์ด์์๋ก ์์ ํ์ ๋งํ์ง ์๋๋ก ํฉ๋๋ค. ์ผ์๋ ๋ค์ ์๋ ์ ์ ์์, ํ์ธ ๋ฐ ์ข ๋ฃ๋ฉ๋๋ค.
- ๋์์ฐ๋ฆฌ๊ฐ ๊ฐ์ฅ ์ข์ํ๋ ๊ฒ์ฒ๋ผ
- ํ์คํฌ - ์ ์ธ๋ ์ฐ์ฐ์๋ ์ ํ์ ๊ด๊ณ์์ด dag์ ๋ถ์ฐฉ๋์ด ์์ ์ ์์๋ก ์น๊ฒฉ๋ฉ๋๋ค.
- ์์
์ธ์คํด์ค -์ผ๋ฐ ๊ธฐํ์๊ฐ ์ํ์-์์
์์ ๋ํ ์ ํฌ์ ์์
์ ๋ณด๋ผ ์๊ฐ์ด๋ผ๊ณ ๊ฒฐ์ ํ์ ๋ (์ฐ๋ฆฌ๊ฐ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ ๋ฐ๋ก ๊ทธ ์๋ฆฌ์์)
LocalExecutor
๋๋ ์๊ฒฉ ๋ ธ๋์ ๊ฒฝ์ฐCeleryExecutor
) ์ปจํ ์คํธ๋ฅผ ํ ๋นํ๊ณ (์ฆ, ๋ณ์ ์ธํธ - ์คํ ๋งค๊ฐ๋ณ์) ๋ช ๋ น ๋๋ ์ฟผ๋ฆฌ ํ ํ๋ฆฟ์ ํ์ฅํ๊ณ ํ๋งํฉ๋๋ค.
์ฐ๋ฆฌ๋ ์์ ์ ์์ฑํฉ๋๋ค
๋จผ์ doug์ ์ผ๋ฐ์ ์ธ ๊ณํ์ ๊ฐ๋ต์ ์ผ๋ก ์ค๋ช ํ ๋ค์ ์ฌ์ํ ์๋ฃจ์ ์ ์ ์ฉํ๊ธฐ ๋๋ฌธ์ ์ธ๋ถ ์ฌํญ์ ์ ์ ๋ ์์ธํ ์ดํด ๋ณด๊ฒ ์ต๋๋ค.
๋ฐ๋ผ์ ๊ฐ์ฅ ๊ฐ๋จํ ํํ๋ก ์ด๋ฌํ dag๋ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
๊ทธ๊ฒ์ ์์ ๋ด ์๋ค :
- ๋จผ์ ํ์ํ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ๊ฐ์ ธ์ค๊ณ ๋ค๋ฅธ ๊ฒ;
sql_server_ds
-๊ฐ์List[namedtuple[str, str]]
Airflow Connections์ ์ฐ๊ฒฐ ์ด๋ฆ๊ณผ ํ๋ ์ดํธ๋ฅผ ๊ฐ์ ธ์ฌ ๋ฐ์ดํฐ๋ฒ ์ด์ค;dag
-๋ฐ๋์ ์์ด์ผํ๋ ์ฐ๋ฆฌ dag์ ๋ฐํglobals()
, ๊ทธ๋ ์ง ์์ผ๋ฉด Airflow์์ ์ฐพ์ ์ ์์ต๋๋ค. Doug๋ ๋ํ ๋ค์๊ณผ ๊ฐ์ด ๋งํ ํ์๊ฐ ์์ต๋๋ค.- ๊ทธ์ ์ด๋ฆ์ ๋ฌด์์
๋๊น
orders
- ๊ทธ๋ฌ๋ฉด ์ด ์ด๋ฆ์ด ์น ์ธํฐํ์ด์ค์ ํ์๋ฉ๋๋ค. - ๊ทธ๋ XNUMX์ XNUMX์ผ ์์ ๋ถํฐ ์ผํ ๊ฒ์ด๋ฉฐ,
- ์ฝ 6์๊ฐ๋ง๋ค ์คํ๋์ด์ผ ํฉ๋๋ค.
timedelta()
ํ์ฉcron
-์0 0 0/6 ? * * *
, ๋ ๋ฉ์ง ๊ฒฝ์ฐ - ๋ค์๊ณผ ๊ฐ์ ํํ@daily
);
- ๊ทธ์ ์ด๋ฆ์ ๋ฌด์์
๋๊น
workflow()
์ฃผ์ ์์ ์ ์ํํ์ง๋ง ์ง๊ธ์ ์๋๋๋ค. ์ง๊ธ์ ์ปจํ ์คํธ๋ฅผ ๋ก๊ทธ์ ๋คํํฉ๋๋ค.- ์ด์ ์์
์์ฑ์ ๊ฐ๋จํ ๋ง๋ฒ:
- ์ฐ๋ฆฌ๋ ์์ค๋ฅผ ํตํด ์คํํฉ๋๋ค.
- ์ด๊ธฐํ
PythonOperator
, ๋๋ฏธ๋ฅผ ์คํํ ๊ฒ์ ๋๋ค.workflow()
. ์์ ์ ๊ณ ์ ํ(dag ๋ด) ์ด๋ฆ์ ์ง์ ํ๊ณ dag ์์ฒด๋ฅผ ์ฐ๊ฒฐํ๋ ๊ฒ์ ์์ง ๋ง์ญ์์ค. ๊น๋ฐprovide_context
์ฐจ๋ก๋ก ํจ์์ ์ถ๊ฐ ์ธ์๋ฅผ ์ถ๊ฐํ๊ณ ๋ค์์ ์ฌ์ฉํ์ฌ ์ ์คํ๊ฒ ์์งํฉ๋๋ค.**context
.
์ง๊ธ์ ๊ทธ๊ฒ ๋ค์ ๋๋ค. ์ฐ๋ฆฌ๊ฐ ์ป์ ๊ฒ:
- ์น ์ธํฐํ์ด์ค์ ์๋ก์ด dag,
- ๋ณ๋ ฌ๋ก ์คํ๋ XNUMX๊ฐ์ ์์ (Airflow, Celery ์ค์ ๋ฐ ์๋ฒ ์ฉ๋์ด ํ์ฉํ๋ ๊ฒฝ์ฐ).
๊ธ์, ๊ฑฐ์ ์์์ด์.
๋๊ฐ ์ข
์์ฑ์ ์ค์นํฉ๋๊น?
์ด ๋ชจ๋ ๊ฒ์ ๋จ์ํํ๊ธฐ ์ํด ๋๋ ๋ง์ณค์ต๋๋ค docker-compose.yml
๊ฐ๊ณต requirements.txt
๋ชจ๋ ๋
ธ๋์์.
์ด์ ์ฌ๋ผ์ก์ต๋๋ค.
ํ์ ์ฌ๊ฐํ์ ์ค์ผ์ค๋ฌ๊ฐ ์ฒ๋ฆฌํ๋ ์์ ์ธ์คํด์ค์ ๋๋ค.
์ฐ๋ฆฌ๋ ์กฐ๊ธ ๊ธฐ๋ค๋ฆฌ๋ฉด ์์ ์ด ์์ ์์ ์ํด ์ค๋ ๋ฉ๋๋ค.
๋ฌผ๋ก ๋ น์์ ์ฑ๊ณต์ ์ผ๋ก ์์ ์ ์๋ฃํ์ต๋๋ค. Reds๋ ๊ทธ๋ค์ง ์ฑ๊ณต์ ์ด์ง ์์ต๋๋ค.
๊ทธ๋ฐ๋ฐ ์ฐ๋ฆฌ ์ ํ์๋ ํด๋๊ฐ ์์ต๋๋ค.
./dags
, ๊ธฐ๊ณ๊ฐ์ ๋๊ธฐํ๊ฐ ์์ต๋๋ค. ๋ชจ๋ dag๊ฐ ์์ต๋๋ค.git
Gitlab์์ Gitlab CI๋ ๋ณํฉํ ๋ ์ปดํจํฐ์ ์ ๋ฐ์ดํธ๋ฅผ ๋ฐฐํฌํฉ๋๋ค.master
.
๊ฝ์ ๋ํด ์กฐ๊ธ
์ผ๊พผ๋ค์ด ์ ๊ผญ์ง๋ฅผ ๋๋ฆฌ๋ ๋์ ์ฐ๋ฆฌ์๊ฒ ๋ฌด์์ธ๊ฐ๋ฅผ ๋ณด์ฌ์ค ์ ์๋ ๋ ๋ค๋ฅธ ๋๊ตฌ์ธ ๊ฝ์ ๊ธฐ์ตํด ๋ด ์๋ค.
์์ ์ ๋ ธ๋์ ๋ํ ์์ฝ ์ ๋ณด๊ฐ ์๋ ์ฒซ ๋ฒ์งธ ํ์ด์ง:
์์ ์ด ์งํ๋ ๊ฐ์ฅ ์ง์ค์ ์ธ ํ์ด์ง:
๋ธ๋ก์ปค ์ํ๊ฐ ์๋ ๊ฐ์ฅ ์ง๋ฃจํ ํ์ด์ง:
๊ฐ์ฅ ๋ฐ์ ํ์ด์ง๋ ์์ ์ํ ๊ทธ๋ํ์ ์คํ ์๊ฐ์ด ์๋ ํ์ด์ง์ ๋๋ค.
์ฐ๋ฆฌ๋ ์ ๋ถํ๋ฅผ ๋ก๋ํฉ๋๋ค
๋ฐ๋ผ์ ๋ชจ๋ ์์ ์ด ์๋ฃ๋์์ผ๋ฏ๋ก ๋ถ์์๋ฅผ ์ฎ๊ธธ ์ ์์ต๋๋ค.
๊ทธ๋ฆฌ๊ณ ์ฌ๋ฌ ๊ฐ์ง ์ด์ ๋ก ๋ถ์์๊ฐ ๋ง์์ต๋๋ค. Airflow๋ฅผ ์ฌ๋ฐ๋ฅด๊ฒ ์ฌ์ฉํ ๊ฒฝ์ฐ ๋ฐ๋ก ์ด๋ฌํ ์ฌ๊ฐํ์ ๋ฐ์ดํฐ๊ฐ ํ์คํ ๋์ฐฉํ์ง ์์์์ ๋ํ๋ ๋๋ค.
๋ก๊ทธ๋ฅผ ๋ณด๊ณ ํ๋ฝํ ์์ ์ธ์คํด์ค๋ฅผ ๋ค์ ์์ํด์ผ ํฉ๋๋ค.
์ฌ๊ฐํ์ ํด๋ฆญํ๋ฉด ์ฌ์ฉ ๊ฐ๋ฅํ ์์ ์ด ํ์๋ฉ๋๋ค.
์ฐ๋ฌ์ง ์๋ค์ ๋ฐ๋ ค๊ฐ์ ํด๋ฆฌ์ดํ ์ ์์ต๋๋ค. ์ฆ, ๊ฑฐ๊ธฐ์์ ๋ฌด์ธ๊ฐ ์คํจํ๋ค๋ ์ฌ์ค์ ์๊ณ ๋์ผํ ์ธ์คํด์ค ์์ ์ด ์ค์ผ์ค๋ฌ๋ก ์ด๋ํฉ๋๋ค.
๋ชจ๋ ๋นจ๊ฐ์ ์ฌ๊ฐํ์ด ์๋ ๋ง์ฐ์ค๋ก ์ด ์์
์ ์ํํ๋ ๊ฒ์ ๊ทธ๋ค์ง ์ธ๋์ ์ด์ง ์๋ค๋ ๊ฒ์ด ๋ถ๋ช
ํฉ๋๋ค. ์ด๊ฒ์ Airflow์์ ๊ธฐ๋ํ๋ ๊ฒ์ด ์๋๋๋ค. ๋น์ฐํ ์ฐ๋ฆฌ์๊ฒ๋ ๋๋ ์ด์ ๋ฌด๊ธฐ๊ฐ ์์ต๋๋ค. Browse/Task Instances
ํ ๋ฒ์ ๋ชจ๋ ํญ๋ชฉ์ ์ ํํ๊ณ XNUMX์ผ๋ก ์ฌ์ค์ ํ๊ณ ์ฌ๋ฐ๋ฅธ ํญ๋ชฉ์ ํด๋ฆญํฉ๋๋ค.
์ฒญ์ ํ ํ์๋ ๋ค์๊ณผ ๊ฐ์ต๋๋ค(์ค์ผ์ค๋ฌ๊ฐ ์์ฝํ๊ธฐ๋ฅผ ์ด๋ฏธ ๊ธฐ๋ค๋ฆฌ๊ณ ์์).
์ฐ๊ฒฐ, ํํฌ ๋ฐ ๊ธฐํ ๋ณ์
๋ค์ DAG๋ฅผ ์ดํด๋ณผ ์๊ฐ์
๋๋ค. update_reports.py
:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""ะะพัะฟะพะดะฐ ั
ะพัะพัะธะต, ะพััะตัั ะพะฑะฝะพะฒะปะตะฝั"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
ะะฐัะฐั, ะฟัะพััะฟะฐะนัั, ะผั {{ dag.dag_id }} ััะพะฝะธะปะธ
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
๋ชจ๋๊ฐ ๋ณด๊ณ ์ ์ ๋ฐ์ดํธ๋ฅผ ์ํํ ์ ์ด ์์ต๋๊น? ์ด๊ฒ์ ๋ค์ ๊ทธ๋ ์ ๋๋ค. ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ฌ ์์ค ๋ชฉ๋ก์ด ์์ต๋๋ค. ๋ฃ์ ๋ชฉ๋ก์ด ์์ต๋๋ค. ๋ชจ๋ ์ผ์ด ๋ฐ์ํ๊ฑฐ๋ ์ค๋จ๋์์ ๋ ๊ฒฝ์ ์ ์ธ๋ฆฌ๋ ๊ฒ์ ์์ง ๋ง์ญ์์ค. (์, ์ด๊ฒ์ ์ฐ๋ฆฌ์ ๊ดํ ๊ฒ์ด ์๋๋๋ค.)
ํ์ผ์ ๋ค์ ์ดํด๋ณด๊ณ ๋ชจํธํ ์ ํญ๋ชฉ์ ์ดํด๋ณด๊ฒ ์ต๋๋ค.
from commons.operators import TelegramBotSendMessage
- ์ฐ๋ฆฌ๊ฐ Unblocked์ ๋ฉ์์ง๋ฅผ ๋ณด๋ด๊ธฐ ์ํ ์์ ๋ํผ๋ฅผ ๋ง๋ค์ด ํ์ฉํ ์์ฒด ์ฐ์ฐ์๋ฅผ ๋ง๋๋ ๊ฒ์ ๋ฐฉํดํ๋ ๊ฒ์ ์์ต๋๋ค. (์๋์์ ์ด ์ฐ์ฐ์์ ๋ํด ์์ธํ ์ค๋ช ํ๊ฒ ์ต๋๋ค.)default_args={}
- dag๋ ๋ชจ๋ ์ฐ์ฐ์์ ๋์ผํ ์ธ์๋ฅผ ๋ฐฐํฌํ ์ ์์ต๋๋ค.to='{{ var.value.all_the_kings_men }}'
- ํ๋to
์ฐ๋ฆฌ๋ ํ๋์ฝ๋ฉํ์ง ์๊ณ Jinja์ ๋ด๊ฐ ์กฐ์ฌ์ค๋ฝ๊ฒ ์ ๋ ฅํ ์ด๋ฉ์ผ ๋ชฉ๋ก์ด ์๋ ๋ณ์๋ฅผ ์ฌ์ฉํ์ฌ ๋์ ์ผ๋ก ์์ฑํฉ๋๋ค.Admin/Variables
;trigger_rule=TriggerRule.ALL_SUCCESS
- ์คํผ๋ ์ดํฐ ์์ ์กฐ๊ฑด. ์ฐ๋ฆฌ์ ๊ฒฝ์ฐ ๋ชจ๋ ์ข ์์ฑ์ด ํด๊ฒฐ๋ ๊ฒฝ์ฐ์๋ง ํธ์ง๊ฐ ์์ฌ์๊ฒ ์ ๋ฌ๋ฉ๋๋ค. ์ฑ๊ณต์ ์ผ๋ก;tg_bot_conn_id='tg_main'
- ์ธ์conn_id
์ฐ๋ฆฌ๊ฐ ๋ง๋ ์ฐ๊ฒฐ ID๋ฅผ ์๋ฝํฉ๋๋ค.Admin/Connections
;trigger_rule=TriggerRule.ONE_FAILED
- Telegram์ ๋ฉ์์ง๋ ๋จ์ด์ง ์์ ์ด ์๋ ๊ฒฝ์ฐ์๋ง ๋ ์๊ฐ๋๋ค.task_concurrency=1
- ํ๋์ ์์ ์ ๋ํ ์ฌ๋ฌ ์์ ์ธ์คํด์ค์ ๋์ ์์์ ๊ธ์งํฉ๋๋ค. ๊ทธ๋ ์ง ์์ผ๋ฉด ์ฌ๋ฌ ์ ํ์ ๋์์ ์ถ์ํ ๊ฒ์ ๋๋ค.VerticaOperator
(ํ๋์ ํ ์ด๋ธ์ ๋ณด๊ณ );report_update >> [email, tg]
- ๋ชจ๋VerticaOperator
๋ค์๊ณผ ๊ฐ์ด ํธ์ง์ ๋ฉ์์ง๋ฅผ ๋ณด๋ด๋ ๋ฐ ์๋ ดํฉ๋๋ค.
๊ทธ๋ฌ๋ ์๋ฆผ ์ฐ์ฐ์๋ ์์ ์กฐ๊ฑด์ด ๋ค๋ฅด๊ธฐ ๋๋ฌธ์ ํ๋๋ง ์๋ํฉ๋๋ค. ํธ๋ฆฌ ๋ณด๊ธฐ์์๋ ๋ชจ๋ ๊ฒ์ด ๋ ์๊ฐ์ ์ผ๋ก ๋ณด์ ๋๋ค.
๋๋์ ๋ํด ๋ช ๋ง๋ ๋งํ ๊ฒ์ด๋ค ๋งคํฌ๋ก ๊ทธ๋ฆฌ๊ณ ๊ทธ๋ค์ ์น๊ตฌ๋ค - ๋ณ์.
๋งคํฌ๋ก๋ ๋ค์ํ ์ ์ฉํ ์ ๋ณด๋ฅผ ์ฐ์ฐ์ ์ธ์๋ก ๋์ฒดํ ์ ์๋ Jinja ์๋ฆฌ ํ์์์ ๋๋ค. ์๋ฅผ ๋ค๋ฉด ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }}
์ปจํ
์คํธ ๋ณ์์ ๋ด์ฉ์ผ๋ก ํ์ฅ๋ฉ๋๋ค. execution_date
ํ์์ผ๋ก YYYY-MM-DD
: 2020-07-14
. ๊ฐ์ฅ ์ข์ ์ ์ ์ปจํ
์คํธ ๋ณ์๊ฐ ํน์ ์์
์ธ์คํด์ค(ํธ๋ฆฌ ๋ณด๊ธฐ์ ์ฌ๊ฐํ)์ ๊ณ ์ ๋๊ณ ๋ค์ ์์ํ๋ฉด ์๋ฆฌ ํ์์๊ฐ ๋์ผํ ๊ฐ์ผ๋ก ํ์ฅ๋๋ค๋ ๊ฒ์
๋๋ค.
ํ ๋น๋ ๊ฐ์ ๊ฐ ์์ ์ธ์คํด์ค์ ๋ ๋๋ง๋จ ๋ฒํผ์ ์ฌ์ฉํ์ฌ ๋ณผ ์ ์์ต๋๋ค. ํธ์ง๋ฅผ ๋ณด๋ด๋ ์์ ์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
๋ฐ๋ผ์ ๋ฉ์์ง๋ฅผ ๋ณด๋ด๋ ์์ ์์ ๋ค์์ ์ํํฉ๋๋ค.
์ฌ์ฉ ๊ฐ๋ฅํ ์ต์ ๋ฒ์ ์ ๋ด์ฅ ๋งคํฌ๋ก ์ ์ฒด ๋ชฉ๋ก์ ์ฌ๊ธฐ์์ ํ์ธํ ์ ์์ต๋๋ค.
๋ํ ํ๋ฌ๊ทธ์ธ์ ๋์์ผ๋ก ์ฐ๋ฆฌ๋ ์ฐ๋ฆฌ ์์ ์ ๋งคํฌ๋ก๋ฅผ ์ ์ธํ ์ ์์ง๋ง ๊ทธ๊ฒ์ ๋ ๋ค๋ฅธ ์ด์ผ๊ธฐ์ ๋๋ค.
๋ฏธ๋ฆฌ ์ ์๋ ๊ฒ ์ธ์๋ ๋ณ์์ ๊ฐ์ ๋์ฒดํ ์ ์์ต๋๋ค(์ด๋ฏธ ์์ ์ฝ๋์์ ์ฌ์ฉํ์ต๋๋ค). ์์ ์์ฑํ์ Admin/Variables
๋ช ๊ฐ์ง:
์ฌ์ฉํ ์ ์๋ ๋ชจ๋ ๊ฒ:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')
๊ฐ์ ์ค์นผ๋ผ์ด๊ฑฐ๋ JSON์ผ ์๋ ์์ต๋๋ค. JSON์ ๊ฒฝ์ฐ:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}
์ํ๋ ํค์ ๊ฒฝ๋ก๋ฅผ ์ฌ์ฉํ์ญ์์ค. {{ var.json.bot_config.bot.token }}
.
๋๋ ๋ฌธ์ ๊ทธ๋๋ก ํ ๋จ์ด๋ฅผ ๋งํ๊ณ ํ๋์ ์คํฌ๋ฆฐ ์ท์ ๋ณด์ฌ์ค๋๋ค. ์ฐ๊ฒฐ. ์ฌ๊ธฐ์์๋ ๋ชจ๋ ๊ฒ์ด ๊ธฐ๋ณธ์
๋๋ค. ํ์ด์ง์์ Admin/Connections
์ฐ๋ฆฌ๋ ์ฐ๊ฒฐ์ ์์ฑํ๊ณ ๊ฑฐ๊ธฐ์ ๋ก๊ทธ์ธ/์ํธ ๋ฐ ๋ณด๋ค ๊ตฌ์ฒด์ ์ธ ๋งค๊ฐ ๋ณ์๋ฅผ ์ถ๊ฐํฉ๋๋ค. ์ด์ ๊ฐ์ด:
์ํธ๋ฅผ ์ํธํํ๊ฑฐ๋(๊ธฐ๋ณธ๊ฐ๋ณด๋ค ๋ ์ฒ ์ ํ๊ฒ) ์ฐ๊ฒฐ ์ ํ์ ์๋ตํ ์ ์์ต๋๋ค(์: tg_main
) - ์ฌ์ค์ ์ ํ ๋ชฉ๋ก์ด Airflow ๋ชจ๋ธ์ ๊ณ ์ ๋์ด ์๊ณ ์์ค ์ฝ๋์ ๋ค์ด๊ฐ์ง ์๊ณ ๋ ํ์ฅํ ์ ์๋ค๋ ๊ฒ์
๋๋ค(๊ฐ์๊ธฐ Google์ ๋ฌด์ธ๊ฐ๋ฅผ ๊ฒ์ํ์ง ์์ ๊ฒฝ์ฐ ์์ ํด ์ฃผ์ธ์). ์ด๋ฆ.
๋์ผํ ์ด๋ฆ์ผ๋ก ์ฌ๋ฌ ์ฐ๊ฒฐ์ ๋ง๋ค ์๋ ์์ต๋๋ค. ์ด ๊ฒฝ์ฐ ๋ฉ์๋๋ BaseHook.get_connection()
, ์ด๋ฆ์ผ๋ก ์ฐ๋ฆฌ๋ฅผ ์ฐ๊ฒฐ์์ผ ์ค ๊ฒ์
๋๋ค. ๋ฌด์์์ ์ฌ๋ฌ ์ด๋ฆ์์ ๋ฐ์์ต๋๋ค(Round Robin์ ๋ง๋๋ ๊ฒ์ด ๋ ๋
ผ๋ฆฌ์ ์ด์ง๋ง Airflow ๊ฐ๋ฐ์์ ์์ฌ์ ๋งก๊ธฐ๊ฒ ์ต๋๋ค).
๋ณ์ ๋ฐ ์ฐ๊ฒฐ์ ํ์คํ ๋ฉ์ง ๋๊ตฌ์ด์ง๋ง ๊ท ํ์ ์์ง ์๋ ๊ฒ์ด ์ค์ํฉ๋๋ค. ์ฝ๋ ์์ฒด์ ์ ์ฅํ๋ ํ๋ฆ ๋ถ๋ถ๊ณผ ์ ์ฅ์ ์ํด Airflow์ ์ ๊ณตํ๋ ๋ถ๋ถ. ํํธ์ผ๋ก๋ UI๋ฅผ ํตํด ์ฐํธํจ๊ณผ ๊ฐ์ ๊ฐ์ ๋น ๋ฅด๊ฒ ๋ณ๊ฒฝํ๋ ๊ฒ์ด ํธ๋ฆฌํ ์ ์์ต๋๋ค. ๋ฐ๋ฉด์ ์ด๊ฒ์ ์ฌ์ ํ โโ์ฐ๋ฆฌ๊ฐ ์ ๊ฑฐํ๊ณ ์ถ์๋ ๋ง์ฐ์ค ํด๋ฆญ์ผ๋ก์ ๋ณต๊ท์ ๋๋ค.
์ฐ๊ฒฐ ์์
์ ์์
์ค ํ๋์
๋๋ค. ํํฌ. ์ผ๋ฐ์ ์ผ๋ก Airflow hook์ ์ด๋ฅผ ํ์ฌ ์๋น์ค ๋ฐ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ ์ฐ๊ฒฐํ๊ธฐ ์ํ ์ง์ ์
๋๋ค. ์๋ฅผ ๋ค์ด, JiraHook
Jira์ ์ํธ ์์ฉํ ์ ์๋๋ก ํด๋ผ์ด์ธํธ๋ฅผ ์ด ๊ฒ์ด๋ฉฐ(์์
์ ์๋ค๋ก ์ด๋ํ ์ ์์) SambaHook
๋ก์ปฌ ํ์ผ์ smb
-๊ฐ๋ฆฌํค๋ค.
์ฌ์ฉ์ ์ง์ ์ฐ์ฐ์ ๊ตฌ๋ฌธ ๋ถ์
๊ทธ๋ฆฌ๊ณ ์ฐ๋ฆฌ๋ ๊ทธ๊ฒ์ด ์ด๋ป๊ฒ ๋ง๋ค์ด์ง๋์ง ๊ฑฐ์ ๋ณผ ์ ์์์ต๋๋ค. TelegramBotSendMessage
์ํธ commons/operators.py
์ค์ ์ฐ์ฐ์:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
์ฌ๊ธฐ์์ Airflow์ ๋ค๋ฅธ ๋ชจ๋ ๊ฒ๊ณผ ๋ง์ฐฌ๊ฐ์ง๋ก ๋ชจ๋ ๊ฒ์ด ๋งค์ฐ ๊ฐ๋จํฉ๋๋ค.
- ์์ ์์
BaseOperator
, ๊ฝค ๋ง์ Airflow ๊ด๋ จ ์ฌํญ์ ๊ตฌํํฉ๋๋ค(์ฌ๊ฐ๋ฅผ ๋ณด์ญ์์ค). - ์ ์ธ๋ ํ๋
template_fields
, ์ฌ๊ธฐ์ Jinja๋ ์ฒ๋ฆฌํ ๋งคํฌ๋ก๋ฅผ ์ฐพ์ต๋๋ค. - ์ ๋ํ ์ฌ๋ฐ๋ฅธ ์ฃผ์ฅ์ ์ ๋ฆฌํ์ต๋๋ค.
__init__()
, ํ์ํ ๊ฒฝ์ฐ ๊ธฐ๋ณธ๊ฐ์ ์ค์ ํฉ๋๋ค. - ์กฐ์์ ์ด๊ธฐํ๋ ์์ง ์์์ต๋๋ค.
- ํด๋น ํํฌ๋ฅผ ์ด์์ต๋๋ค.
TelegramBotHook
๊ทธ๊ฒ์ผ๋ก๋ถํฐ ํด๋ผ์ด์ธํธ ๊ฐ์ฒด๋ฅผ ๋ฐ์์ต๋๋ค. - ์ฌ์ ์(์ฌ์ ์) ๋ฉ์๋
BaseOperator.execute()
, Airfow๋ ์ด์์๋ฅผ ์์ํ ๋๊ฐ๋๋ฉด ํธ ์์นํฉ๋๋ค. ๋ก๊ทธ์ธํ๋ ๊ฒ์ ์๊ณ ๊ธฐ๋ณธ ์์ ์ ๊ตฌํํฉ๋๋ค. (๊ทธ๋ฐ๋ฐ, ๋ฐ๋ก ๋ก๊ทธ์ธํฉ๋๋ค.stdout
ะธstderr
- ๊ธฐ๋ฅ๊ฐ ๋ชจ๋ ๊ฒ์ ๊ฐ๋ก๋ง๊ณ ์๋ฆ๋ต๊ฒ ๊ฐ์ธ๊ณ ํ์ํ ๊ณณ์ ๋ถํดํฉ๋๋ค.)
์ฐ๋ฆฌ๊ฐ ๊ฐ์ง ๊ฒ์ ๋ณด์ commons/hooks.py
. ํํฌ ์์ฒด๊ฐ ํฌํจ๋ ํ์ผ์ ์ฒซ ๋ฒ์งธ ๋ถ๋ถ:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
์ฌ๊ธฐ์์ ๋ฌด์์ ์ค๋ช ํด์ผํ ์ง์กฐ์ฐจ ๋ชจ๋ฅด๊ฒ ๊ณ ์ค์ํ ์ฌํญ๋ง ์ธ๊ธํ๊ฒ ์ต๋๋ค.
- ์ฐ๋ฆฌ๋ ์์ํ๊ณ ์ธ์์ ๋ํด ์๊ฐํฉ๋๋ค. ๋๋ถ๋ถ์ ๊ฒฝ์ฐ ๋ค์ ์ค ํ๋์
๋๋ค.
conn_id
; - ํ์ค ๋ฐฉ๋ฒ ์ฌ์ ์: ๋๋ ์ค์ค๋ก๋ฅผ ์ ํํ๋ค
get_conn()
, ์ด๋ฆ์ผ๋ก ์ฐ๊ฒฐ ๋งค๊ฐ๋ณ์๋ฅผ ๊ฐ์ ธ์ค๊ณ ์น์ ๋ง ๊ฐ์ ธ์ต๋๋ค.extra
(์ด๊ฒ์ JSON ํ๋์ ๋๋ค) (๋ด ์์ ์ ์ง์นจ์ ๋ฐ๋ผ!) Telegram ๋ด ํ ํฐ์ ๋ฃ์ต๋๋ค.{"bot_token": "YOuRAwEsomeBOtToKen"}
. - ๋๋ ์ฐ๋ฆฌ์ ์ธ์คํด์ค๋ฅผ ์์ฑ
TelegramBot
, ํน์ ํ ํฐ์ ์ ๊ณตํฉ๋๋ค.
๊ทธ๊ฒ ๋ค์ผ. ๋ค์์ ์ฌ์ฉํ์ฌ ํํฌ์์ ํด๋ผ์ด์ธํธ๋ฅผ ๊ฐ์ ธ์ฌ ์ ์์ต๋๋ค. TelegramBotHook().clent
๋๋ TelegramBotHook().get_conn()
.
๊ทธ๋ฆฌ๊ณ Telegram REST API์ ๋ํ ๋ง์ดํฌ๋ก ๋ฉํผ๋ฅผ ๋ง๋๋ ํ์ผ์ ๋ ๋ฒ์งธ ๋ถ๋ถ์ ๋์ผํ ํญ๋ชฉ์ ๋๋๊ทธํ์ง ์๋๋ก ํฉ๋๋ค. python-telegram-bot
sendMessage
.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))
์ฌ๋ฐ๋ฅธ ๋ฐฉ๋ฒ์ ๋ชจ๋ ๋ํ๋ ๊ฒ์ ๋๋ค.
TelegramBotSendMessage
,TelegramBotHook
,TelegramBot
- ํ๋ฌ๊ทธ์ธ์์ ๊ณต๊ฐ ์ ์ฅ์์ ๋ฃ๊ณ ์คํ ์์ค์ ์ ๊ณตํ์ญ์์ค.
์ด ๋ชจ๋ ๊ฒ์ ์ฐ๊ตฌํ๋ ๋์ ๋ณด๊ณ ์ ์ ๋ฐ์ดํธ๊ฐ ์ฑ๊ณต์ ์ผ๋ก ์คํจํ๊ณ ์ฑ๋์์ ์ค๋ฅ ๋ฉ์์ง๋ฅผ ๋ณด๋์ต๋๋ค. ์๋ชป๋๊ฑด ์๋์ง ํ์ธํด๋ด์ผ๊ฒ ๋ค์...
์ฐ๋ฆฌ ๊ฐ์์ ๋ญ๊ฐ ๊ณ ์ฅ๋ฌ์ต๋๋ค! ๊ทธ๊ฒ์ด ์ฐ๋ฆฌ๊ฐ ๊ธฐ๋ํ๋ ๊ฒ์ด ์๋๋๊น? ์ ํํ!
๋ฟ๋ฆด๊น?
๋ด๊ฐ ๋ญ๊ฐ๋ฅผ ๋์น ๊ฒ ๊ฐ๋? ๊ทธ๋ SQL Server์์ Vertica๋ก ๋ฐ์ดํฐ๋ฅผ ์ ์กํ๊ฒ ๋ค๊ณ ์ฝ์ํ ๋ค์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ ์ ๋น์ด๋ผ๋ ์ฃผ์ ์์ ๋ฒ์ด๋ฌ์ต๋๋ค!
์ด ์ํ ํ์๋ ์๋์ ์ด์์ต๋๋ค. ๋๋ ๋น์ ์ ์ํด ๋ช ๊ฐ์ง ์ฉ์ด๋ฅผ ํด๋ ํด์ผ ํ์ต๋๋ค. ์ด์ ๋ ๋ฉ๋ฆฌ ๊ฐ ์ ์์ต๋๋ค.
์ฐ๋ฆฌ์ ๊ณํ์ ์ด๋ฌ์ต๋๋ค.
- ๋๊ทธ๋ฅผ ํ๋ค
- ์์ ์์ฑ
- ๋ชจ๋ ๊ฒ์ด ์ผ๋ง๋ ์๋ฆ๋ค์ด์ง๋ณด์ญ์์ค
- ์ฑ์ฐ๊ธฐ์ ์ธ์ ๋ฒํธ ํ ๋น
- SQL Server์์ ๋ฐ์ดํฐ ๊ฐ์ ธ์ค๊ธฐ
- Vertica์ ๋ฐ์ดํฐ ๋ฃ๊ธฐ
- ํต๊ณ ์์ง
๊ทธ๋์ ์ด ๋ชจ๋ ๊ฒ์ ์์ํ๊ณ ์คํํ๊ธฐ ์ํด ์ ๋ ์ฐ๋ฆฌ docker-compose.yml
:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.py
๊ฑฐ๊ธฐ์์ ์ฐ๋ฆฌ๋ ๋ค์์ ์ ๊ธฐํฉ๋๋ค.
- Vertica๋ฅผ ํธ์คํธ๋ก
dwh
๊ฐ์ฅ ๊ธฐ๋ณธ์ ์ธ ์ค์ ์ผ๋ก - ์ธ ๊ฐ์ SQL Server ์ธ์คํด์ค,
- ์ฐ๋ฆฌ๋ ํ์์ ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ฅผ ์ผ๋ถ ๋ฐ์ดํฐ๋ก ์ฑ์๋๋ค(์ด๋ ํ ๊ฒฝ์ฐ์๋
mssql_init.py
!)
์ฐ๋ฆฌ๋ ์ง๋ ๋ฒ๋ณด๋ค ์ฝ๊ฐ ๋ ๋ณต์กํ ๋ช ๋ น์ ๋์์ผ๋ก ๋ชจ๋ ์ข์ ๊ฒ์ ์์ํฉ๋๋ค.
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
์ฐ๋ฆฌ์ Miracle Randomizer๊ฐ ์์ฑํ ํญ๋ชฉ์ ์ฌ์ฉํ ์ ์์ต๋๋ค. Data Profiling/Ad Hoc Query
:
๊ฐ์ฅ ์ค์ํ ๊ฒ์ ๋ถ์๊ฐ์๊ฒ ๋ณด์ฌ์ฃผ์ง ์๋ ๊ฒ์
๋๋ค.
~์ ๋ํด ์์ธํ๊ฒ ๋งํ๋ค ETL ์ธ์ ๊ฑฐ๊ธฐ์๋ ๋ชจ๋ ๊ฒ์ด ์ฌ์ํฉ๋๋ค. ๋ฒ ์ด์ค๋ฅผ ๋ง๋ค๊ณ ๊ทธ ์์ ์ฌ์ธ์ด ์๊ณ ์ปจํ ์คํธ ๊ด๋ฆฌ์๋ก ๋ชจ๋ ๊ฒ์ ๋ํํ๊ณ ์ด์ ๋ค์์ ์ํํฉ๋๋ค.
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
๋๊ฐ ์๋ค ๋ฐ์ดํฐ ์์ง ์ฐ๋ฆฌ์ XNUMX๊ฐ ํ ์ด๋ธ์์. ๋งค์ฐ ์๋ฐํ ๋ผ์ธ์ ๋์์ผ๋ก ์ด๊ฒ์ ํด๋ณด์:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
- Airflow์์ ์ป์ ํํฌ์ ๋์์ผ๋ก
pymssql
-์ฐ๊ฒฐํ๋ค - ๋ ์ง ํ์์ ์ ํ์ ์์ฒญ์ผ๋ก ๋์ฒดํด ๋ณด๊ฒ ์ต๋๋ค. ์ ํ์ ํ ํ๋ฆฟ ์์ง์ ์ํด ํจ์์ ๋์ ธ์ง๋๋ค.
- ์ฐ๋ฆฌ์ ์์ฒญ์ ๊ณต๊ธ
pandas
๋๊ฐ ์ฐ๋ฆฌ๋ฅผ ์ป์ ๊ฒ์ธ๊ฐDataFrame
- ์์ผ๋ก ์ฐ๋ฆฌ์๊ฒ ์ ์ฉํ ๊ฒ์ ๋๋ค.
๋๋ ๋์ฒด๋ฅผ ์ฌ์ฉํ๊ณ ์์ต๋๋ค
{dt}
์์ฒญ ๋งค๊ฐ๋ณ์ ๋์%s
๋ด๊ฐ ์ฌ์ ํ ํผ๋ ธํค์ค๋ผ์๊ฐ ์๋๋ผpandas
๋์ฒํ ์ ์๋ค.pymssql
๊ทธ๋ฆฌ๊ณ ๋ง์ง๋ง์ ์ฌ์ฉparams: List
๊ทธ๊ฐ ์ ๋ง๋ก ์ํ์ง๋งtuple
.
๊ฐ๋ฐ์๋ ์ฐธ๊ณ ํ์ธ์pymssql
๋ ์ด์ ๊ทธ๋ฅผ ์ง์ํ์ง ์๊ธฐ๋ก ๊ฒฐ์ ํ๊ณ ์ด์ ์ด์ฌ๋ฅผ ๊ฐ์ผ ํ ๋์ ๋๋คpyodbc
.
Airflow๊ฐ ํจ์์ ์ธ์๋ฅผ ๋ฌด์์ผ๋ก ์ฑ์ ๋์ง ์ดํด๋ณด๊ฒ ์ต๋๋ค.
๋ฐ์ดํฐ๊ฐ ์์ผ๋ฉด ๊ณ์ํ ํ์๊ฐ ์์ต๋๋ค. ๊ทธ๋ฌ๋ ์ฑ์ฐ๊ธฐ๊ฐ ์ฑ๊ณต์ ์ด๋ผ๊ณ ์๊ฐํ๋ ๊ฒ๋ ์ด์ํฉ๋๋ค. ๊ทธ๋ฌ๋ ์ด๊ฒ์ ์ค์๊ฐ ์๋๋๋ค. ์์์, ์ด๋กํด?! ๋ค์์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException
Airflow์ ์ค๋ฅ๊ฐ ์๋ค๊ณ ์๋ ค์ฃผ์ง๋ง ์์
์ ๊ฑด๋๋๋๋ค. ์ธํฐํ์ด์ค์๋ ๋
น์ ๋๋ ๋นจ๊ฐ์ ์ฌ๊ฐํ์ด ์๋๋ผ ๋ถํ์์ด ์์ต๋๋ค.
๋ฐ์ดํฐ๋ฅผ ๋์ง์ ์ฌ๋ฌ ์ด:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])
์ฆ
- ์ฐ๋ฆฌ๊ฐ ์ฃผ๋ฌธ์ ๋ฐ์ ๋ฐ์ดํฐ๋ฒ ์ด์ค,
- ํ๋ฌ๋ฉ ์ธ์ ์ ID(๋ค๋ฅผ ๊ฒ์ ๋๋ค. ๋ชจ๋ ์์ ์ ๋ํด),
- ์์ค ๋ฐ ์ฃผ๋ฌธ ID์ ํด์ - ์ต์ข ๋ฐ์ดํฐ๋ฒ ์ด์ค(๋ชจ๋ ๊ฒ์ด ํ๋์ ํ ์ด๋ธ์ ์์์ง๋ ๊ณณ)์์ ๊ณ ์ ํ ์ฃผ๋ฌธ ID๋ฅผ ๊ฐ๊ฒ ๋ฉ๋๋ค.
๋ง์ง๋ง ๋ ๋ฒ์งธ ๋จ๊ณ๊ฐ ๋จ์ ์์ต๋๋ค. ๋ชจ๋ ๊ฒ์ Vertica์ ๋ถ์ต๋๋ค. ๊ทธ๋ฆฌ๊ณ ์ด์ํ๊ฒ๋ ์ด๋ฅผ ์ํํ๋ ๊ฐ์ฅ ํ๋ฅญํ๊ณ ํจ์จ์ ์ธ ๋ฐฉ๋ฒ ์ค ํ๋๋ CSV๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ ๋๋ค!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
- ์ฐ๋ฆฌ๋ ํน๋ณํ ์์ ๊ธฐ๋ฅผ ๋ง๋ค๊ณ ์์ต๋๋ค
StringIO
. pandas
์น์ ํ๊ฒ ์ฐ๋ฆฌ์DataFrame
์ ํํ๋กCSV
-์ค๊ณฝ.- ํํฌ๋ก ์ฐ๋ฆฌ๊ฐ ๊ฐ์ฅ ์ข์ํ๋ Vertica์ ๋ํ ์ฐ๊ฒฐ์ ์ด์ด ๋ด ์๋ค.
- ์ด์ ๋์์ ๋ฐ์
copy()
๋ฐ์ดํฐ๋ฅผ Vertika๋ก ์ง์ ๋ณด๋ด์ญ์์ค!
์ฐ๋ฆฌ๋ ๋๋ผ์ด๋ฒ์์ ์ผ๋ง๋ ๋ง์ ๋ผ์ธ์ด ์ฑ์์ก๋์ง ๊ฐ์ ธ์ค๊ณ ์ธ์ ๊ด๋ฆฌ์์๊ฒ ๋ชจ๋ ๊ฒ์ด ์ ์์ด๋ผ๊ณ ์๋ฆด ๊ฒ์ ๋๋ค.
session.loaded_rows = cursor.rowcount
session.successful = True
๊ทธ๊ฒ ๋ค์ผ.
ํ๋งค์ ํ๊ฒ ํ๋ ์ดํธ๋ฅผ ์๋์ผ๋ก ์์ฑํฉ๋๋ค. ์ฌ๊ธฐ์์ ๋๋ ์์ ๊ธฐ๊ณ๋ฅผ ํ์ฉํ์ต๋๋ค.
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
๋ด๊ฐ ์ฌ์ฉํ๊ณ
VerticaOperator()
๋ฐ์ดํฐ๋ฒ ์ด์ค ์คํค๋ง์ ํ ์ด๋ธ์ ์์ฑํฉ๋๋ค(๋ฌผ๋ก ์์ง ์กด์ฌํ์ง ์๋ ๊ฒฝ์ฐ). ๊ฐ์ฅ ์ค์ํ ๊ฒ์ ์ข ์์ฑ์ ์ฌ๋ฐ๋ฅด๊ฒ ์ ๋ ฌํ๋ ๊ฒ์ ๋๋ค.
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> load
ํฉ์ฐ
-๊ธ์,-์์ ์ฅ๊ฐ ๋งํ๋ค.-๊ทธ๋ ์ง, ์ง๊ธ
๋ด๊ฐ ์ฒ์์ ๊ฐ์ฅ ๋์ฐํ ๋๋ฌผ์ด๋ผ๊ณ ํ์ ํฉ๋๊น?
์ค๋ฆฌ์ ๋๋ ๋์จ, Gruffalo
๋๋ฃ๋ค๊ณผ ๊ฒฝ์์ด ์์๋ค๋ฉด ์ฒ์๋ถํฐ ETL ํ๋ก์ธ์ค๋ฅผ ์ ์ํ๊ฒ ์์ฑํ๊ณ ์์ํ ์ฌ๋ : ๊ทธ๋ค์ SSIS์ ๋ง์ฐ์ค๋ฅผ ์ฌ์ฉํ๊ณ Airflow๋ฅผ ์ฌ์ฉํ๋ฉด ... ๊ทธ๋ฆฌ๊ณ ์ ์ง ๊ด๋ฆฌ์ ์ฉ์ด์ฑ์ ๋น๊ตํ ๊ฒ์ ๋๋ค ... ์์ฐ, ๋ด๊ฐ ๋ชจ๋ ๋ฉด์์ ๊ทธ๋ค์ ์ด๊ธธ ๊ฒ์ด๋ผ๋ ๋ฐ ๋์ํ์ค ๊ฒ ๊ฐ์์!
์กฐ๊ธ ๋ ์ง์งํ๊ฒ ๋งํ์๋ฉด, ํ๋ก๊ทธ๋จ ์ฝ๋์ ํํ๋ก ํ๋ก์ธ์ค๋ฅผ ์ค๋ช ํจ์ผ๋ก์จ Apache Airflow๊ฐ ์ ์ผ์ ํ์ต๋๋ค. ๋ง์ด ๋ ํธํ๊ณ ์ฆ๊ฒ๊ฒ.
ํ๋ฌ๊ทธ์ธ ๋ฐ ํ์ฅ์ฑ ์ธก๋ฉด์์ ๋ฌด์ ํ ํ์ฅ์ฑ์ ๊ฑฐ์ ๋ชจ๋ ์์ญ์์ Airflow๋ฅผ ์ฌ์ฉํ ์ ์๋ ๊ธฐํ๋ฅผ ์ ๊ณตํฉ๋๋ค. ๋ฐ์ดํฐ ์์ง, ์ค๋น ๋ฐ ์ฒ๋ฆฌ์ ์ ์ฒด ์ฃผ๊ธฐ, ๋ก์ผ ๋ฐ์ฌ(ํ์ฑ, ๊ฐ์).
ํํธ ์ต์ข , ์ฐธ์กฐ ๋ฐ ์ ๋ณด
์ฐ๋ฆฌ๊ฐ ๋น์ ์ ์ํด ๋ชจ์ ๊ฐํด
start_date
. ์, ์ด๊ฒ์ ์ด๋ฏธ ์ง์ญ ๋ฐ์ ๋๋ค. Doug์ ์ฃผ์ ์ฃผ์ฅ์ ํตํดstart_date
๋ชจ๋ ํต๊ณผ ํจ. ๊ฐ๋จํ ๋งํด์, ๋น์ ์ด ์ง์ ํ๋ ๊ฒฝ์ฐstart_date
ํ์ฌ ๋ ์ง ๋ฐschedule_interval
- ์ธ์ ๊ฐ๋ DAG๊ฐ ๋ด์ผ ๋ ์ผ์ฐ ์์๋์ง ์์ต๋๋ค.start_date = datetime(2020, 7, 7, 0, 1, 2)
๊ทธ๋ฆฌ๊ณ ๋ ์ด์ ๋ฌธ์ ๊ฐ ์์ต๋๋ค.
์ด์ ๊ด๋ จ๋ ๋ ๋ค๋ฅธ ๋ฐํ์ ์ค๋ฅ๊ฐ ์์ต๋๋ค.
Task is missing the start_date parameter
, ์ด๊ฒ์ dag ์ฐ์ฐ์์ ๋ฐ์ธ๋ฉํ๋ ๊ฒ์ ์์๋ค๋ ๊ฒ์ ๊ฐ์ฅ ์์ฃผ ๋ํ๋ ๋๋ค.- ๋ชจ๋ ํ๋์ ๊ธฐ๊ณ์ ์์ต๋๋ค. ์, ๋ฒ ์ด์ค(Airflow ์์ฒด ๋ฐ ์ฝํ ), ์น ์๋ฒ, ์ค์ผ์ค๋ฌ ๋ฐ ์์ ์. ๊ทธ๋ฆฌ๊ณ ๊ทธ๊ฒ์ ํจ๊ณผ๊ฐ ์์์ต๋๋ค. ๊ทธ๋ฌ๋ ์๊ฐ์ด ์ง๋จ์ ๋ฐ๋ผ ์๋น์ค์ ๋ํ ์์ ์๊ฐ ์ฆ๊ฐํ๊ณ PostgreSQL์ด 20ms๊ฐ ์๋ 5s์์ ์ธ๋ฑ์ค์ ์๋ตํ๊ธฐ ์์ํ์ ๋ ์ฐ๋ฆฌ๋ ๊ทธ๊ฒ์ ๊ฐ์ ธ ์์ ๊ฐ์ ธ๊ฐ์ต๋๋ค.
- ๋ก์ปฌ ์คํ์. ์, ์ฐ๋ฆฌ๋ ์ฌ์ ํ ๊ทธ ์์ ์์ ์๊ณ ์ด๋ฏธ ์ฌ์ฐ์ ๊ฐ์ฅ์๋ฆฌ์ ์์ต๋๋ค. ์ง๊ธ๊น์ง๋ LocalExecutor๋ก๋ ์ถฉ๋ถํ์ง๋ง ์ด์ ์ ์ด๋ ํ ๋ช ์ ์์ ์๋ก ํ์ฅํด์ผ ํ ๋์ด๋ฉฐ CeleryExecutor๋ก ์ด๋ํ๊ธฐ ์ํด ์ด์ฌํ ๋ ธ๋ ฅํด์ผ ํ ๊ฒ์ ๋๋ค. ๊ทธ๋ฆฌ๊ณ ํ๋์ ์์คํ ์์ ์์ ํ ์ ์๋ค๋ ์ฌ์ค์ ๊ณ ๋ คํ ๋ ์๋ฒ์์๋ ์ ๋ฌ๋ฆฌ๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ ๋ง์ ์ ์๋ ๊ฒ์ ์์ต๋๋ค.
- ๋ฏธ์ฌ์ฉ ๋ด์ฅ ๋๊ตฌ:
- ์ฐ๊ฒฐ ์๋น์ค ์๊ฒฉ ์ฆ๋ช ์ ์ ์ฅํ๊ธฐ ์ํด
- SLA ๋๋ฝ ์ ์๊ฐ์ ํด๊ฒฐ๋์ง ์์ ์์ ์ ์๋ตํ๊ธฐ ์ํด
- ์์ค์ปด ๋ฉํ ๋ฐ์ดํฐ ๊ตํ์ ์ํด (๋ด๊ฐ ๋งํ๋ค ๋ฉํ๋ฐ์ดํฐ!) dag ์์ ์ฌ์ด.
- ๋ฉ์ผ ๋จ์ฉ. ๊ธ์, ๋ด๊ฐ ๋ญ๋ผ๊ณ ๋งํ ์ ์๋? ๋์ด์ง ์์ ์ ๋ชจ๋ ๋ฐ๋ณต์ ๋ํด ๊ฒฝ๊ณ ๊ฐ ์ค์ ๋์์ต๋๋ค. ์ด์ ๋ด ์์ Gmail์๋ Airflow์์ ๋ณด๋ธ 90๊ฐ ์ด์์ ์ด๋ฉ์ผ์ด ์๊ณ ์น ๋ฉ์ผ ์ด๊ตฌ๋ ํ ๋ฒ์ 100๊ฐ ์ด์์ ์ ํํ๊ณ ์ญ์ ํ๋ ๊ฒ์ ๊ฑฐ๋ถํฉ๋๋ค.
๋ ๋ง์ ํจ์ :
Apache Airflow ํจ์
๋ ๋ง์ ์๋ํ ๋๊ตฌ
์ฐ๋ฆฌ๊ฐ ์์ด ์๋ ๋จธ๋ฆฌ๋ก ๋ ๋ง์ด ์ผํ ์ ์๋๋ก Airflow๋ ๋ค์๊ณผ ๊ฐ์ด ์ค๋นํ์ต๋๋ค.
REST API - ๊ทธ๋ ์ฌ์ ํ ์์ ์ ๋ฐฉํดํ์ง ์๋ ์คํ์ ์ํ๋ฅผ ์ ์งํฉ๋๋ค. ์ด๋ฅผ ํตํด dag ๋ฐ ์์ ์ ๋ํ ์ ๋ณด๋ฅผ ์ป์ ์ ์์ ๋ฟ๋ง ์๋๋ผ dag๋ฅผ ์ค์ง/์์ํ๊ณ DAG ์คํ ๋๋ ํ์ ๋ง๋ค ์ ์์ต๋๋ค.CLI - WebUI๋ฅผ ํตํด ์ฌ์ฉํ๊ธฐ ๋ถํธํ ๋ฟ๋ง ์๋๋ผ ์ผ๋ฐ์ ์ผ๋ก ์๋ ๋ง์ ๋๊ตฌ๋ฅผ ๋ช ๋ น์ค์ ํตํด ์ฌ์ฉํ ์ ์์ต๋๋ค. ์๋ฅผ ๋ค์ด:backfill
์์ ์ธ์คํด์ค๋ฅผ ๋ค์ ์์ํ๋ ๋ฐ ํ์ํฉ๋๋ค.
์๋ฅผ ๋ค์ด, ๋ถ์๊ฐ๋ค์ด ์์ ์ด๋ ๊ฒ ๋งํ์ต๋๋ค. ๊ณ ์ณ๋ผ, ๊ณ ์ณ๋ผ, ๊ณ ์ณ๋ผ, ๊ณ ์ณ๋ผ!" ๊ทธ๋ฆฌ๊ณ ๋น์ ์ ๊ทธ๋ฐ ํธ๋ธ์ ๋๋ค.airflow backfill -s '2020-01-01' -e '2020-01-13' orders
- ๊ธฐ๋ณธ ์๋น์ค:
initdb
,resetdb
,upgradedb
,checkdb
. run
, ํ๋์ ์ธ์คํด์ค ์์ ์ ์คํํ๊ณ ๋ชจ๋ ์ข ์์ฑ์ ๋ํด ์ ์๋ฅผ ๋งค๊ธธ ์ ์์ต๋๋ค. ๋ํ ๋ค์์ ํตํด ์คํํ ์ ์์ต๋๋ค.LocalExecutor
, ์ ๋ฌ๋ฆฌ ํด๋ฌ์คํฐ๊ฐ ์๋ ๊ฒฝ์ฐ์๋ ๋ง์ฐฌ๊ฐ์ง์ ๋๋ค.- ๊ฑฐ์ ๊ฐ์ ์ผ์ ํ๋ค
test
, ๊ธฐ์ง์์๋ง ์๋ฌด๊ฒ๋ ์ฐ์ง ์์ต๋๋ค. connections
์ ธ์์ ์ฐ๊ฒฐ์ ๋๋์ผ๋ก ์์ฑํ ์ ์์ต๋๋ค.
ํ์ด์ฌ API -ํ๋ฌ๊ทธ์ธ์ ์ํ ๋ค์ ํ๋์ฝ์ดํ ์ํธ ์์ฉ ๋ฐฉ์์ด๋ฉฐ ์์ ์์ผ๋ก ๋ฌด๋ฆฌ๋ฅผ ์ง์ง ์์ต๋๋ค. ํ์ง๋ง ๋๊ฐ ์ฐ๋ฆฌ๊ฐ ๊ฐ๋ ๊ฑธ ๋ง๊ฒ ์ด/home/airflow/dags
, ๋ฌ๋ฆฌ๋คipython
๊ทธ๋ฆฌ๊ณ ์ฅ๋์ ์์? ์๋ฅผ ๋ค์ด ๋ค์ ์ฝ๋๋ฅผ ์ฌ์ฉํ์ฌ ๋ชจ๋ ์ฐ๊ฒฐ์ ๋ด๋ณด๋ผ ์ ์์ต๋๋ค.from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d)
- Airflow ๋ฉํ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ฐ๊ฒฐ ์ค์
๋๋ค. ์ฌ๊ธฐ์ ์์ฑํ๋ ๊ฒ์ ๊ถ์ฅํ์ง ์์ง๋ง ๋ค์ํ ํน์ ๋ฉํธ๋ฆญ์ ๋ํ ์์
์ํ๋ฅผ ๊ฐ์ ธ์ค๋ ๊ฒ์ด API๋ฅผ ํตํ๋ ๊ฒ๋ณด๋ค ํจ์ฌ ๋น ๋ฅด๊ณ ์ฌ์ธ ์ ์์ต๋๋ค.
๋ชจ๋ ์์ ์ด idempotent๋ ์๋์ง๋ง ๋๋๋ก ๋จ์ด์ง ์ ์์ผ๋ฉฐ ์ด๋ ์ ์์ ๋๋ค. ๊ทธ๋ฌ๋ ๋ช ๊ฐ์ง ๋งํ์ ์ด๋ฏธ ์์ฌ์ค๋ฝ๊ณ ํ์ธ์ด ํ์ํฉ๋๋ค.
SQL์ ์ฃผ์ํ์ธ์!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
์ฐธ์กฐ
๋ฌผ๋ก Google ๋ฐํ์ ์ฒ์ XNUMX๊ฐ ๋งํฌ๋ ๋ด ์ฑ ๊ฐํผ์ Airflow ํด๋ ๋ด์ฉ์ ๋๋ค.
Apache Airflow ์ค๋ช ์ - ๋ฌผ๋ก ์ฌ๋ฌด์ค๋ถํฐ ์์ํด์ผ ํฉ๋๋ค. ์ค๋ช ์๋ ์์ง๋ง ์ง์นจ์ ์ฝ๋ ์ฌ๋์ ๋๊ตฌ์ ๋๊น?๋ชจ๋ฒ ์ฌ๋ก -๊ธ์์, ์ต์ํ ์ ์์์ ๊ถ์ฅ ์ฌํญ์ ์ฝ์ผ์ญ์์ค.๊ธฐ๋ฅ UI - ๋งจ ์ฒ์: ์ฌ์ง์ ์ฌ์ฉ์ ์ธํฐํ์ด์คApache Airflow์ ์ฃผ์ ๊ฐ๋ ์ดํด -๊ธฐ๋ณธ ๊ฐ๋ ์ด ์ ์ค๋ช ๋์ด ์์ต๋๋ค. ๋ง์ฝ (๊ฐ์๊ธฐ!) ์ ๋ง์ ์ดํดํ์ง ๋ชปํ์ จ๋ค๋ฉด์.Tianlong์ ๋ธ๋ก๊ทธ โ Airflow ์๋ฒ/ํด๋ฌ์คํฐ ๊ตฌ์ถ ๋ฐฉ๋ฒ์ ๋ํ ๊ฐ์ด๋ - Airflow ํด๋ฌ์คํฐ ์ค์ ์ ์ํ ๊ฐ๋จํ ๊ฐ์ด๋์ ๋๋ค.Lyft์์ Apache Airflow ์คํ - ์๋ง๋ ๋ ๋ง์ ํ์์ฃผ์์ ๋ ์ ์ ์๋ฅผ ์ ์ธํ๊ณ ๋ ๊ฑฐ์ ๋์ผํ ํฅ๋ฏธ๋ก์ด ๊ธฐ์ฌ์ ๋๋ค.Apache Airflow๊ฐ Celery ์์ ์์ ์์ ์ ๋ฐฐํฌํ๋ ๋ฐฉ๋ฒ โ ์ ๋ฌ๋ฆฌ์ ํจ๊ป ์ผํ๋ ๊ฒ์ ๋ํด.Apache Airflow์ DAG ์์ฑ ๋ชจ๋ฒ ์ฌ๋ก - ์์ ์ ๋ฉฑ๋ฑ์ฑ, ๋ ์ง ๋์ ID๋ก ๋ก๋, ๋ณํ, ํ์ผ ๊ตฌ์กฐ ๋ฐ ๊ธฐํ ํฅ๋ฏธ๋ก์ด ์ฌํญ์ ๋ํด.Apache Airflow์์ ์ข ์์ฑ ๊ด๋ฆฌ - ์ง๋๊ฐ ๋๋ง ์ธ๊ธํ ์์ ๋ฐ ํธ๋ฆฌ๊ฑฐ ๊ท์น์ ์ข ์์ฑ.๊ณต๊ธฐ ํ๋ฆ: DAG๊ฐ ์ผ์ ๋ณด๋ค ํจ์ฌ ๋ค์ฒ์ง๋ ๊ฒฝ์ฐ - ์ค์ผ์ค๋ฌ์์ ์ผ๋ถ "์๋ํ ๋๋ก ์๋"์ ๊ทน๋ณตํ๊ณ ์์ค๋ ๋ฐ์ดํฐ๋ฅผ ๋ก๋ํ๊ณ ์์ ์ ์ฐ์ ์์๋ฅผ ์ง์ ํ๋ ๋ฐฉ๋ฒ.Apache Airflow์ ์ ์ฉํ SQL ์ฟผ๋ฆฌ โ Airflow ๋ฉํ๋ฐ์ดํฐ์ ๋ํ ์ ์ฉํ SQL ์ฟผ๋ฆฌ.Apache Airflow๋ก ์ํฌํ๋ก ๊ฐ๋ฐ ์์ํ๊ธฐ - ๋ง์ถคํ ์ผ์ ์์ฑ์ ๋ํ ์ ์ฉํ ์น์ ์ด ์์ต๋๋ค.Presto ๋ฐ Airflow๋ฅผ ์ฌ์ฉํ์ฌ AWS์์ Fetchr ๋ฐ์ดํฐ ๊ณผํ ์ธํ๋ผ ๊ตฌ์ถ โ ๋ฐ์ดํฐ ๊ณผํ์ ์ํด AWS์์ ์ธํ๋ผ๋ฅผ ๊ตฌ์ถํ๋ ๊ฒ์ ๋ํ ํฅ๋ฏธ๋ก์ด ์งง์ ๋ฉ๋ชจ์ ๋๋ค.Airflow DAG๋ฅผ ๋๋ฒ๊น ํ ๋ ํ์ธํด์ผ ํ 7๊ฐ์ง ์ผ๋ฐ์ ์ธ ์ค๋ฅ - ์ผ๋ฐ์ ์ธ ์ค์(๋๊ตฐ๊ฐ ์ฌ์ ํ ์ง์นจ์ ์ฝ์ง ์๋ ๊ฒฝ์ฐ).Apache Airflow๋ฅผ ์ฌ์ฉํ์ฌ ๋น๋ฐ๋ฒํธ ์ ์ฅ ๋ฐ ์ก์ธ์ค - Connections๋ง ์ฌ์ฉํ ์ ์์ง๋ง ์ฌ๋๋ค์ด ๋น๋ฐ๋ฒํธ๋ฅผ ์ ์ฅํ๋ ๋ฐฉ๋ฒ์ ๋ํด ์์ผ์ธ์.Zen of Python๊ณผ Apache Airflow - ์์์ DAG ํฌ์๋ฉ, ํจ์์์ ์ปจํ ์คํธ ๋์ง๊ธฐ, ๋ค์ ์ข ์์ฑ์ ๋ํด, ์์ ์คํ ๊ฑด๋๋ฐ๊ธฐ์ ๋ํด.๊ธฐ๋ฅ: ์ ์๋ ค์ง์ง ์์ ํ, ์๋ น ๋ฐ ๋ชจ๋ฒ ์ฌ๋ก - ์ด์ฉ์ ๊ดํ์ฌdefault arguments
ะธparams
ํ ํ๋ฆฟ, ๋ณ์ ๋ฐ ์ฐ๊ฒฐ์์.Airflow ์ค์ผ์ค๋ฌ ํ๋กํ์ผ๋ง -๊ธฐํ์๊ฐ ์์ดํ๋ก์ฐ 2.0์ ์ด๋ป๊ฒ ์ค๋นํ๊ณ ์๋์ง์ ๋ํ ์ด์ผ๊ธฐ.docker-compose์์ 3๊ฐ์ Celery ์์ ์๊ฐ ์๋ Apache Airflow - ํด๋ฌ์คํฐ ๋ฐฐํฌ์ ๋ํ ์ฝ๊ฐ ์ค๋๋ ๊ธฐ์ฌdocker-compose
.4 Airflow ์ปจํ ์คํธ๋ฅผ ์ฌ์ฉํ ํ ํ๋ฆฟ ์์ - ํ ํ๋ฆฟ ๋ฐ ์ปจํ ์คํธ ์ ๋ฌ์ ์ฌ์ฉํ๋ ๋์ ์์ .Airflow์ ์ค๋ฅ ์๋ฆผ โ ๋ฉ์ผ ๋ฐ Slack์ ํตํ ํ์ค ๋ฐ ์ฌ์ฉ์ ์ง์ ์๋ฆผ.๊ธฐ๋ฅ ์ํฌ์: ๋ชฉ๋ฐ์ด ์๋ ๋ณต์กํ DAG - ๋ถ๊ธฐ ์์ , ๋งคํฌ๋ก ๋ฐ XCom.
๊ทธ๋ฆฌ๊ณ ๊ธฐ์ฌ์ ์ฌ์ฉ๋ ๋งํฌ:
๋งคํฌ๋ก ์ฐธ์กฐ - ํ ํ๋ฆฟ์์ ์ฌ์ฉํ ์ ์๋ ์๋ฆฌ ํ์์.์ผ๋ฐ์ ์ธ ํจ์ - ๊ณต๊ธฐ ํ๋ฆ โ dag๋ฅผ ๋ง๋ค ๋ ํํ ์ ์ง๋ฅด๋ ์ค์.puckel/docker-airflow: ๋์ปค ์ํ์น ๊ณต๊ธฐ ํ๋ฆ -docker-compose
์คํ, ๋๋ฒ๊น ๋ฑ์ ์ํด.python-telegram-bot/python-telegram-bot: ๊ฑฐ๋ถํ ์ ์๋ ๋ํผ๋ฅผ ๋ง๋ค์์ต๋๋ค. โ Telegram REST API์ฉ Python ๋ํผ.
์ถ์ฒ : habr.com