Hi, I'm Dmitry Logvinenko, a data engineer in the analytics department of the Vezet group of companies.
I'm going to tell you about a great tool for building ETL processes: Apache Airflow. Airflow is so versatile and multifaceted that it deserves a closer look even if you don't deal with data flows at all and simply need to launch some processes periodically and monitor their execution.
And yes, I will not only tell but also show: there is a lot of code, screenshots and recommendations ahead.

Image: what you usually see when you google the word Airflow (Wikimedia Commons)
Introduction
Apache Airflow is just like Django:
- it is written in Python,
- it has an excellent admin panel,
- it is endlessly extensible.
Only better, and it was built for entirely different purposes, namely (as the teaser said):
- launching and monitoring tasks on an unlimited number of machines (as many as Celery / Kubernetes and your conscience allow),
- dynamic workflow generation from Python code that is very easy to write and understand,
- the ability to connect any databases and APIs to each other using both ready-made components and home-made plugins (which is extremely simple).
We use Apache Airflow like this:
- we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into the DWH and ODS (ours are Vertica and ClickHouse);
- as an advanced cron that starts the data consolidation processes on the ODS and also monitors their maintenance.
Until recently, our needs were covered by one small server with 32 cores and 50 GB of RAM. In Airflow, this runs:
- more than 200 DAGs (actual workflows into which we stuffed tasks),
- each with 70 tasks on average,
- and all this goodness starts (also on average) once an hour.
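To get a feel for the load these averages imply, here is a back-of-the-envelope calculation. It only multiplies the figures quoted above; since the text says "more than 200", the result is a lower bound rather than a measurement.

```python
# Figures quoted in the text; "more than 200 DAGs" makes this a lower bound.
dags = 200
tasks_per_dag = 70       # average
runs_per_hour = 1        # average

task_instances_per_hour = dags * tasks_per_dag * runs_per_hour
task_instances_per_second = task_instances_per_hour / 3600

print(task_instances_per_hour)               # 14000
print(round(task_instances_per_second, 1))   # 3.9
```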
How we scaled up I will describe below; for now, let's define the über-task we are going to solve:
There are three source SQL Servers, each with 50 databases. These are instances of one project, so they share the same structure (almost everywhere, mua-ha-ha), which means each of them has an Orders table (fortunately, a table with that name can be pushed into any business). We take the data, add service fields (source server, source database, ETL task ID) and naively throw it into, say, Vertica.
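A minimal sketch of the "add service fields" step, assuming rows arrive as plain dictionaries. The function name, the field names and the sample values are hypothetical illustrations, not the author's code.

```python
from typing import Dict, Iterable, Iterator


def add_service_fields(rows: Iterable[Dict], source_server: str,
                       source_db: str, etl_id: int) -> Iterator[Dict]:
    """Yield copies of the rows extended with the three service fields."""
    for row in rows:
        yield {**row,
               'source_server': source_server,   # hypothetical field name
               'source_db': source_db,           # hypothetical field name
               'etl_id': etl_id}                 # hypothetical field name


orders = [{'id': 1, 'total': 100}, {'id': 2, 'total': 250}]
tagged = list(add_service_fields(orders, 'sql-01', 'project_07', etl_id=42))
print(tagged[0])
```

Keeping the source server and database on every row is what makes it possible to tell the 150 Orders tables apart once they land in one target table.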
Let's go!
The main part, practical (and a little theoretical)
Why we (and you) need it
Back when the trees were tall and I was a simple SQL guy at a Russian retailer, we built ETL processes, aka data flows, with the two tools available to us:
- Informatica Power Center: an extremely sprawling system, extremely productive, with its own hardware and its own versioning. I used, God willing, 1% of its capabilities. Why? First, its dated interface put psychological pressure on us. Second, this contraption is designed for extremely sophisticated processes, furious component reuse and other very important enterprise tricks. We won't even mention that it costs about as much as an Airbus A380 wing per year.
Careful, the screenshot may slightly hurt people under 30.

- SQL Server Integration Services (SSIS): we used this comrade in our intra-project flows. Well, in fact: we already use SQL Server, and it would be somehow unreasonable not to use its ETL tools. Everything about it is nice: the interface is pretty, and so are the progress reports... But that is not why we love software products, oh no. We can version its dtsx (which is XML with nodes shuffled on every save), but what's the point? How about building a package of tasks that drags a hundred tables from one server to another? Forget a hundred: after twenty of them your index finger will fall off from clicking the mouse button. But it certainly looks more fashionable:
We definitely looked for ways out. It even almost came to a self-written SSIS package generator...
...and then a new job found me. And Apache Airflow caught up with me there.
When I learned that ETL process descriptions are plain Python code, I nearly danced for joy. That is how data flows became versioned and diffable, and loading tables with the same structure from hundreds of databases into one target became a matter of Python code that fits on one and a half or two 13" screens.
Assembling the cluster
Let's not turn this into a complete kindergarten and talk about the perfectly obvious things, such as installing Airflow, your database of choice, Celery and other matters described in the docs.
So that we can start experimenting right away, I sketched out a docker-compose.yml in which:
- we bring up Airflow itself: Scheduler and Webserver. Flower will also run there to monitor Celery tasks (because it has already been packed into apache/airflow:1.10.10-python3.7, and we don't mind);
- PostgreSQL, into which Airflow will write its service information (scheduler data, execution statistics and so on), and where Celery will mark completed tasks;
- Redis, which will act as the task broker for Celery;
- a Celery worker, which will do the actual execution of tasks;
- the ./dags folder, where we will put our files with DAG descriptions. They are picked up on the fly, so there is no need to restart the whole stack after every sneeze.
In some places the code in the examples is not shown in full (so as not to clutter the text), and in others it is modified along the way. Complete working code examples can be found in the repository.
docker-compose.yml
version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base
    environment:
      <<: *airflow-config
      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8
      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
    depends_on:
      - airflow-db
      - broker
    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"
    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base
    environment:
      <<: *airflow-config
    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"
    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:
- In putting the composition together I relied heavily on a well-known image; be sure to check it out. Maybe you won't need anything else in life.
- All Airflow settings are available not only through airflow.cfg but also through environment variables (thanks to the developers), which I gleefully took advantage of.
- Naturally, this is not production-ready: I deliberately did not put heartbeats on the containers and did not bother with security. But I did the minimum suitable for our experiments.
- Note that:
  - the DAG folder must be accessible to both the scheduler and the workers;
  - the same applies to all third-party libraries: they must all be installed on the machines running the scheduler and the workers.
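The environment variables in the compose file follow Airflow's AIRFLOW__{SECTION}__{KEY} naming convention, where each variable overrides the option of that name in the matching airflow.cfg section. The sketch below only illustrates the naming rule; the helper function is hypothetical and is not part of Airflow.

```python
from typing import Dict, Tuple


def airflow_env_to_cfg(env: Dict[str, str]) -> Dict[Tuple[str, str], str]:
    """Map AIRFLOW__{SECTION}__{KEY} variables to (section, key) pairs."""
    prefix = 'AIRFLOW__'
    options = {}
    for name, value in env.items():
        if not name.startswith(prefix):
            continue  # not an Airflow setting
        section, sep, key = name[len(prefix):].partition('__')
        if sep and section and key:
            options[(section.lower(), key.lower())] = value
    return options


env = {
    'AIRFLOW__CORE__EXECUTOR': 'CeleryExecutor',
    'AIRFLOW__CELERY__BROKER_URL': 'redis://broker:6379/0',
    'PATH': '/usr/bin',
}
for (section, key), value in airflow_env_to_cfg(env).items():
    print(f'[{section}] {key} = {value}')
```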
Well, now it's simple:
$ docker-compose up --scale worker=3
Once everything is up, you can look at the web interfaces:
- Airflow: port 8080 of the Docker host (as published in the compose file);
- Flower: port 5555.
Basic concepts
If all these "dags" left you confused, here is a short glossary:
- Scheduler: the most important guy in Airflow, who makes sure that robots work hard and not people: it watches the schedule, refreshes DAGs and launches tasks.
  In older versions it had problems with memory (no, not amnesia, but leaks), and a legacy parameter, run_duration (its restart interval), even remained in the configs. But now everything is fine.
- DAG (aka "dag"): a "directed acyclic graph". That definition says little to most people; in essence it is a container for tasks that interact with each other (see below), or an analogue of a Package in SSIS and a Workflow in Informatica.
  Besides DAGs there can also be subDAGs, but we most likely won't get to them.
- DAG Run: an initialized DAG that has been assigned its own execution_date. DAG runs of the same DAG can happily work in parallel (if you have made your tasks idempotent, of course).
- Operator: a piece of code responsible for performing a specific action. There are three types of operators:
  - action, like our favourite PythonOperator, which can execute any (valid) Python code;
  - transfer, which moves data from place to place, for example MsSqlToHiveTransfer;
  - sensor, which lets you react to, or hold back, further execution of the DAG until some event occurs. HttpSensor can poll the specified endpoint and, once the desired response arrives, start the transfer GoogleCloudStorageToS3Operator. An inquisitive mind will ask: "Why? After all, you can do retries right in the operator!" The reason is to avoid clogging the task pool with suspended operators. The sensor starts, checks and dies until the next attempt.
- Task: declared operators, regardless of type, are promoted to the rank of task once they are attached to a DAG.
- Task instance: when the general-scheduler decides it is time to send tasks into battle on the worker-executors (right on the spot if we use LocalExecutor, or to a remote node in the case of CeleryExecutor), it assigns them a context (that is, a set of variables, the execution parameters), expands command or query templates, and puts them into a pool.
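To make "directed acyclic graph" less abstract, here is a small sketch that uses only the Python standard library (graphlib, Python 3.9+), not Airflow itself. The task names are hypothetical. It shows the two properties a scheduler relies on: a valid execution order exists, and a cycle is rejected.

```python
from graphlib import CycleError, TopologicalSorter  # standard library, Python 3.9+

# Hypothetical task names; each key maps to the set of tasks it depends on.
dependencies = {
    'extract_orders': set(),
    'add_service_fields': {'extract_orders'},
    'load_to_dwh': {'add_service_fields'},
    'notify': {'load_to_dwh'},
}

# A linear chain has exactly one valid order.
order = list(TopologicalSorter(dependencies).static_order())
print(order)

# Making the first task depend on the last one introduces a cycle.
with_cycle = {**dependencies, 'extract_orders': {'notify'}}
try:
    list(TopologicalSorter(with_cycle).static_order())
    is_dag = True
except CycleError:
    is_dag = False
print(is_dag)   # False
```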
Generating tasks
First, let's outline the general scheme of our DAG, and then dive deeper and deeper into the details, because we use some non-trivial solutions.
So, in its simplest form, such a DAG looks like this:
from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))


def workflow(**context):
    print(context)


for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Let's break it down:
- first, we import the necessary libraries and something else;
- sql_server_ds is a List[namedtuple[str, str]] with the names of the connections from Airflow Connections and the databases from which we will take our table;
- dag is the declaration of our DAG, which must be in globals(), otherwise Airflow won't find it. The DAG also needs to be told:
  - that its name is orders; this name will then show up in the web interface,
  - that it will work starting from midnight on July 8,
  - and that it should run roughly every 6 hours (for the tough guys, a cron line such as 0 */6 * * * is acceptable here instead of timedelta(); for the less cool, an expression like @daily);
- workflow() will do the main job, but not now. For now we just dump our context into the log;
- and now the simple magic of creating tasks:
  - we run through our sources;
  - we initialize a PythonOperator that will execute our dummy workflow(). Don't forget to specify a unique (within the DAG) task name and attach the DAG itself. The provide_context flag, in turn, pours additional arguments into the function, which we carefully collect with **context.
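The text does not show commons/datasources.py, so here is one hypothetical way sql_server_ds could be built for three servers with 50 databases each. The connection ids and database names are invented for illustration. Because task_id is taken from the schema name and must be unique within a DAG, the invented names include the server.

```python
from collections import namedtuple
from typing import List

# Hypothetical stand-in for commons/datasources.py, which the text does not show.
DataSource = namedtuple('DataSource', 'conn_id schema')

SERVERS = ['sql_server_1', 'sql_server_2', 'sql_server_3']   # assumed connection ids
DATABASES_PER_SERVER = 50

sql_server_ds: List[DataSource] = [
    DataSource(conn_id, f'{conn_id}_db_{n:02d}')
    for conn_id in SERVERS
    for n in range(1, DATABASES_PER_SERVER + 1)
]

# task_id=schema must be unique within a DAG, so schema names must not repeat.
unique_schemas = {ds.schema for ds in sql_server_ds}
print(len(sql_server_ds), len(unique_schemas))   # 150 150
```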
That's all for now. What we got:
- a new DAG in the web interface,
- a hundred and fifty tasks that will be executed in parallel (if the Airflow and Celery settings and the server capacity allow it).
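One detail of the schedule declared for the orders DAG is easy to miss: Airflow starts a DAG run only after the interval it covers has ended, so the run with execution_date equal to start_date is triggered one schedule_interval later. A small sketch of that arithmetic in plain datetime code, without Airflow:

```python
from datetime import datetime, timedelta

start_date = datetime(2020, 7, 8, 0)
schedule_interval = timedelta(hours=6)

runs = []
for n in range(4):
    execution_date = start_date + n * schedule_interval
    triggered_at = execution_date + schedule_interval   # a run starts when its interval ends
    runs.append((execution_date, triggered_at))
    print(execution_date.isoformat(), '->', triggered_at.isoformat())
```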
Well, almost got it.

Who will install the dependencies?
To simplify the whole thing, I built processing of requirements.txt on all nodes into docker-compose.yml.
Now it's off:

Grey squares are task instances processed by the scheduler.
We wait a bit, and the tasks are snapped up by the workers:

The green ones, of course, have completed their work successfully. The red ones, not so much.
By the way, there is no ./dags folder on our production, and no file synchronization between machines: all DAGs live in git on our Gitlab, and Gitlab CI distributes updates to the machines on merge into master.
A little about Flower
While the workers are chewing on our dummies, let's recall another tool that can show us something: Flower.
The very first page shows summary information on the worker nodes:

The most intense page shows the tasks that went to work:

The most boring page shows the status of our broker:

The brightest page shows graphs of task statuses and their execution time:

Loading the underloaded
So, all the tasks have run, and the wounded can be carried away.

And there turned out to be quite a few wounded, for one reason or another. When Airflow is used correctly, these very squares mean that the data definitely did not arrive.
You need to look at the log and restart the fallen task instances.
By clicking on any square, we see the actions available to us:

You can simply Clear the fallen one. That is, we forget that something failed there, and the same task instance goes back to the scheduler.

Clearly, doing this with the mouse for all the red squares is not very humane, and it is not what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances

Let's select everything at once and reset it to zero by clicking the right item:

After the cleanup, our tasks look like this (they can't wait for the scheduler to schedule them):

Connections, hooks and other variables
It's time to look at the next DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Dear all, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
        Natasha, wake up, we dropped {{ dag.dag_id }}
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Has everyone done a report update at some point? Here it is again: there is a list of sources to get the data from; there is a list of where to put it; and don't forget to honk when everything has happened or broken (well, that's not about us, no).
Let's go through the file again and look at the new, unclear bits:
- from commons.operators import TelegramBotSendMessage: nothing stops us from making our own operators, which we took advantage of by writing a small wrapper for sending messages to Telegram. (We will talk more about this operator below.)
- default_args={}: a DAG can hand the same arguments to all of its operators;
- to='{{ var.value.all_the_kings_men }}': the to field is not hardcoded but generated dynamically using Jinja and a variable with a list of emails, which I carefully placed in Admin/Variables;
- trigger_rule=TriggerRule.ALL_SUCCESS: the condition for starting the operator. In our case the letter flies off to the bosses only if all dependencies completed successfully;
- tg_bot_conn_id='tg_main': conn_id arguments take the identifiers of connections that we create in Admin/Connections;
- trigger_rule=TriggerRule.ONE_FAILED: messages in Telegram fly off only if there are fallen tasks;
- task_concurrency=1: we forbid the simultaneous launch of several task instances of one task. Otherwise we would get several VerticaOperator instances running at once (all looking at the same table);
- report_update >> [email, tg]: all the VerticaOperator tasks converge on sending letters and messages, like this:

But since the notifier operators have different launch conditions, only one will fire. In the Tree View everything looks a little less clear:
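The "only one will fire" behaviour follows from the two trigger rules. The sketch below is a deliberately simplified model in plain Python: it only knows the states 'success' and 'failed' and evaluates the rules once all upstream tasks have finished, whereas Airflow handles more states and evaluates rules as tasks complete. The function is hypothetical, not an Airflow API.

```python
from typing import Dict, List


def fired_notifiers(upstream_states: Dict[str, str]) -> List[str]:
    """Return which of the two notifier tasks would run for the given upstream states."""
    states = list(upstream_states.values())
    fired = []
    if all(state == 'success' for state in states):   # TriggerRule.ALL_SUCCESS
        fired.append('email_success')
    if any(state == 'failed' for state in states):    # TriggerRule.ONE_FAILED
        fired.append('telegram_fail')
    return fired


all_ok = fired_notifiers({'city_orders': 'success', 'client_calls': 'success'})
one_down = fired_notifiers({'city_orders': 'success', 'client_calls': 'failed'})
print(all_ok)     # ['email_success']
print(one_down)   # ['telegram_fail']
```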

I will say a few words about macros and their friends, variables.
Macros are Jinja placeholders that can substitute various useful information into operator arguments. For example, like this:
SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} expands to the contents of the context variable execution_date in the YYYY-MM-DD format: 2020-07-14. The best part is that context variables are nailed to a specific task instance (a square in the Tree View), so on restart the placeholders expand to the same values.
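Why a restarted task instance sees the same value can be shown with a toy renderer. This is a stand-in for Jinja that substitutes only the {{ ds }} placeholder; the function name is hypothetical and real rendering is done by Airflow with Jinja.

```python
from datetime import datetime


def render_ds(template: str, execution_date: datetime) -> str:
    """Toy stand-in for Jinja: substitute only the {{ ds }} placeholder."""
    return template.replace('{{ ds }}', execution_date.strftime('%Y-%m-%d'))


sql = "SELECT id FROM orders.payments WHERE payment_dtm::DATE = '{{ ds }}'::DATE"
execution_date = datetime(2020, 7, 14, 6, 0)

first_try = render_ds(sql, execution_date)
retry = render_ds(sql, execution_date)   # a cleared task instance keeps its execution_date

print(first_try)
print(first_try == retry)   # True
```

Since the value depends on execution_date and not on the wall clock, rerunning a task a week later still queries the same day.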
The assigned values can be viewed with the Rendered button on each task instance. This is what it looks like for the task that sends a letter:

And this is for the task that sends a message:

The complete list of built-in macros for the latest available version is in the macros reference of the Airflow documentation.
Moreover, with the help of plugins we can declare our own macros, but that's another story.
In addition to the predefined things, we can substitute the values of our own variables (I already used this in the code above). Let's create a couple of things in Admin/Variables:

That's it, now we can use them:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

The value can be a scalar, or it can be JSON. In the case of JSON:
bot_config
{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}
just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
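What the dotted path does can be sketched with the standard json module. The resolve helper is hypothetical and only mimics the lookup; in Airflow the var.json accessor does this inside a Jinja template. The token is written as a quoted string so that the JSON parses.

```python
import json
from functools import reduce

# The variable value as it could be stored in Admin/Variables.
bot_config = json.loads("""
{
    "bot": {"token": "881hskdfASDA16641", "name": "Verter"},
    "service": "TG"
}
""")


def resolve(path: str, variables: dict):
    """Follow a dotted path such as 'bot_config.bot.token' through nested dicts."""
    return reduce(lambda node, key: node[key], path.split('.'), variables)


token = resolve('bot_config.bot.token', {'bot_config': bot_config})
print(token)   # 881hskdfASDA16641
```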
I will say literally one word and show one screenshot about connections. Everything is elementary here: on the Admin/Connections page we create a connection and add our logins / passwords and more specific parameters. Like this:

Passwords can be encrypted (more thoroughly than in the default option), and you can also leave the connection type unspecified (as I did for tg_main). The thing is that the list of types is hardwired into the Airflow models and cannot be extended without getting into the source code (if I simply failed to google something, please correct me), but nothing stops us from getting credentials just by name.
You can also create several connections with the same name: in that case the BaseHook.get_connection() method, which fetches connections by name, returns a random one of the namesakes (Round Robin would be more logical, but let's leave that on the conscience of the Airflow developers).
Variables and Connections are certainly cool tools, but it is important not to lose the balance between which parts of your flows you keep in the code itself and which parts you hand over to Airflow for storage. On the one hand, quickly changing a value, say a mailing address, through the UI can be convenient. On the other hand, it is still a return to the mouse click that we (I) wanted to get rid of.
Working with connections is one of the tasks of hooks. In general, Airflow hooks are points for connecting it to third-party services and libraries. For example, JiraHook opens a client for us to interact with Jira (you can move tasks back and forth), and with SambaHook you can push a local file to an smb point.
Dissecting a custom operator
And now we have come close to looking at how TelegramBotSendMessage is made.
The code in commons/operators.py with the operator itself:
from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot


class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Here, like everything else in Airflow, everything is very simple:
- we inherit from BaseOperator, which implements quite a lot of Airflow-specific things (look at it at your leisure);
- we declare the template_fields, in which Jinja will look for macros to process;
- we arrange the right arguments for __init__() and set defaults where necessary;
- we do not forget to initialize the ancestor either;
- we open the corresponding hook, TelegramBotHook, and get a client object from it;
- we override BaseOperator.execute(), which Airflow will call when the time comes to launch the operator; in it we implement the main action, not forgetting to log. (By the way, we log straight to stdout and stderr: Airflow intercepts everything, wraps it nicely and puts it where it belongs.)
Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:
from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession


class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

I don't even know what there is to explain here, so I'll just note the important points:
- we inherit and think about the arguments; in most cases there will be just one: conn_id;
- we override the standard methods: I limited myself to get_conn(), in which I get the connection parameters by name and just take the extra section (this is a JSON field), into which I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"};
- I create an instance of our TelegramBot and give it a specific token.
That's all. You can get a client from the hook using TelegramBotHook().client or TelegramBotHook().get_conn().
And the second part of the file, in which I make a micro-wrapper for the Telegram REST API, so as not to drag in a whole library for the single sendMessage method.
class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        # An explicit chat_id overrides the default one given to the constructor
        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)


class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

The right thing to do would be to bundle it all up (TelegramBotSendMessage, TelegramBotHook, TelegramBot) into a plugin, put it in a public repository, and give it to open source.
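One detail of the wrapper is worth a small sketch. It sends every message with parse_mode set to MarkdownV2, and the Telegram Bot API documentation lists a set of characters that must be escaped with a backslash in that mode. The helper below is hypothetical (escape_markdown_v2 is not part of the original hooks.py); it simply escapes all of them, which is the blunt option for messages that are plain text such as error reports.

```python
# Characters the Telegram Bot API documentation lists as requiring
# a backslash escape in MarkdownV2 text.
MARKDOWN_V2_SPECIALS = '_*[]()~`>#+-=|{}.!'

# Translation table: each special character maps to "\" + itself.
_ESCAPES = str.maketrans({char: '\\' + char for char in MARKDOWN_V2_SPECIALS})


def escape_markdown_v2(text: str) -> str:
    """Hypothetical helper: escape every MarkdownV2 special character."""
    return text.translate(_ESCAPES)


if __name__ == '__main__':
    print(escape_markdown_v2('Load 3.5 done!'))
```

Escaping everything also disables intentional formatting, so a message that wants bold or links would need to escape only its variable parts.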
While we were studying all this, our report updates managed to fail successfully and send me an error message in the channel. I'm off to check what went wrong...

Something broke in our DAG! Isn't that what we were waiting for? Exactly!
So, are you going to pour?
Do you feel I skipped something? It seems he promised to transfer data from SQL Server to Vertica, and then he went and wandered off topic, the scoundrel!
This atrocity was deliberate: I simply had to decipher some terminology for you. Now we can move on.
Our plan was this:
- Make a DAG
- Generate tasks
- See how beautiful everything is
- Assign session numbers to the loads
- Get data from SQL Server
- Put the data into Vertica
- Collect statistics
So, to get all of this up and running, I made a small addition to our docker-compose.yml:
docker-compose.db.yml
version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

There we bring up:
- Vertica as the host dwh with the most default settings,
- three instances of SQL Server,
- and we fill the databases in the latter with some data (under no circumstances look into mssql_init.py!)
We launch all this goodness with a slightly more complicated command than last time:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
What our miracle randomizer generated can be viewed with the Data Profiling/Ad Hoc Query menu item:

The main thing is not to show it to the analysts
I won't dwell on ETL sessions, everything there is trivial: we create a database with a table in it, wrap everything in a context manager, and now we do this:
with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py
from sys import stderr


class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # any() takes a single iterable, hence the tuple
        if any((exc_type, exc_val, exc_tb)):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} '
                f'id={self.id} '
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,

                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)


class SessionError(Exception):
    pass


class SessionClosedError(SessionError):
    pass

The time has come to collect our data from our one and a half hundred tables. Let's do it with some very unpretentious lines:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)

- Using a hook, we get a pymssql connection from Airflow.
- We substitute a date restriction into the query; the template engine will pass it into the function.
- We feed our query to pandas, which gets us a DataFrame; it will come in handy later.
I use the {dt} substitution instead of a %s query parameter not because I am an evil Pinocchio, but because pandas cannot cope with pymssql and hands it params as a list, even though it really wants a tuple.
Also note that the pymssql developer has decided to stop supporting it, and it is time to move to pyodbc.
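Since the date ends up interpolated into the SQL text rather than bound as a parameter, a cheap safeguard is to validate it first. The sketch below is an illustration, not part of the original workflow: date_literal and build_orders_query are hypothetical names, and it assumes dt arrives as an ISO date string, which is what the {{ ds }} template renders.

```python
from datetime import date


def date_literal(dt: str) -> str:
    """Return dt normalized to YYYY-MM-DD, or raise ValueError if it is not a date."""
    return date.fromisoformat(dt).isoformat()


def build_orders_query(dt: str) -> str:
    """Build the extraction query; only a validated date reaches the SQL text."""
    return f"""
        SELECT
            id, start_time, end_time, type, data
        FROM dbo.Orders
        WHERE
            CONVERT(DATE, start_time) = '{date_literal(dt)}'
        """


if __name__ == '__main__':
    print(build_orders_query('2020-07-07'))
```

Anything that is not a date, including an attempt to smuggle in extra SQL, fails with ValueError before a query is ever built.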
Let's see what Airflow has stuffed into the arguments of our function:

If there is no data, there is no point in continuing. But it would also be odd to call the load successful. Yet it is not an error either. A-a-ah, what to do?! Here is what:
if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException tells Airflow that there is in fact no error, but that we are skipping the task. In the interface the square will be pink rather than green or red.
Let's add a few columns to our data:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Namely:
- the database from which we took the orders,
- the ID of our load session (it will be different for every task),
- a hash of the source and the order ID, so that the final database (where everything is poured into one table) has a unique order ID.
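The idea behind hash_id can be tried without pandas. The sketch below is a stand-in, not what hash_pandas_object computes (the values will differ): it derives a deterministic key from the (source, order ID) pair with hashlib and packs it into a signed 64-bit integer. The function name order_hash_id and the choice of BLAKE2b are assumptions made for illustration.

```python
import hashlib
import struct


def order_hash_id(etl_source: str, order_id: int) -> int:
    """Deterministic signed 64-bit key for a (source, order id) pair."""
    raw = f'{etl_source}|{order_id}'.encode('utf-8')
    digest = hashlib.blake2b(raw, digest_size=8).digest()
    # '>q' unpacks the 8 bytes as a big-endian signed 64-bit integer
    return struct.unpack('>q', digest)[0]


if __name__ == '__main__':
    # The same order ID coming from two sources gets two different keys
    print(order_hash_id('mssql_0', 42), order_hash_id('mssql_1', 42))
```

Unlike Python's built-in hash() on strings, this value is stable across processes, so reloading the same day produces the same keys.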
The last step remains: pour everything into Vertica. And, oddly enough, one of the most spectacular and efficient ways to do that is via CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

# Join the column names: interpolating the list itself would put
# Python brackets and quotes into the SQL
copy_stmt = f"""
    COPY {target_table}({', '.join(df.columns)})
    FROM STDIN
    DELIMITER '|'
    ENCLOSED '"'
    ABORT ON ERROR
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)

- We make a special receiver, StringIO.
- pandas will kindly write our DataFrame into it as CSV lines.
- We open a connection to our beloved Vertica with a hook.
- And now, with the help of copy(), we send our data straight to Vertica!
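To see what actually travels over the wire, the buffer can be reproduced with the standard csv module alone. This is a sketch under the same settings as above (pipe delimiter, minimal quoting, NUL as the null marker); rows_to_copy_buffer and column_list are hypothetical helper names, and no database is involved.

```python
import csv
from io import StringIO


def rows_to_copy_buffer(rows):
    """Serialize rows into a pipe-delimited CSV buffer, rewound for reading."""
    buffer = StringIO()
    writer = csv.writer(buffer, delimiter='|', quoting=csv.QUOTE_MINIMAL,
                        doublequote=False, escapechar='\\', lineterminator='\n')
    for row in rows:
        # None becomes the NUL marker that the COPY statement declares as NULL
        writer.writerow(['NUL' if value is None else value for value in row])
    buffer.seek(0)
    return buffer


def column_list(columns):
    """Render column names the way a COPY statement expects them."""
    return ', '.join(columns)


if __name__ == '__main__':
    print(rows_to_copy_buffer([(1, 'a|b', None)]).read(), end='')
    print(column_list(['id', 'data', 'etl_id']))
```

A value containing the delimiter is wrapped in double quotes, which is what the ENCLOSED '"' clause is there to undo on the receiving side.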
We take from the driver how many rows were loaded and tell the session manager that everything is OK:
session.loaded_rows = cursor.rowcount
session.successful = True

That's all.
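The session bookkeeping used throughout this workflow can be tried locally without PostgreSQL. Below is a minimal sketch of the same pattern on top of sqlite3: a row is inserted on entry and updated on exit, and an exception marks the session as unsuccessful. SqliteSession and run_demo are hypothetical names, and the table is reduced to a few columns; it is a stand-in for the Session class above, not a replacement.

```python
import sqlite3


class SqliteSession:
    """Hypothetical SQLite stand-in for the ETL Session context manager."""

    def __init__(self, connection, task_name):
        self.connection = connection
        self.task_name = task_name
        self.id = None
        self.loaded_rows = None
        self.successful = None
        self.connection.execute(
            """CREATE TABLE IF NOT EXISTS sessions (
                   id INTEGER PRIMARY KEY AUTOINCREMENT,
                   task_name TEXT NOT NULL,
                   finished INTEGER NOT NULL DEFAULT 0,
                   successful INTEGER,
                   loaded_rows INTEGER)""")

    def __enter__(self):
        cursor = self.connection.execute(
            'INSERT INTO sessions (task_name) VALUES (?)', (self.task_name,))
        self.id = cursor.lastrowid
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            self.successful = False
        self.connection.execute(
            'UPDATE sessions SET finished = 1, successful = ?, loaded_rows = ? '
            'WHERE id = ?',
            (self.successful, self.loaded_rows, self.id))
        self.connection.commit()
        return False  # let the exception propagate to the caller


def run_demo():
    """Run one successful and one failing session, return the recorded rows."""
    conn = sqlite3.connect(':memory:')
    with SqliteSession(conn, 'orders') as session:
        session.loaded_rows = 15
        session.successful = True
    try:
        with SqliteSession(conn, 'orders'):
            raise RuntimeError('source is down')
    except RuntimeError:
        pass
    return conn.execute(
        'SELECT id, successful, loaded_rows FROM sessions ORDER BY id').fetchall()


if __name__ == '__main__':
    print(run_demo())
```

The failing session still gets its row closed, which is the whole point of doing the bookkeeping in __exit__ rather than at the end of the task body.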
In production we create the target table by hand. Here I allowed myself a little automation:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

With VerticaOperator() I create the database schema and the table (if they do not already exist, of course). The main thing is to arrange the dependencies correctly:
for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Summing up
"Well," said the little mouse, "now are you
convinced that I am the scariest animal in the forest?"
Julia Donaldson, The Gruffalo
I think that if my colleagues and I held a competition to see who could create and launch an ETL process from scratch faster, they with their SSIS and a mouse and I with Airflow... And then we also compared ease of maintenance... Well, I think you will agree that I would beat them on all fronts!
On a slightly more serious note, Apache Airflow, by describing processes as program code, has made my work much more comfortable and enjoyable.
Its unlimited extensibility, both in terms of plugins and its predisposition to scalability, lets you use Airflow in almost any area: across the full cycle of collecting, preparing, and processing data, and even for launching rockets (to Mars, of course).
The final part: reference and information
The rakes we have stepped on for you
- start_date. Yes, this is already a local meme. Everyone trips over the DAG's main argument, start_date. In short, if you set start_date to the current date and schedule_interval to one day, the DAG will start no earlier than tomorrow.
  start_date = datetime(2020, 7, 7, 0, 1, 2)
  And no more problems.
  There is another runtime error associated with it: Task is missing the start_date parameter, which most often means that you forgot to bind the operator to the DAG.
- Everything on one machine. Yes, the databases (Airflow's own and our own tooling's), a web server, a scheduler, and the workers. And it even worked. But over time the number of tasks for the services grew, and when PostgreSQL began to answer on an index in 20 s instead of 5 ms, we took it and moved it elsewhere.
- LocalExecutor. Yes, we are still sitting on it, and we have already come to the edge of the abyss. LocalExecutor has been enough for us so far, but now it is time to expand with at least one worker, and we will have to work hard to move to CeleryExecutor. And given that you can work with it on a single machine, nothing stops you from using Celery even on a server that "of course will never go into production, honestly!"
- Not using the built-in tools:
  - Connections for storing service credentials,
  - SLA Misses for reacting to tasks that did not finish on time,
  - XCom for exchanging metadata (I said metadata!) between a DAG's tasks.
- Abusing mail. Well, what can I say? Alerts were set up for every retry of failed tasks. Now my work Gmail has more than 90k emails from Airflow, and the webmail front end refuses to select and delete more than 100 at a time.
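The start_date rake at the top of this list comes down to simple arithmetic: a run covers one schedule interval and is triggered only after that interval has fully elapsed. The sketch below is a simplified model of that rule, assuming schedule_interval is a plain timedelta; first_trigger_time is a hypothetical helper, not an Airflow function.

```python
from datetime import datetime, timedelta


def first_trigger_time(start_date: datetime, schedule_interval: timedelta) -> datetime:
    """Earliest moment the first run of a DAG can be triggered."""
    # The first run covers [start_date, start_date + interval) and is
    # triggered only once that interval is over.
    return start_date + schedule_interval


if __name__ == '__main__':
    start = datetime(2020, 7, 7, 0, 1, 2)
    print(first_trigger_time(start, timedelta(days=1)))
```

With a start_date of "today" and a one-day interval, the result is tomorrow, which is exactly the surprise described above.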
More pitfalls:
Even more automation tools
So that we can work even more with our heads and less with our hands, Airflow has prepared the following for us:
- The REST API: it still has Experimental status, which does not stop it from working. With it you can not only get information about DAGs and tasks, but also stop or start a DAG, create a DAG Run, or create a pool.
- The CLI: many tools are available from the command line that are not merely inconvenient to use through the WebUI but are missing from it altogether. For example:
  - backfill is needed to rerun task instances.
    For example, the analysts came and said: "And you, comrade, have nonsense in the data from January 1 to January 13! Fix it, fix it, fix it, fix it!" And you just go:
    airflow backfill -s '2020-01-01' -e '2020-01-13' orders
  - Database maintenance: initdb, resetdb, upgradedb, checkdb.
  - run, which lets you run a single task instance and even ignore all dependencies. Moreover, you can run it through LocalExecutor even if you have a Celery cluster.
  - test does much the same thing, except that it writes nothing to the database.
  - connections lets you create connections in bulk from the shell.
- The Python API: a rather hardcore way of interacting, intended for plugins rather than for poking around in with your bare hands. But who is going to stop us from going to /home/airflow/dags, starting ipython, and messing around? You can, for example, export all connections with the following code:
  from airflow import settings
  from airflow.models import Connection

  fields = 'conn_id conn_type host port schema login password extra'.split()

  session = settings.Session()
  for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
- Connecting to the Airflow metadata database. I do not recommend writing to it, but pulling task states for various specific metrics can be much faster and easier than through any of the APIs.
Suppose not all of our tasks are idempotent, and they can fail from time to time, and that is normal. But several failures in a row are already suspicious, and it is worth checking.
Beware, SQL!
WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        row_number() OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
),
failed AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        CASE WHEN rn = row_number() OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC)
        THEN TRUE END AS last_fail_seq
    FROM last_executions
    WHERE
        state IN ('failed', 'up_for_retry')
)
SELECT
    task_id,
    dag_id,
    count(last_fail_seq) AS unsuccessful,
    count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed,
    count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
    task_id,
    dag_id
HAVING
    count(last_fail_seq) > 0
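For anyone who finds the double row_number() trick hard to read: per task, the query counts how many of the most recent runs failed in a row. A failed run belongs to that streak only if its position among all runs equals its position among the failed ones. The same logic in plain Python, as a sketch with a hypothetical helper name:

```python
FAILED_STATES = {'failed', 'up_for_retry'}


def trailing_failures(states_newest_first):
    """Count consecutive failed runs, starting from the most recent one."""
    streak = 0
    for state in states_newest_first:
        if state not in FAILED_STATES:
            break  # the streak ends at the first run that did not fail
        streak += 1
    return streak


if __name__ == '__main__':
    print(trailing_failures(['failed', 'up_for_retry', 'success', 'failed']))
```

An older failure that is separated from the present by a successful run does not count, which matches the HAVING clause keeping only tasks whose latest runs are failing.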
References
And of course, the first ten links from a Google search are the contents of the Airflow folder in my bookmarks.
- Of course, one should start with the official documentation, but who reads instructions?
- Well, at least read the recommendations from the creators.
- The very beginning: the user interface in pictures.
- The basic concepts are well described, in case you (suddenly!) did not understand something from me.
- A short guide to deploying an Airflow cluster.
- Almost the same interesting article, except that it is more formal and has fewer examples.
- About working in conjunction with Celery.
- About the idempotence of tasks, loading by ID instead of by date, transformations, file structure, and other interesting things.
- Task dependencies and Trigger Rules, which I mentioned only in passing.
- How to get around some "works as intended" problems in the scheduler, load lost data, and prioritize tasks.
- Useful SQL queries for the Airflow metadata.
- There is a useful section on creating a custom sensor.
- An interesting short note about building infrastructure on AWS for Data Science.
- Common mistakes (when will people finally start reading the instructions).
- A smile at how people struggle with storing passwords, although you could just use Connections.
- Implicit DAG forwarding, passing context into functions, dependencies once again, and also skipping task runs.
- About using default arguments and params in templates, as well as working with Variables and Connections.
- A story about how the scheduler is being prepared for Airflow 2.0.
- A slightly outdated article about deploying our cluster in docker-compose.
- Dynamic tasks using templates and context forwarding.
- Standard and custom notifications by mail and Slack.
- Branching tasks, macros, and XCom.
And the links used in the article:
- Placeholders available for use in templates.
- Common mistakes when building DAGs.
- docker-compose for experimentation, debugging, and more.
- A Python wrapper for the Telegram REST API.
Source: www.habr.com




