Hello! I'm Dmitry Logvinenko, a Data Engineer in the Analytics department of the Vezet group of companies.
I'm going to tell you about a wonderful tool for developing ETL processes: Apache Airflow. Airflow is so versatile and multifaceted that it's worth a close look even if you don't deal with data flows at all, but just need to launch processes periodically and monitor their execution.
And yes, I won't just tell, I'll also show: the text below has a lot of code, screenshots and recommendations.
What you usually see when you google the word Airflow / Wikimedia Commons
Table of contents
Introduction. The main part: practical (and a little theoretical). The closing part: references and information.
Introduction
Apache Airflow is like Django:
- it's written in Python,
- it has a great admin panel,
- it's endlessly extensible,
- only better, and it was made for entirely different purposes. Namely (as it says before the cut):
- to run and monitor tasks on an unlimited number of machines (as many as Celery/Kubernetes and your conscience allow you),
- with dynamic workflow generation from Python code that is very easy to write and understand,
- and with the ability to connect any databases and APIs with each other using both ready-made components and home-made plugins (which is extremely simple).
Here's how we use Apache Airflow:
- We collect data from various sources (lots of SQL Server and PostgreSQL instances, assorted APIs with application metrics, even 1C) into our DWH and ODS (we have Vertica and Clickhouse).
- As a very advanced cron, which launches the data-consolidation processes on ODS and also watches over their upkeep.
Until recently, all our needs were covered by a single small server with 32 cores and 50 GB of RAM. In Airflow terms, that's:
- more than 200 DAGs (actually workflows, which we stuffed with tasks),
- about 70 tasks in each on average,
- with all this goodness kicking off (also on average) about once an hour.
How we scaled out, I'll describe below; for now, let's define the über-problem we'll be solving:
There are three source SQL Servers, each holding fifty databases, instances of one and the same project, and accordingly they share the same structure (almost everywhere, mua-ha-ha), which means each has an Orders table (luckily, a table with that name can be pushed into any business). We add service fields (source server, source database, ETL task id) and naively toss the data into, say, Vertica.
Let's go!
The main part: practical (and a little theoretical)
Why we (and perhaps you) need it
Back when the trees were tall and I was a simple SQL developer at a Russian retailer, we cranked out ETL processes (a.k.a. data flows) using the two tools available to us:
- Informatica Power Center: an extremely widespread, extremely productive system, with its own hardware and its own versioning. I used, God forbid, 1% of its capabilities. Why? Well, first of all, its interface put psychological pressure on us. Secondly, this contraption is built for extremely fancy processes, furious component reuse and other very-important-enterprise tricks. And about the fact that it costs roughly an Airbus A380 wing per year, we'll say nothing at all.
Careful: the screenshot may hurt people under 30
- SQL Server Integration Services: we used this comrade for our intra-project flows. Well, honestly: we were already on SQL Server, and it would have been somehow unreasonable not to use its ETL tools. Everything about it is good: the interface is pretty, the progress reports are fine... But that's not why we loved software products, oh no, not for that. Version it as dtsx (which is XML with its nodes shuffled on save)? We can, but what's the point? How about assembling a task package that drags a couple of hundred tables from one server to another? Forget a hundred: from twenty of them your index finger will fall off from clicking the mouse button. But it definitely looked fashionable:
We certainly looked for a way out. We even almost got as far as a self-written SSIS package generator...
...and then a new job found me. And at it, Apache Airflow caught up with me.
When I learned that ETL process descriptions could be simple Python code, I could hardly contain my joy. That's how data streams got versioning and diffs, and pouring tables with a single structure from hundreds of databases into one target became a matter of Python code fitting on one and a half or two 13-inch screens.
Assembling the cluster
Let's not run a complete kindergarten here and talk about perfectly obvious things like installing Airflow, your chosen database, Celery and the other cases described in the docs.
So that we can start experimenting right away, I sketched out a docker-compose.yml, in which:
- we actually raise Airflow itself: Scheduler and Webserver. Flower will also spin there to monitor Celery tasks (because it's already stuffed into apache/airflow:1.10.10-python3.7, and we don't mind!);
- PostgreSQL, into which Airflow writes its service information (scheduler data, execution statistics, etc.) and in which Celery marks completed tasks;
- Redis, which will act as the task broker for Celery;
- a Celery worker, which will be busy with the direct execution of tasks;
- to the ./dags folder we'll add our files with DAG descriptions. They are picked up on the fly, so there's no need to juggle the whole stack after every sneeze.
In some places, the code in the examples is not shown completely (so as not to clutter the text), and in some places it gets modified along the way. Complete working code examples are in the repository https://github.com/dm-logv/airflow-tutorial.
docker-compose.yml
version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config
      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8
      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker
Notes:
- In assembling the composition I relied heavily on the well-known image puckel/docker-airflow; be sure to check it out. Maybe you won't need anything else in your life.
- All Airflow settings are available not only through airflow.cfg, but also (thanks to the developers) through environment variables, which I took malicious advantage of.
- Naturally, this is not production-ready: I deliberately didn't put heartbeats on the containers and didn't bother with security. But I did the minimum suitable for our experiments.
- Note that:
  - the DAG folder must be accessible to both the scheduler and the workers;
  - the same goes for all third-party libraries: they must all be installed on the machines with the scheduler and the workers.
Well, now it's simple:
$ docker-compose up --scale worker=3
After everything comes up, you can look at the web interfaces:
- Airflow: http://127.0.0.1:8080/admin/
- Flower: http://127.0.0.1:5555/dashboard
Concepts
If you couldn't make sense of all these "dags", here's a short dictionary:
- Scheduler: the most important uncle in Airflow, the one who makes sure it's the robots that slave away, not a human. It watches the schedule, updates DAGs and launches tasks. In older versions it had memory problems (no, not amnesia: leaks), and the legacy parameter run_duration, its restart interval, even survived in the configs. But now everything is fine.
- DAG (a.k.a. "dag"): a "directed acyclic graph", but such a definition won't tell many people much. In essence, it's a container for tasks that interact with each other (see below), or an analogue of a Package in SSIS or a Workflow in Informatica. Besides DAGs there are also subDAGs, but we most likely won't get to them.
- DAG Run: an initialized DAG with its own execution_date assigned. DAG runs of the same DAG can happily work in parallel (assuming, of course, you've made your tasks idempotent).
- Operator: a piece of code responsible for performing one specific action. There are three kinds of operators:
  - action operators, like our favorite PythonOperator, which can execute any (valid) Python code;
  - transfer operators, which carry data from place to place, say MsSqlToHiveTransfer;
  - sensor operators, which let you react to an event, or throttle the further execution of a DAG until it occurs. HttpSensor can wait for a given endpoint and, once the desired response comes, kick off a transfer such as GoogleCloudStorageToS3Operator. An inquisitive mind will ask: "Why? You could just retry right inside the operator!" The point is not to clog the pool of tasks with a suspended operator. A sensor starts up, checks, and dies before the next attempt.
- Task: declared operators, regardless of their kind, that are attached to a DAG get promoted to the rank of tasks.
- Task Instance: when the general-scheduler decides it's time to send tasks into battle to the performer-workers (right on the spot if we use LocalExecutor, or to a remote node in the case of CeleryExecutor), it assigns them a context (i.e. a set of variables: execution parameters), expands the command or query templates, and pools them.
Generating tasks
First I'll sketch the general scheme of our dag, and then dive deeper and deeper into the details, because we apply a few non-trivial solutions.
So, in its simplest form, such a DAG looks like this:
from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)
Let's figure it out:
- First we import the necessary libraries and something else;
- sql_server_ds is a List[namedtuple[str, str]] with the names of the connections from Airflow Connections and the databases we'll pick our table from;
- dag is the declaration of our DAG, which must necessarily end up in globals(), otherwise Airflow won't find it. The dag also needs to be told:
  - that its name is orders: this name will then show up in the web interface,
  - that it runs from midnight on the eighth of July,
  - and that it should fire approximately every 6 hours (for the tough guys out there, instead of timedelta() you can put an acceptable cron line here, 0 0 0/6 ? * * *; for the less cool, an expression like @daily);
- workflow() will do the main job, but not right now. For the moment, our function will just dump the context into the log;
- and now the simple magic of creating the tasks:
  - we loop over our sources;
  - we initialize a PythonOperator, which will execute our dummy workflow(). Don't forget to give the task a name that is unique within the DAG, and to tie it to the dag itself. The provide_context flag will, in turn, pour additional arguments into the function, which we'll carefully collect with **context.
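The loop above stamps out one PythonOperator per entry of `commons/datasources.py`, which the article doesn't show. A minimal sketch of what `sql_server_ds` could look like (the connection ids and database names here are made up for illustration):

```python
from collections import namedtuple

# Hypothetical shape of commons/datasources.py: a List[namedtuple[str, str]]
# pairing an Airflow connection id with a source database (schema) name.
SqlServerSource = namedtuple('SqlServerSource', 'conn_id schema')

sql_server_ds = [
    SqlServerSource(f'mssql_{server}', f'db_{db:02d}')
    for server in range(3)   # three source SQL Servers
    for db in range(50)      # fifty databases on each of them
]

print(len(sql_server_ds))    # one task will be generated per entry
```

With this list in place, the `for conn_id, schema in sql_server_ds:` loop in the DAG file produces one task per source database.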
For now, that's all. What we've got:
- a new DAG in the web interface;
- a hundred and fifty tasks executed in parallel (as far as the Airflow settings, Celery and the server capacity allow).
Well, we almost got it.
Who's going to install the dependencies?
To keep this whole thing simple, I screwed the processing of requirements.txt on all the nodes into docker-compose.yml.
And now off it goes:
The gray squares are task instances being processed by the scheduler.
We wait a little, and the tasks get snapped up by the workers:
The green ones, of course, have safely finished their work. The red ones were not too successful.
By the way, on our production there is no ./dags folder synchronized between machines: all the DAGs live in git on our Gitlab, and Gitlab CI distributes the updates to the machines on merge into master.
A little about Flower
While the workers are hammering away at our dummy tasks, let's recall another tool that can show us a thing or two: Flower.
The first page, with summary information on the worker nodes:
The busiest page, with the tasks that went off to work:
The most boring page, with the status of our broker:
The brightest page, with the task status graphs and their execution times:
Loading the underloaded
So, all the tasks have run; you can carry off the wounded.
And for some reason there are quite a few wounded. If Airflow is used correctly, these very squares tell us that the data definitely did not arrive.
You need to watch the log and restart the fallen task instances.
Clicking on any square, we'll see the actions available to us:
You can take the fallen one and Clear it. That is, we forget that something failed there, and the same task instance goes back to the scheduler.
It's clear that doing this with the mouse for all the red squares is not very humane, and it's not what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances.
Let's select everything at once, reset it to zero, and click the correct item:
After the cleanup, our tasks look like this (they're already waiting for the scheduler to schedule them):
Connections, hooks and other variables
次㮠DAG ãèŠãŠã¿ãŸãããã update_reports.py
:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Good sirs, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Boss, wake up: we dropped {{ dag.dag_id }}
    """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]
Everyone has written a report-refresher at some point, right? Well, here she is again: there's a list of sources to fetch the data from; there's a list of where to put it; and don't forget to honk when everything has happened or broken (well, that's not about us, no).
Let's go through the file again and look at the new obscure bits:
- from commons.operators import TelegramBotSendMessage: nothing prevents us from making our own operators, which we took advantage of by whipping up a little wrapper for sending messages to the Unblocked One. (We'll talk about this operator in more detail below);
- default_args={}: the dag can hand out the same arguments to all of its operators;
- to='{{ var.value.all_the_kings_men }}': the to field won't be hardcoded, but generated dynamically with Jinja from a variable holding a list of emails, which I carefully placed in Admin/Variables;
- trigger_rule=TriggerRule.ALL_SUCCESS: the condition for starting the operator. In our case, the letter flies off to the bosses only if all the dependencies have finished successfully;
- tg_bot_conn_id='tg_main': the conn_id arguments accept the connection IDs that we create in Admin/Connections;
- trigger_rule=TriggerRule.ONE_FAILED: the message in Telegram flies away only if there are fallen tasks;
- task_concurrency=1: we forbid launching several task instances of one task at the same time. Otherwise we'd get several VerticaOperator running at once (staring at the same table);
- report_update >> [email, tg]: all the VerticaOperator converge into sending the letter and the message, like this:
But since the notifier operators have different trigger conditions, only one will fire. In the Tree View everything looks a bit less visual:
I'll say a few words about macros and their friends, variables.
Macros are Jinja placeholders that can substitute various useful bits of information into operator arguments. For example, like this:
SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} will expand into the contents of the context variable execution_date in YYYY-MM-DD format: 2020-07-14. The best part is that the context variables are nailed to a specific task instance (a square in the Tree View), and on restart the placeholders will expand into exactly the same values.
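Under the hood such placeholders are ordinary Jinja templates. Conceptually the expansion works like this (plain jinja2 here, not Airflow itself; the context dict is a made-up stand-in for the real task-instance context):

```python
from jinja2 import Template

# A stand-in for the context Airflow builds for each task instance
context = {'ds': '2020-07-14'}

sql = Template("SELECT id FROM orders.payments "
               "WHERE payment_dtm::DATE = '{{ ds }}'::DATE").render(context)
print(sql)  # ... WHERE payment_dtm::DATE = '2020-07-14'::DATE
```

Because the context is pinned to the task instance, rendering the same template for the same instance always yields the same SQL.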
The assigned values can be viewed with the Rendered button on each task instance. This is what the letter-sending task looks like:
And here is the task with the message sending:
A complete list of the built-in macros for the latest available version is available here.
Also, with plugins you can declare your own macros, but that's another story.
Besides the predefined ones, we can substitute the values of our own variables (I already used this in the code above). Let's create a couple of things in Admin/Variables:
Everything, you can use it:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')
The value can be a scalar, or it can be JSON. In the case of JSON, bot_config:
{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}
...just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
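The `var.json.<path>` lookup is just attribute-style navigation over the stored JSON. In plain Python terms it boils down to this (reusing the bot_config example above; the token is fake):

```python
import json

# What Airflow stores for the bot_config Variable
raw = '{"bot": {"token": "881hskdfASDA16641", "name": "Verter"}, "service": "TG"}'

bot_config = json.loads(raw)
# {{ var.json.bot_config.bot.token }} resolves to:
token = bot_config['bot']['token']
print(token)  # 881hskdfASDA16641
```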
I'll say literally one word and show one screenshot about connections. Everything here is elementary: on the Admin/Connections page we create a connection and pile our logins/passwords and more specific parameters into it. Like this:
Passwords can be encrypted (more thoroughly than by default), or you can leave out the connection type (as I did for tg_main). The thing is, the list of types is hardwired into the Airflow models and can't be extended without getting into the source code (if I suddenly failed to google something, please correct me), but nothing stops us from getting credentials simply by name.
You can also make several connections with the same name: in that case, the method BaseHook.get_connection(), which fetches connections by name, will hand out a random one of the several namesakes (it would be more logical to do Round Robin, but let's leave that on the conscience of the Airflow developers).
Variables and Connections are certainly nice tools, but it's important not to lose the balance: which parts of your flows you keep in the code itself, and which you hand over to Airflow for storage. On the one hand, it can be convenient to quickly change a value, a mailing box for example, through the UI. On the other hand, this is still a return to the mouse click we (I) wanted to get away from.
Working with connections is one of the jobs of hooks. In general, Airflow hooks are points for plugging it into third-party services and libraries. For instance, JiraHook will open us a client for interacting with Jira (you can move tasks back and forth), and with SambaHook you can push a local file to an smb resource.
Dissecting the custom operator
And we've come close to looking at how TelegramBotSendMessage is made. Here's the code from commons/operators.py with the actual operator:
from typing import Union

from airflow.models import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client

        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)
Here, as with everything else in Airflow, everything is very simple:
- we inherit from BaseOperator, which implements quite a few Airflow-specific things (have a look at it at your leisure);
- we declare the fields template_fields, in which Jinja will look for macros to process;
- we arrange the right arguments for __init__() and set the defaults where needed;
- we don't forget to initialize the ancestor either;
- we open the corresponding hook TelegramBotHook and receive a client object from it;
- we override (redefine) the method BaseOperator.execute(), which Airflow will jerk when the time comes to launch the operator. In it we implement the main action, not forgetting to log. (By the way, we log straight into stdout and stderr: Airflow will intercept everything, wrap it beautifully, and decompose it where needed.)
Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:
from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)
        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client
I don't even know what to explain here; I'll just note the important points:
- we inherit and think over the arguments: in most cases there will be one, conn_id;
- we override the standard methods: I limited myself to get_conn(), in which I fetch the connection parameters by name and simply take the section extra (a JSON field), into which I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"};
- we create an instance of our TelegramBot, handing it the specific token.
That's all. You can get a client from the hook with TelegramBotHook().client or TelegramBotHook().get_conn().
In the second part of the file, I make a micro-wrapper for the Telegram REST API, so as not to drag in python-telegram-bot for the sake of a single method, sendMessage.
class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))
The right way is to add all of this up (TelegramBotSendMessage, TelegramBotHook, TelegramBot) into a plugin, put it into a public repository, and give it to Open Source.
While we were studying all that, our report updates managed to fail successfully and send me an error message in the channel. I'm off to check whether it's a mistake...
Something broke in our dag! Isn't that exactly what we were waiting for? Exactly!
Going to pour?
Do you feel I missed something? He promised, it seems, to transfer data from SQL Server to Vertica, and then he took it and went off topic, the scoundrel!
This atrocity was intentional: I simply had to decipher some terminology for you. Now we can go further.
Our plan was this:
- make a dag
- generate the tasks
- admire how beautiful everything is
- assign session numbers to the loads
- fetch the data from SQL Server
- put the data into Vertica
- collect statistics
So, to get all this up and running, I made a small addition to our docker-compose.yml:
docker-compose.db.yml
version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: 'Y'
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base
  mssql_1:
    <<: *mssql-base
  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py
Here we raise:
- Vertica as the host dwh, with mostly default settings;
- three instances of SQL Server;
- we fill the databases in the latter with some data (under no circumstances look into mssql_init.py!)
We launch all the good stuff with the help of a slightly more complicated command than last time:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
You can check out what our miracle-randomizer generated using the item Data Profiling/Ad Hoc Query:
The main thing is not to show it to the analysts.
詳ããè¿°ã¹ãŸã ETLã»ãã·ã§ã³ ããŸãããããã§ã¯ãã¹ãŠãäºçŽ°ãªããšã§ããããŒã¹ãäœæãããã®äžã«æšèãããããã¹ãŠãã³ã³ããã¹ã ãããŒãžã£ãŒã§ã©ããããŠã次ã®ããšãè¡ããŸãã
with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15
session.py
from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any((exc_type, exc_val, exc_tb)):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} '
                f'id={self.id} '
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished = DEFAULT,
                successful = %s,
                loaded_rows = %s,
                comment = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)

        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass
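The `__exit__` part above is plain context-manager mechanics. Here is a stripped-down, database-free sketch of the same pattern (unlike the real Session, this toy swallows the exception, just to keep the demo linear):

```python
import sys

class MiniSession:
    """Toy version of the ETL Session: records success/failure on exit."""
    def __init__(self, task_name):
        self.task_name = task_name
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Note the single iterable argument to any(): any(a, b, c) is a TypeError
        if any((exc_type, exc_val, exc_tb)):
            self.successful = False
            self.comment = f'{exc_type.__name__}: {exc_val}'
            print(self.comment, file=sys.stderr)
        else:
            self.successful = True
        return True  # swallow the exception (the real Session re-raises)

with MiniSession('demo') as s:
    raise ValueError('boom')

print(s.successful, s.comment)  # False ValueError: boom
```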
The day has come: we're fetching our data from a hundred and fifty tables. Let's try to do it with some very unpretentious lines:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
- With a hook we get a pymssql connection from Airflow;
- we substitute a restriction in the form of a date into the query: the template engine will have thrown it into the function;
- we feed the query to pandas, which gets us a DataFrame; it will come in handy later.

I use the substitution {dt} instead of a query parameter %s not because I'm an evil Pinocchio, but because pandas can't cope with pymssql and slips it params: List, although pymssql really wants a tuple.
Also note that the developer of pymssql decided not to support it any more, and it's high time to move out to pyodbc.
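For drivers that do accept parameters properly, placeholder substitution is preferable to f-string gluing. A sketch with stdlib sqlite3 (placeholder syntax differs per driver: pymssql takes %s, sqlite3 takes ?):

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE orders (id INT, start_time TEXT)')
conn.execute("INSERT INTO orders VALUES (1, '2020-07-14'), (2, '2020-07-15')")

dt = '2020-07-14'
# The driver escapes the value itself: no string gluing, no injection
rows = conn.execute(
    'SELECT id FROM orders WHERE DATE(start_time) = ?', (dt,)).fetchall()
print(rows)  # [(1,)]
```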
Let's see what Airflow has stuffed into our function's arguments:
If there's no data, there's no point in continuing. But it's also strange to consider the load successful. Then again, it's not an error either. A-a-ah, what to do?! Here's what:
if df.empty:
    raise AirflowSkipException('No rows to load')
AirflowSkipException tells Airflow that there's no error, but the task is skipped. The interface will show not a green or red square, but a pink one.
Let's toss our data a few extra columns:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])
Namely:
- the database we took the orders from,
- the ID of our load session (it will be different for every task),
- a hash of the source and the order ID, so that in the final database (where everything is poured into one table) we get a unique order ID.
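The idea behind the hash_id column can be shown without pandas: any deterministic hash over (source, local order id) yields a key that is unique across the source databases. This sketch uses hashlib instead of pandas' hash_pandas_object, purely for illustration:

```python
import hashlib

def hash_id(etl_source: str, order_id: int) -> int:
    """Deterministic surrogate key from source schema + local order id."""
    digest = hashlib.sha1(f'{etl_source}:{order_id}'.encode()).hexdigest()
    return int(digest[:15], 16)   # 60 bits, fits comfortably into a 64-bit INT

# The same local id in two different source databases gets two different keys,
# and the key is stable across reruns of the load.
print(hash_id('db_00', 42) != hash_id('db_01', 42))  # True
print(hash_id('db_00', 42) == hash_id('db_00', 42))  # True
```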
The penultimate step remains: pour it all into Vertica. And, oddly enough, one of the most spectacular and efficient ways to do that is via CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({', '.join(df.columns)})
    FROM STDIN
    DELIMITER '|'
    ENCLOSED '"'
    ABORT ON ERROR
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
- We make a special receiver, a StringIO;
- pandas kindly lays our DataFrame into it in the form of CSV lines;
- we open a connection to our beloved Vertica with a hook;
- and now, with the help of copy(), we ship our data straight to Vertica!
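One fiddly spot in the COPY statement is the column list: interpolating a Python list directly would produce brackets and quotes that the database won't parse. Joining the names yourself keeps the SQL valid (plain strings here, no database needed):

```python
# Columns as pandas would report them via df.columns
columns = ['id', 'start_time', 'end_time', 'type', 'data',
           'etl_source', 'etl_id', 'hash_id']

# f'{columns}' would yield "['id', 'start_time', ...]" -- broken SQL.
copy_stmt = f"""
COPY orders_target({', '.join(columns)})
FROM STDIN
DELIMITER '|'
ABORT ON ERROR
NULL 'NUL'
"""
print(copy_stmt.splitlines()[1])  # COPY orders_target(id, start_time, ...)
```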
From the driver we'll grab how many rows got loaded, and tell the session manager that all is fine:
session.loaded_rows = cursor.rowcount
session.successful = True
That's all.
販売ã§ã¯ã¿ãŒã²ãããã¬ãŒããæäœæ¥ã§äœæããŠãããŸãã ããã§ç§ã¯èªåèªèº«ã«å°ããªãã·ã³ãèš±å¯ããŸããã
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'

create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)
䜿ã£ãŠããŸã
VerticaOperator()
ããŒã¿ããŒã¹ ã¹ããŒããšããŒãã«ãäœæããŸã (ãã¡ããããŸã ååšããªãå Žå)ã äž»ãªããšã¯ãäŸåé¢ä¿ãæ£ããæŽçããããšã§ãã
for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load
Summing up
"Well then," said the mouse, "now do you see? Aren't you convinced that I'm the scariest beast in the forest?"
Julia Donaldson, The Gruffalo
I think if my colleagues and I held a competition, who will create and launch an ETL process from scratch faster, them with their SSIS and a mouse and me with Airflow... And then we'd also compare the ease of maintenance... Wow, I think you'll agree that I'd beat them on all fronts!
A little more seriously: Apache Airflow, by describing processes in the form of program code, has made my job much more comfortable and enjoyable.
Its unlimited extensibility, both in plugins and in its disposition to scalability, gives you the opportunity to use Airflow in almost any area: even in the full cycle of collecting, preparing and processing data, even in launching rockets (to Mars, of course).
The closing part: references and information
The rakes we have stepped on for you:
- start_date. Yes, it's already a local meme. Everybody trips over the dag's main argument start_date. Briefly: the first run fires at start_date plus schedule_interval, so if you set start_date to the current date with a daily interval, the DAG will start no earlier than tomorrow. Set the start safely in the past, say start_date = datetime(2020, 7, 7, 0, 1, 2), and there are no more problems.
  There's another runtime error connected with it: Task is missing the start_date parameter, which most often tells you that you forgot to bind an operator to the dag.
- Everything on one machine. Yes, both bases (Airflow itself and our own code), and the web server, and the scheduler, and the workers. And it even worked. But over time the number of service tasks grew, and when PostgreSQL started responding to an index in 5 seconds instead of 20 milliseconds, we took it and carried it off.
- LocalExecutor. Yes, we're still sitting on it, and we've already come to the edge of the abyss. LocalExecutor has been enough for us so far, but now it's time to expand with at least one more worker, and we'll have to work hard to move to CeleryExecutor. And given that you can work with it on a single machine, nothing stops you from using Celery even on one server, which "of course will never go into production, honestly!"
- Underused built-in tools:
  - Connections for storing service credentials,
  - SLA Misses for responding to tasks that didn't finish in time,
  - XCom for exchanging metadata (I said metadata!) between dag tasks,
  - Mailing: well, what can I say? Alerts were set up for every retry of every fallen task. Now my work Gmail has more than 90k emails from Airflow, and the web-mail muzzle refuses to pick up and delete more than 100 of them at a time.
More pitfalls: Apache Airflow Pitfalls
More automation tools
So that we work with our heads even more and with our hands even less, Airflow has prepared for us:
- REST API: it's still in Experimental status, which doesn't stop it from working. With it you can not only fetch information about dags and tasks, but also stop/start a dag, create a DAG Run or a pool.
- CLI: many tools are available from the command line that are not merely inconvenient to use through the WebUI but simply absent from it. For example, backfill is needed to restart task instances.
  Say, the analysts came over and said: "And you, comrade, have nonsense in the data from January 1st through the 13th! Fix it, fix it, fix it, fix it!" And you just go, hop:
  airflow backfill -s '2020-01-01' -e '2020-01-13' orders
- Basic services: `initdb`, `resetdb`, `upgradedb`, `checkdb`.

- `run`, which lets you run a single task instance, and even ignore all of its dependencies. What's more, you can run it via `LocalExecutor` even if you have a Celery cluster.

- `test`, which does almost the same, but additionally writes nothing to the databases.

- `connections`, which lets you mass-create connections from the shell.
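As a sketch of what mass-creating connections from the shell could look like (hypothetical host names; the `--conn_*` flags follow the Airflow 1.10 CLI, so check `airflow connections --help` on your version), this loop only prints the commands instead of executing them:

```shell
# Print (rather than run) one `airflow connections --add` command per source host.
for host in db01 db02 db03; do
  echo airflow connections --add \
       --conn_id "mssql_${host}" \
       --conn_type mssql \
       --conn_host "${host}.example.com" \
       --conn_login etl
done
```

Pipe the output through `sh` (or drop the `echo`) once the generated commands look right.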
- Python API — a rather hardcore way of interacting, intended for plugins rather than for poking at with little hands. But who is going to stop us from going to `/home/airflow/dags`, firing up `ipython` and starting to mess around? You can, for example, export all connections with the following code:

```python
from airflow import settings
from airflow.models import Connection

fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
    d = {field: getattr(conn, field) for field in fields}
    print(conn.conn_id, '=', d)
```
- Connecting to the Airflow metadata database. I don't recommend writing to it, but pulling task states out of it for various specific metrics can be much faster and simpler than going through any of the APIs.

Let's say not all of our tasks are impeccable: some may legitimately fall over now and then, and that's fine. But a streak of failures is already suspicious and worth checking.
Attention, SQL!

```sql
WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        row_number() OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE execution_date > now() - INTERVAL '2' DAY
),
failed AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        CASE WHEN rn = row_number() OVER (
                 PARTITION BY task_id, dag_id
                 ORDER BY execution_date DESC)
             THEN TRUE
        END AS last_fail_seq
    FROM last_executions
    WHERE state IN ('failed', 'up_for_retry')
)
SELECT
    task_id,
    dag_id,
    count(last_fail_seq)                                                 AS unsuccessful,
    count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END)       AS failed,
    count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY task_id, dag_id
HAVING count(last_fail_seq) > 0
```
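The trick in the query above is that `rn = row_number()` inside the filtered set holds only for the most recent unbroken run of failures, so only tasks that are "still failing right now" get counted. The same logic in pure Python (toy data, hypothetical task history):

```python
def trailing_failures(states):
    """Count the unbroken run of failed/up_for_retry states at the head of the
    list (states ordered newest-first, like ORDER BY execution_date DESC)."""
    count = 0
    for state in states:
        if state in ("failed", "up_for_retry"):
            count += 1
        else:
            break
    return count

# Newest-first execution states for one (task_id, dag_id):
history = ["failed", "up_for_retry", "success", "failed"]
print(trailing_failures(history))  # 2: the old, isolated failure is not counted
```

A task that failed once long ago but has succeeded since contributes nothing, which is exactly what you want from an "is it broken right now?" alert.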
References
And of course, the first links that Google hands out are simply the contents of the Airflow folder in my bookmarks:
- Apache Airflow documentation — of course, start with the official site. There is a manual there, but then again, who reads manuals?
- Best practices — well, at least read the recommendations from the creators.
- Airflow UI — barely getting started: a tour of the user interface in pictures.
- Understanding Apache Airflow's key concepts — even if (suddenly!) you understood nothing from my explanations, the basic concepts are laid out there well enough.
- Tianlong's blog — a guide on building an Airflow server/cluster — a short guide to setting up an Airflow cluster.
- Running Apache Airflow at Lyft — an article almost as interesting, apart from having more formalism and fewer examples.
- How Apache Airflow distributes jobs on Celery workers — on integrating with Celery.
- Best practices for creating DAGs in Apache Airflow — on task idempotency, loading by IDs instead of dates, file structure, and other interesting points.
- Managing dependencies in Apache Airflow — dependencies between tasks and trigger rules, which I only touched on in passing.
- Airflow: when your DAG is far behind schedule — on fixing the parts of the scheduler that do not behave "as intended", and on prioritizing tasks that backfill lost data.
- SQL queries helpful for Apache Airflow — handy SQL queries against the Airflow metadata.
- We started workflow development using Apache Airflow — has a useful section on writing custom sensors.
- Building Fetchr's data science infra on AWS with Presto and Airflow — an interesting long read on building data-science infrastructure on AWS.
- 7 common errors to check when debugging Airflow DAGs — frequent mistakes (in case somebody still hasn't read the documentation).
- Storing and accessing passwords using Apache Airflow — you could just use Connections; smile at the people who struggle to store passwords without them.
- The Zen of Python and Apache Airflow — on implicit DAG assignment, context scope in functions, dependencies, and the stages of task launching.
- Airflow: lesser-known tips, tricks, and best practices — on using default arguments and params, and on the fact that templating works not only in templates but also in Variables and Connections.
- Airflow scheduler profiling — a talk about how they are preparing for Airflow 2.0.
- Apache Airflow with 3 Celery workers in docker-compose — a slightly dated article about deploying a docker-compose cluster.
- Templated tasks using Airflow context — dynamic tasks built with templating and context passing.
- Error notifications in Airflow — standard and custom notifications via email and Slack.
- Airflow Workshop: complex DAGs without crutches — on task backfills and splitting XComs.
Links used in the article:

- Macros reference — a cheat sheet of the placeholders available in templates.
- Pitfalls — Airflow — mistakes people make when writing DAGs.
- puckel/docker-airflow: Docker Apache Airflow — a docker-compose setup for experiments, debugging, and so on.
- python-telegram-bot/python-telegram-bot: "We have made you a wrapper you can't refuse" — a Python wrapper around the Telegram REST API.
Source: habr.com