Hi, I'm Dmitry Bagvintsen, a data engineer in the analytics department of the Vezet group of companies.
I'm going to talk about a wonderful tool for developing ETL processes — Apache Airflow. Airflow is so versatile and many-sided that it's worth a closer look even if you don't work with data flows at all, but only need to launch processes periodically and monitor their execution.
And yes, I won't just tell — I'll also show: the article contains a lot of code, screenshots, and recommendations.

What a Google image search shows for the word "Airflow" / Wikimedia Commons
Introduction
Apache Airflow is like Django:
- written in Python,
- with a great admin panel,
- endlessly extensible,
— only better, and it was made for entirely different purposes, namely (as it says before the cut):
- to run and monitor tasks on any number of machines (as many as Celery / Kubernetes and your conscience allow you),
- with dynamic workflow generation from Python code that is very easy to write and understand,
- and the ability to connect any databases and APIs with each other, using both ready-made components and home-made plugins (which is extremely simple).
We use Apache Airflow like this:
- we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into the DWH and ODS (we have Vertica and Clickhouse);
- as a very advanced cron, which launches the data-consolidation processes on the ODS and also watches over their upkeep.
Until recently, all our needs were covered by one small server with 32 cores and 50 GB of RAM. In Airflow terms, that is:
- more than 200 DAGs (actually workflows, which we stuffed with tasks),
- about 70 tasks in each on average,
- and this goodness kicks off (also on average) once an hour.
And about how we scaled, I'll write below; for now, let's define the über-problem we'll be solving:
There are three source SQL Servers, each with 50 databases — instances of one project, accordingly with identical structure (almost everywhere, mua-ha-ha) — which means each has an Orders table (fortunately, a table with that name can be pushed into any business). We take the data, add service fields (source server, source database, ETL task id), and naively toss it into, say, Vertica.
Let's go!
The main part, practical (and a bit theoretical)
Why we need it (and maybe you do too)
Back when the trees were big and I was a simple SQL-guy in one Russian retail company, we cobbled together ETL processes (a.k.a. data flows) with the two tools available to us:
- Informatica Power Center — an extremely sprawling, extremely productive system, with its own hardware and its own versioning. God forbid I used 1% of its capabilities. Why? Well, first of all, its interface, somewhere out of the 380s, mentally pressured us. Secondly, this contraption is built for extremely fancy processes, furious component reuse, and other very-important-enterprise tricks. As for the fact that it costs, like the wing of the Airbus A380, per year — we'd better not go there.
Beware, the screenshot may slightly hurt people under 30

- SQL Server Integration Services — we used this comrade in our intra-project flows. Well, indeed: we already use SQL Server, and it would be somehow unreasonable not to use its ETL tools. Everything about it is good: the interface is pretty, the progress reports... But that's not why we love software products — oh no, not for that. Versioning it with dtsx (which is XML with its nodes shuffled on save) — we can, but what's the point? How about making a package of tasks that drags a couple of hundred tables from one server to another? A hundred? From twenty of them your index finger will fall off from clicking the mouse button. But it definitely looks more fashionable:
We certainly looked for ways out. It even almost came to a self-written SSIS package generator ...
…and then a new job found me. And at it, Apache Airflow caught up with me.
When I found out that ETL process descriptions are simple Python code, I all but danced for joy. This is how data streams got versioned and diffed, and pouring tables with a single structure from hundreds of databases into a target or two became a matter of a screen and a half of Python code on a 13-inch display.
Assembling a cluster
Let's not arrange a complete kindergarten here and talk about entirely obvious things, like installing Airflow, the database you've chosen, Celery, and the other cases described in the docs.
So that we can start experimenting right away, I sketched a docker-compose.yml in which:
- we actually raise Airflow: Scheduler, Webserver. Flower will also spin there, monitoring Celery tasks (because it's already pushed into apache/airflow:1.10.10-python3.7);
- PostgreSQL, into which Airflow writes its service information (scheduler data, execution statistics, etc.) and in which Celery marks completed tasks;
- Redis, which will act as the task broker for Celery;
- a Celery worker, which will do the direct execution of tasks;
- to the ./dags folder we'll add our files with DAG descriptions. They are picked up on the fly, so there's no need to juggle the whole stack after every change.

In some places the code in the examples is not shown completely (so as not to clutter up the text), and in some places it gets modified along the way. Complete working code examples are in the repository.
docker-compose.yml
version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base
    environment:
      <<: *airflow-config
      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8
      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
    depends_on:
      - airflow-db
      - broker
    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"
    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base
    environment:
      <<: *airflow-config
    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"
    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:
- When putting the composition together, I largely relied on a well-known community image — be sure to check it out. Maybe you won't need anything else in your life.
- All Airflow settings are available not only through airflow.cfg, but also through environment variables (thanks to the developers), which I maliciously took advantage of.
- Naturally, it's not production-ready: I deliberately didn't put heartbeats on the containers and didn't bother with security. But I did the minimum suitable for our experiments.
- Please note:
  - The DAG folder must be accessible to both the scheduler and the workers.
  - The same applies to all third-party libraries — they must all be installed on the machines with the scheduler and the workers.
Well, now it's simple:

$ docker-compose up --scale worker=3

After everything comes up, you can look at the web interfaces:
- Airflow webserver: localhost:8080
- Flower: localhost:5555
Concepts
If you didn't understand anything in all these "dags", here's a short dictionary:

- Scheduler — the most important guy in Airflow. It's robots that work tirelessly, not a person: the scheduler monitors the schedule, updates DAGs, and launches tasks.
  In general, in old versions it had memory problems (no, not amnesia — leaks), and a legacy parameter even remained in the configs: run_duration, its restart interval. But now everything is fine.

- DAG (a.k.a. "dag") — a "directed acyclic graph", though such a definition will tell few people much. In essence, it's a container for tasks that interact with each other (see below), or an analogue of a Package in SSIS or a Workflow in Informatica.
  Besides DAGs there can also be subDAGs, but we most likely won't get to them.

- DAG Run — an initialized DAG with its own execution_date assigned to it. Dagruns of the same DAG can run in parallel (if, of course, you've made your tasks idempotent).

- Operator — a piece of code responsible for performing one specific action. There are three kinds of operators:
  - action, like our favorite PythonOperator, which can execute any (valid) Python code;
  - transfer, which carry data from place to place — say, MsSqlToHiveTransfer;
  - sensor, which lets you react to events or hold back the further execution of the DAG until something happens. HttpSensor can pull a specified endpoint and, once the desired response arrives, start a transfer such as GoogleCloudStorageToS3Operator. An inquisitive mind will ask: "Why? You could do the retries right in the operator!" And then — so as not to clog the task pool with suspended operators. The sensor starts up, checks, and dies before the next attempt.

- Task — declared operators, regardless of type, that are attached to a DAG are promoted to the rank of task.

- Task instance — when the general-planner decides it's time to send tasks into battle to the performer-workers (right on the spot if we use LocalExecutor, or to a remote node in the case of CeleryExecutor), it assigns them a context (i.e. a set of variables — execution parameters), expands the command or query templates, and pools them.
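Conceptually, a sensor is just a "poke" loop with a time budget. Here is a minimal stdlib-only sketch of that idea — the names `poke`, `poke_interval`, and `timeout` mirror the parameters of Airflow's `BaseSensorOperator`, but this is an illustration, not Airflow code:

```python
import time

def run_sensor(poke, poke_interval=5, timeout=60, clock=time.monotonic, sleep=time.sleep):
    """Call `poke()` until it returns True or `timeout` seconds pass.

    Mirrors the idea behind Airflow sensors: check a condition, go to sleep
    between checks, and fail once the time budget is exhausted.
    """
    started = clock()
    while True:
        if poke():
            return True
        if clock() - started > timeout:
            raise TimeoutError('Sensor timed out')
        sleep(poke_interval)

# A toy condition that becomes true on the third check
attempts = {'n': 0}
def condition():
    attempts['n'] += 1
    return attempts['n'] >= 3

# Fake clock/sleep so the example runs instantly
fake_now = {'t': 0.0}
run_sensor(condition,
           poke_interval=1,
           timeout=10,
           clock=lambda: fake_now['t'],
           sleep=lambda s: fake_now.__setitem__('t', fake_now['t'] + s))
print(attempts['n'])  # three checks were needed
```

The real difference in Airflow is that between "checks" the sensor process dies instead of sleeping, freeing its worker slot.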
Generating tasks
First, let's outline the general scheme of our dag, and then we'll dive deeper and deeper into the details, because we apply some non-trivial solutions there.
So, in its simplest form, such a DAG looks like this:
from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Let's sort this out:
- First we import the necessary libraries, plus something else besides;
- sql_server_ds is a List[namedtuple[str, str]] with the names of the connections from Airflow Connections and the databases from which we'll take our plate;
- dag — the declaration of our DAG, which must necessarily be in globals(), otherwise Airflow won't find it. The dag also needs to be told:
  - that its name is orders — this name will then show up in the web interface;
  - that it will work from midnight on the eighth of July;
  - and that it should run approximately every 6 hours (for the tough guys here, instead of timedelta() you can put a cron line, 0 0 0/6 ? * * *, or, for the less cool, an expression like @daily);
- workflow() will do the main job, but not now. For the moment, our function will just dump the context into the log;
- And now the simple magic of creating tasks:
  - we run through our data sources;
  - and initialize a PythonOperator, which will execute our dummy workflow(). Don't forget to specify a unique (within the DAG) name for the task and tie in the dag itself. The provide_context flag will in turn pour additional arguments into the function, which we'll carefully collect with **context.
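One subtlety of the schedule that bites everyone sooner or later: the run stamped with a given execution_date fires only after its interval has closed. A stdlib-only sketch (plain `datetime`, no Airflow) of what that means for the orders DAG above:

```python
from datetime import datetime, timedelta

start_date = datetime(2020, 7, 8, 0)
interval = timedelta(hours=6)

# execution_date of the first few runs, and when each one actually starts:
# the run labeled X kicks off once the interval [X, X + interval] has passed
for i in range(3):
    execution_date = start_date + i * interval
    actual_start = execution_date + interval
    print(execution_date, '->', actual_start)
```

So the very first run, labeled 2020-07-08 00:00, starts at 06:00 — a fact that will come up again in the "rake" section at the end.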
That's all for now. What we got:
- a new DAG in the web interface;
- a hundred and fifty tasks that will execute in parallel (as far as the Airflow settings, the Celery settings, and the server capacity allow).
Well, it almost worked.

Who's going to install the dependencies?
To keep this whole thing simple, I screwed the processing of requirements.txt on all nodes into docker-compose.yml.
And now it's off:

The gray squares are task instances processed by the scheduler.
We wait a little; the tasks are snapped up by the workers:

The green ones, of course, completed their work successfully. The red ones — not so much.
By the way, there's no ./dags folder on our prod synchronized between machines — all DAGs live in git on our Gitlab, and Gitlab CI distributes the updates to the machines on merge into master.
A little about Flower
While the workers are thrashing our paddles, let's recall another tool that can show us a thing or two — Flower.
The very first page, with summary information on the worker nodes:

The busiest page, with the tasks that went off to work:

The most boring page, with the status of our broker:

The brightest page, with task status graphs and their execution times:

Topping up the underloaded
So, all the tasks have run — you can carry off the wounded.

And there were quite a few wounded, for one reason or another. With correct use of Airflow, these very squares say that the data definitely did not arrive.
You need to watch the logs and restart the fallen task instances.
Click on any square, and the actions available to us appear:

You can take a fallen one and hit Clear. That is, we forget that something failed there, and the same task instance goes back to the scheduler.

Clearly, doing this with the mouse over all the red squares is not very humane, and that's not what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances

Select everything at once, reset to zero, and click the right item:

After the cleanup, our taxis look like this (they're already waiting for the scheduler to schedule them):

Connections, hooks, and other variables
Let's look at the next DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Gentlemen, the reports are updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
             Wake up, we dropped {{ dag.dag_id }}
            """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Have you all ever updated a report? This is her again: there's a list of sources to load the data from; there's a list of where to put it; and don't forget to honk when everything happened or broke (well, that's not about us, no).
Let's go through the file again and look at the new murky points:
- from commons.operators import TelegramBotSendMessage — nothing prevents us from making our own operators, which we took advantage of by building a small wrapper for sending messages to the Unblocked One. (We'll talk more about this operator below);
- default_args={} — a dag can hand out the same arguments to all of its operators;
- to='{{ var.value.all_the_kings_men }}' — the to field will not be hardcoded, but generated dynamically with Jinja, from a variable holding a list of emails that we carefully placed in Admin/Variables;
- trigger_rule=TriggerRule.ALL_SUCCESS — the condition for starting the operator. In our case, the letter flies to the bosses only if all the dependencies completed successfully;
- tg_bot_conn_id='tg_main' — the conn_id arguments accept connection IDs that we create in Admin/Connections;
- trigger_rule=TriggerRule.ONE_FAILED — the message in Telegram flies off only if there are fallen tasks;
- task_concurrency=1 — we forbid the simultaneous launch of several task instances of one task. Otherwise we'd get several VerticaOperator (looking at one table) launched at the same time;
- report_update >> [email, tg] — all the VerticaOperator converge into sending the letter and the message, like this:

However, since the notifier operators have different trigger conditions, only one will fire. In the Tree View everything looks a little less visual:

I'll say a few words about macros and their friends — variables.
Macros are Jinja placeholders that can substitute various useful bits of information into operator arguments. For example, like this:
SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} expands to the contents of the context variable execution_date in the format YYYY-MM-DD: 2020-07-14. The best part is that context variables are nailed to a specific task instance (its square in the Tree View), and on restart the placeholders expand to the very same values.
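To make that pinning concrete, here's a toy stdlib imitation; real rendering is done by Jinja inside Airflow, and `render` here is a hypothetical helper, not an Airflow API:

```python
from datetime import datetime

def render(template: str, execution_date: datetime) -> str:
    # A toy stand-in for Jinja: expand {{ ds }} the way Airflow's macro does
    return template.replace('{{ ds }}', execution_date.strftime('%Y-%m-%d'))

sql = "SELECT * FROM orders.payments WHERE payment_dtm::DATE = '{{ ds }}'::DATE"
run_date = datetime(2020, 7, 14)

# No matter how many times this task instance is re-run,
# the same execution_date yields the same rendered query
first = render(sql, run_date)
second = render(sql, run_date)
print(first)
assert first == second
```

This is exactly why restarting an old task instance reloads the old day's data rather than today's.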
The assigned values can be viewed with the Rendered button of each task instance. This is how the task that sends the letter looks:

And here's the task that sends the message:

A complete list of the built-in macros for the latest available version can be found in the documentation.
Besides, with the help of plugins you can declare your own macros, but that's another story.
Besides the predefined things, we can substitute the values of our own variables (I already used this in the code above). Let's create a couple of things in Admin/Variables:

Everything, you can use it:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

The value can be a scalar, or it can be JSON. In the case of JSON:
bot_config

{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}

just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
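Under the hood this is just a nested-key lookup in the parsed JSON value. A stdlib sketch of what a path like `bot.token` resolves to (`json_path` is a hypothetical helper for illustration, not an Airflow function):

```python
import json
from functools import reduce

# What Airflow would store as the Variable named `bot_config`
raw = '{"bot": {"token": "881hskdfASDA16641", "name": "Verter"}, "service": "TG"}'

def json_path(document: str, path: str):
    """Resolve a dotted path like 'bot.token' inside a JSON string."""
    return reduce(lambda node, key: node[key], path.split('.'), json.loads(document))

print(json_path(raw, 'bot.token'))  # the bot token
print(json_path(raw, 'service'))    # 'TG'
```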
Honestly, about Connections I'll say literally one word and show one screenshot. Everything here is elementary: on the Admin/Connections page we create a connection and pile logins/passwords and more specific parameters into it. Like this:

ãã¹ã¯ãŒã㯠(ããã©ã«ãããã培åºçã«) æå·åããããšãã(ç§ãè¡ã£ãããã«) æ¥ç¶ã¿ã€ããçç¥ããããšãã§ããŸãã tg_main) - å®éã®ãšãããã¿ã€ãã®ãªã¹ã㯠Airflow ã¢ãã«ã«çµã¿èŸŒãŸããŠããããœãŒã¹ ã³ãŒãã«ã¢ã¯ã»ã¹ããªãéãæ¡åŒµããããšã¯ã§ããŸãã (çªç¶äœããã°ãŒã°ã«ã§æ€çŽ¢ããªãã£ãå Žåã¯ãä¿®æ£ããŠãã ãã) ãããã ããã ãã§ã¯ã¬ãžãããååŸããããšã劚ãããã®ã¯äœããããŸãããååã
åãååã§è€æ°ã®æ¥ç¶ãäœæããããšãã§ããŸãããã®å Žåãã¡ãœãã㯠BaseHook.get_connection()ãååã§æ¥ç¶ãååŸããŸãã ã©ã³ãã ããã€ãã®ååè
ããã®ååã§ã (ã©ãŠã³ãããã³ãäœæããæ¹ãè«ççã§ããããã㯠Airflow éçºè
ã®è¯å¿ã«ä»»ããŸããã)ã
Variables and Connections are certainly nice tools, but it's important not to lose the balance: which parts of your flows you store in the code itself, and which parts you hand over to Airflow for storage. On the one hand, it can be convenient to quickly change a value, say a mailing box, through the UI. On the other hand, that's still a return to the mouse click we (I) wanted to get rid of.
Working with connections is one of the jobs of hooks. In general, Airflow hooks are points for connecting it to third-party services and libraries. For example, JiraHook opens a client for us to interact with Jira (you can move tasks back and forth there), and with SambaHook you can push a local file to an smb-point.
Taking apart a custom operator
And we've gotten close to looking at how TelegramBotSendMessage is made
The code of commons/operators.py with the actual operator:
from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Here, as with everything else in Airflow, everything is very simple:
- We inherit from BaseOperator, which implements quite a few Airflow-specific things (look through it at your leisure);
- We declare the fields in template_fields, in which Jinja will look for macros to process;
- We arrange the right arguments for __init__(), and set defaults where needed;
- We don't forget the initialization of the ancestor either;
- We open the corresponding hook, TelegramBotHook, and receive a client object from it;
- We override the (redefined) method BaseOperator.execute(), which Airflow will twitch when the time comes to launch the operator — in it we implement the main action, not forgetting to log. (By the way, we log right into stdout and stderr — Airflow will intercept everything, wrap it beautifully, and decompose it where needed.)
Let's see what's going on in commons/hooks.py. The first part of the file, with the hook itself:
from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)
        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

I don't know what there is to explain here; I'll just note the important points:
- We inherit and think over the arguments — in most cases there will be a single one: conn_id;
- We override the standard methods: I limited myself to get_conn(), in which I get the connection parameters by name and simply grab the section extra (that's a JSON field), into which I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
- I create an instance of our TelegramBot, handing it that specific token.
That's all. You can get a client from the hook with TelegramBotHook().client or TelegramBotHook().get_conn().
And the second part of the file, in which I make a micro-wrapper over the Telegram REST API, so as not to drag in a whole library for the sake of one method, sendMessage.
class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

The right thing would be to add it all up — TelegramBotSendMessage, TelegramBotHook, TelegramBot — into a plugin, put it in a public repository, and give it to open source.
While we were studying all this, the report update managed to fail successfully and send me an error message in the channel. Let's go check whether it's wrong...

Something broke in our doge! And isn't that just what we were expecting? Exactly!
Are you going to pour?
Do you feel like I missed something? He promised, it seems, to transfer data from SQL Server to Vertica, and then took it and wandered off-topic, the scoundrel!
This atrocity was intentional: I simply had to decipher some terminology for you first. Now we can go further.
Our plan was this:
- Do the dag
- Generate the tasks
- Look at how beautiful everything is
- Assign session numbers to the loads
- Get the data from SQL Server
- Put the data into Vertica
- Collect the statistics
So, to get all of this up and running, I made a small addition to our docker-compose.yml:
docker-compose.db.yml
version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Here we raise:
- Vertica as the host dwh, with mostly default settings;
- three instances of SQL Server;
- and we fill the databases in the latter with some data (in no case look into mssql_init.py!)

We launch all the good stuff with a slightly more complicated command than last time:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

You can use the Data Profiling/Ad Hoc Query item to look at what our miracle randomizer generated:

The main thing is not to show it to the analysts
I won't dwell on ETL sessions in detail; everything there is trivial: we make a base, put a plate in it, wrap everything with a context manager, and now we do this:
with Session(task_name) as session:
    print('Load', session.id, 'started')
    # Load workflow
    ...
    session.successful = True
    session.loaded_rows = 15

session.py
from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any([exc_type, exc_val, exc_tb]):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} '
                f'id={self.id} '
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,
                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,
                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished = DEFAULT,
                successful = %s,
                loaded_rows = %s,
                comment = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

The time has come to collect our data from our hundred and fifty tables. Let's do it with some very unpretentious lines:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)

- With the hook, we get a pymssql connection out of Airflow;
- We substitute a restriction in the form of a date into the request — the template engine will throw it into the function;
- We feed our request to pandas, which will get us a DataFrame — it will come in handy later.
- I use the substitution {dt} instead of a request parameter %s not because I'm an evil Pinocchio, but because pandas can't cope with pymssql and slips it params: List, while it really wants a tuple.
- Also note that the developer of pymssql decided not to support it anymore, and it's time to move out to pyodbc.
Let's see what Airflow stuffed into our function's arguments:

If there's no data, there's no point in continuing. But it would also be strange to consider the fill successful. Then again, this isn't an error. "A-a-ah, what to do?!" And here's what:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException tells Airflow that there's no error, but the task gets skipped. The interface will show not a green or red square, but a pink one.
Let's toss a few extra columns into our data:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Namely:
- the database from which we took the orders,
- the ID of our pouring session (it will be different for every task),
- and a hash of the source and the order ID — so that in the final database (where everything is poured into one table) we end up with a unique order ID.
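The snippet uses pandas' hash_pandas_object; the underlying idea is simply a deterministic surrogate key over the (source, id) pair. A stdlib-only sketch of the same idea (not the pandas implementation — the actual hash values will differ):

```python
import hashlib

def surrogate_key(etl_source: str, order_id: int) -> int:
    """Deterministic 64-bit key derived from the source name and the source-local id."""
    digest = hashlib.sha256(f'{etl_source}:{order_id}'.encode()).digest()
    return int.from_bytes(digest[:8], 'big')

# The same (source, id) pair always maps to the same key...
assert surrogate_key('db_042', 17) == surrogate_key('db_042', 17)
# ...while the same id coming from another source database does not collide
assert surrogate_key('db_042', 17) != surrogate_key('db_007', 17)
```

Determinism matters here: on a re-run of the same task instance, the same rows hash to the same IDs.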
The penultimate step remains: pour everything into Vertica. And, strangely enough, one of the most spectacular and effective ways to do that is via CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()})
    FROM STDIN
    DELIMITER '|'
    ENCLOSED '"'
    ABORT ON ERROR
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)

- We make a dedicated receiver, StringIO;
- pandas will kindly put our DataFrame into it in the form of CSV lines;
- We open a connection to our favorite Vertica with a hook;
- And now, with the help of copy(), we send our data straight to Vertica!
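The pandas-free core of the trick looks like this: write the rows into an in-memory CSV "file" and rewind it so that a COPY ... FROM STDIN call can read it from the beginning. A stdlib sketch (the Vertica side stays as in the snippet above; the sample rows are made up):

```python
import csv
from io import StringIO

rows = [(1, '2020-07-14 10:00:00', 'db_042'),
        (2, '2020-07-14 10:05:00', 'db_042')]

# Serialize the rows into an in-memory buffer, the way df.to_csv(buffer, ...) does,
# then rewind it so the COPY call can stream it from the start
buffer = StringIO()
writer = csv.writer(buffer, delimiter='|', quoting=csv.QUOTE_MINIMAL)
writer.writerows(rows)
buffer.seek(0)

print(buffer.read(), end='')
```

Without the seek(0), the driver would start reading at the end of the buffer and load exactly nothing — a classic way to "successfully" pour zero rows.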
We'll take from the driver how many lines got filled in, and tell the session manager that everything is OK:

session.loaded_rows = cursor.rowcount
session.successful = True

That's all.
In production we create the target plates manually. Here, though, I allowed myself a small automation:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

We use VerticaOperator() to create the database schema and the table (unless, of course, they already exist). The main thing is to arrange the dependencies correctly:
for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Summing up
"Well," said the little mouse, "isn't it true? Now you're convinced that I'm the most terrible animal in the forest?"
Julia Donaldson, The Gruffalo
I think if my colleagues and I had a competition — who will create an ETL process from scratch and launch it faster: they with their SSIS and a mouse, and me with Airflow... And then we'd also compare ease of maintenance... Wow, I think you'll agree that I'd beat them on all fronts!
If a bit more seriously, then Apache Airflow — by letting me describe processes in the form of program code — made my job much more comfortable and enjoyable.
Its unlimited extensibility, both in terms of plugins and scalability, gives you the opportunity to apply Airflow in almost any area: even in the full cycle of collecting, preparing, and processing data, even in launching rockets (to Mars, of course).
The final part: references and information
The rake we've gathered for you:
- start_date. Yes, this is already a local meme. Everyone trips over the DAG's main argument, start_date. Briefly: if you specify the current date in start_date and one day in schedule_interval, then the DAG starts tomorrow, not earlier.
  start_date = datetime(2020, 7, 7, 0, 1, 2)
  And no more problems.
  There's another runtime error associated with it — Task is missing the start_date parameter — which most often indicates that you forgot to bind an operator to the dag.
- Everything on one machine. Yes, both databases (Airflow's own and ours, with the loads), and the web server, and the scheduler, and the workers. And it even worked. But over time the number of service tasks grew, and when PostgreSQL started responding to an index in 5 seconds instead of 20 milliseconds, we took it all and carried it away.
- LocalExecutor. Yes, we're still sitting on it, and we've already come to the edge of the abyss. LocalExecutor has been enough for us so far, but now it's time to expand with at least one more worker, and we'll have to work hard to move to CeleryExecutor. And given that you can work with it on a single machine, nothing stops you from using Celery even on one server, which "of course will never go into production, honestly!"
- Non-use of the built-in tools:
  - Connections, to store service credentials,
  - SLA Misses, to respond to tasks that didn't finish on time,
  - XCom, to exchange metadata (I said metadata!) between DAG tasks.
- Mail abuse. Well, what can I say? Alerts were set up for every retry of fallen tasks. Now my work Gmail has more than 90k emails from Airflow, and the webmail muzzle refuses to pick up and delete more than 100 of them at a time.
More pitfalls:
Other automation tools
So that we work even more with our heads and not with our hands, Airflow has prepared this for us:
- REST API — it still has Experimental status, which doesn't stop it from working. With it you can not only get information about DAGs and tasks, but also stop/start a DAG, create a DAG Run or a pool.
- CLI — many tools are available from the command line that are not just inconvenient to use through the WebUI, but are absent from it altogether. For example:
  - backfill is needed to restart task instances.
    For example, the analysts came and said: "And you, comrade, have nonsense in the data from January 1 to 13! Fix it, fix it, fix it, fix it!" And you, being such a nice person:
    airflow backfill -s '2020-01-01' -e '2020-01-13' orders
  - Base services: initdb, resetdb, upgradedb, checkdb.
  - run, which lets you run a single task instance, even ignoring all its dependencies. Moreover, you can run it via LocalExecutor even if you have a Celery cluster.
  - test does pretty much the same thing, only it also writes nothing to the bases.
  - connections allows mass creation of connections from the shell.
- Python API — a rather hardcore way of interacting, intended for plugins, not for swarming in it with little hands. But who's to stop us from going to /home/airflow/dags, running ipython, and starting to mess around? You can, for example, export all the connections with this code:

from airflow import settings
from airflow.models import Connection

fields = 'conn_id conn_type host port schema login password extra'.split()

session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
    d = {field: getattr(conn, field) for field in fields}
    print(conn.conn_id, '=', d)

- Connecting to the Airflow metadata database. I don't recommend writing to it, but pulling task states from it for various specific metrics can be much faster and easier than going through any of the APIs.
Suppose not all of our tasks are idempotent; some of them may fall from time to time, and that's normal. But it's worth checking whether any real blockages have settled in.

Beware, SQL!

WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        row_number() OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE execution_date > now() - INTERVAL '2' DAY
),
failed AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        CASE WHEN rn = row_number() OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC)
        THEN TRUE END AS last_fail_seq
    FROM last_executions
    WHERE state IN ('failed', 'up_for_retry')
)
SELECT
    task_id,
    dag_id,
    count(last_fail_seq) AS unsuccessful,
    count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed,
    count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY task_id, dag_id
HAVING count(last_fail_seq) > 0
References

And of course, the first ten links from Google's output are the contents of the Airflow folder in my bookmarks:

- Of course, you have to start with the official docs. There's a manual — but then again, who reads the documentation?
- And at least read the recommendations from the creators.
- Right from where it all begins: the dynamic user interface.
- (Suddenly!) Even if you understood nothing of my explanations, the basic concepts are explained there quite well.
- A short guide to setting up an Airflow cluster.
- An article that is almost as interesting, except for a bit too much formalism and too few examples.
- On integration with Celery.
- On task idempotency, turning dates into IDs, file structure, and other interesting points.
- Task dependencies and trigger rules, which I only touched on.
- How some parts of the scheduler work "as intended", how to solve problems, how to reload lost data, and how to prioritize tasks.
- Handy SQL queries against the Airflow metadata.
- Has a useful section on creating custom sensors.
- An interesting short note on building data-science infrastructure on AWS.
- Common mistakes (in case someone still hasn't read the docs).
- You can get by without Connections — but do smile at the people who struggle to store their passwords.
- Explains implicit DAG assignment, context scope in functions, dependencies, and the steps of launching a task.
- On using default arguments and params — not only in templates, but also in Variables and Connections.
- A talk about preparing for Airflow 2.0.
- A slightly older article about deploying a cluster with docker-compose.
- Dynamic tasks using templates and context passing.
- Standard and custom notifications via email and Slack.
- Task hooks and splitting via XCom.

Links used in the article:
- the placeholders (macros) that can be used in templates;
- mistakes commonly made when creating dags;
- the docker-compose for experiments, debugging, and so on;
- a Python wrapper over the Telegram REST API.
Source: habr.com




