แแแแแ แฏแแแ, แแ แแแ แแแแแขแ แ แแแแแแแแแแ - แแแแแแแแ Vezet แฏแแฃแคแแก แแแแแแขแแแแก แแแแแ แขแแแแแขแแก แแแแแชแแแแ แแแแแแแ แ.
แแ แแแขแงแแแ ETL แแ แแชแแกแแแแก แแแแแแแแ แแแแก แจแแกแแแแจแแแ แฎแแแกแแฌแงแแแ - Apache Airflow. แแแแ แแ Airflow แแแแแแแ แแ แแแแแแฎแ แแแ แแ แแ แแแแแแฎแ แแแแ, แ แแ แแฅแแแ แฃแแแ แแแแแแแ แแแ แแแก แแแจแแแแช แแ, แแฃ แแฅแแแ แแ แฎแแ แ แฉแแ แแฃแแ แแแแแชแแแแ แแแแแแแแจแ, แแแแ แแ แแญแแ แแแแแ แแแ แแแแฃแแแ แแแแฌแงแแก แแแแแกแแแแ แ แแ แแชแแกแ แแ แแแแแขแ แแแแ แแแแ แจแแกแ แฃแแแแ.
แแแแฎ, แแ แแ แ แแฎแแแแ แแแขแงแแแ, แแ แแแแ แแแฉแแแแแ: แแ แแแ แแแแก แแฅแแก แแแแ แ แแแแ, แแแ แแแแก แกแฃแ แแแ แแ แ แแแแแแแแแชแแแแ.

แ แแกแแช แฉแแแฃแแแแ แแ แฎแแแแแ แกแแขแงแแ Airflow / Wikimedia Commons-แจแ google-แจแ
แกแแ แฉแแแ
แจแแกแแแแแ
Apache Airflow แแกแแแแแแ, แ แแแแ แช Django:
- แแแแแแจแ แแแฌแแ แแแ
- แแ แแก แจแแกแแแแจแแแแ แแแแแแแกแขแ แแชแแฃแแ แแแแแแ,
- แแแคแแ แแแแแแแ แแแแฃแกแแแฆแแ แแแ แแแแแ
- แแฎแแแแ แฃแแแแแกแ, แแ แแก แแแแแแแ แกแ แฃแแแแ แแแแกแฎแแแแแแฃแแ แแแแแแแแกแแแแก, แแแ แซแแ (แ แแแแ แช แแแขแแก แฌแแ แฌแแ แแ):
- แแแแชแแแแแแก แแแจแแแแ แแ แแแแแขแแ แแแแ แจแแฃแแฆแฃแแแแ แ แแแแแแแแแก แแแแฅแแแแแแ (แ แแแแแแแแ แแแแฎแฃแ แ/แแฃแแแ แแแขแ แแ แแฅแแแแ แกแแแแแกแ แแแแชแแแ แกแแจแฃแแแแแแก)
- แแแแแแแฃแ แ แกแแแฃแจแแ แแแแแแแก แแแแแ แแ แแแแ แซแแแแแ แแแ แขแแแ แแแกแแฌแแ แ แแ แแแกแแแแแ แแแแแแแก แแแแแแแ
- แแ แแแแแกแแแแ แ แแแแแชแแแแ แแแแแก แแ API-แแแแก แแ แแแแแแแแแ แแแแแแจแแ แแแแก แจแแกแแซแแแแแแแ แ แแแแ แช แแแ แแแแแแแแแขแแแแก, แแกแแแ แกแแฎแแจแ แแแแแแแแแฃแแ แแแแแแแขแแแแก แแแแแงแแแแแแ (แ แแช แซแแแแแ แแแ แขแแแแ).
แฉแแแ แแแงแแแแแ Apache Airflow แแกแ:
- แฉแแแ แแแแ แแแแแ แแแแแชแแแแแก แกแฎแแแแแกแฎแแ แฌแงแแ แแแแแแแ (แแแแ แ SQL Server แแ PostgreSQL แแแกแขแแแชแแแแ, แกแฎแแแแแกแฎแแ API แแแแแแแชแแแแแก แแแขแ แแแแ, แแฃแแแแช 1C) DWH-แจแ แแ ODS-แจแ (แแแแฅแแก Vertica แแ Clickhouse).
- แ แแแแแแแ แแแฌแแแแแ
cron, แ แแแแแแช แแฌแงแแแก แแแแแชแแแแ แแแแกแแแแแแชแแแก แแ แแชแแกแแแก ODS-แแ แแ แแกแแแ แแแแแขแ แแแแแก แแแ แจแแแแ แฉแฃแแแแแก.
แแแแ แแ แแแแ แฉแแแแก แแแแฎแแแแแแแแแแก แคแแ แแแแ แแ แแ แแแขแแ แ แกแแ แแแ แ 32 แแแ แแแแ แแ 50 GB แแแแ แแขแแฃแแ แแแฎแกแแแ แแแแ. Airflow-แจแ แแก แแฃแจแแแแก:
- แแแขแ 200 แแแแ (แ แแแแฃแ แแ แกแแแฃแจแแ แแแแแแแแ, แ แแแแแแจแแช แฉแแแ แแแแแแแขแแ แแแแแแแแแแ),
- แแแแแแฃแแจแ แกแแจแฃแแแแ 70 แแแแแแแแ,
- แแก แกแแแแแ แแฌแงแแแ (แแกแแแ แกแแจแฃแแแแ) แกแแแแจแ แแ แแฎแแ.
แแ แแแแก แจแแกแแฎแแ, แแฃ แ แแแแ แแแแคแแ แแแแแแ, แฅแแแแแ แแแแฌแแ , แแแแ แแ แแฎแแ แแแแแกแแแฆแแ แแ รผber-แแ แแแแแแ, แ แแแแแกแแช แแแแแแญแ แแ:
แแ แกแแแแแก แกแแแ แฌแงแแ แ SQL แกแแ แแแ แ, แแแแแแฃแแก แแฅแแก 50 แแแแแชแแแแ แแแแ - แแ แแ แแ แแแฅแขแแก แแแแแแแแ, แจแแกแแแแแแกแแ, แแแ แแฅแแ แแแแแ แกแขแ แฃแฅแขแฃแ แ (แแแแฅแแแก แงแแแแแแ, mua-ha-ha), แ แแช แแแจแแแแก, แ แแ แแแแแแฃแแก แแฅแแก Orders แชแฎแ แแแ (แกแแแแแแแแ แแ, แชแฎแ แแแ แแ แกแแฎแแแ แจแแแซแแแแ แแแแแกแแแแ แแแแแแกแจแ แแงแแก แฉแแ แแฃแแ). แฉแแแ แแแฆแแแ แแแแแชแแแแแก แกแแ แแแกแแก แแแแแแแก แแแแแขแแแแ (แฌแงแแ แแก แกแแ แแแ แ, แฌแงแแ แแก แแแแแชแแแแ แแแแ, ETL แแแแชแแแแก ID) แแ แแฃแแฃแแ แงแแแแแ แแงแ แแ แแแ, แแแฅแแแ, Vertica-แจแ.
แแแแแ แฌแแแแแแ!
แซแแ แแแแแ แแแฌแแแ, แแ แแฅแขแแแฃแแ (แแ แชแแขแ แแแแ แแฃแแ)
แ แแขแแ แฉแแแ (แแ แแฅแแแ)
แ แแชแ แฎแแแแ แแแแ แแงแ แแ แแ แฃแแ แแแ SQLแแ แ แ แฃแกแฃแ แกแแชแแแ แแแญแ แแแแจแ, แฉแแแ แแแแแขแงแฃแแ ETL แแ แแชแแกแแแ, แแแฃ แแแแแชแแแแ แแแแแแแแ, แฉแแแแแแแก แฎแแแแแกแแฌแแแแแ แแ แ แแแกแขแ แฃแแแแขแแก แแแแแงแแแแแแ:
- Informatica Power Center - แฃแแแแฃแ แแกแแ แแแแ แชแแแแแฃแแ แกแแกแขแแแ, แฃแแแแฃแ แแกแแ แแ แแแฃแฅแขแแฃแแ, แกแแแฃแแแ แ แแแแ แแขแฃแ แแ, แกแแแฃแแแ แ แแแ แกแแแ. แฆแแแ แแแ แฅแแแก แแแกแ แจแแกแแซแแแแแแแแแแก 1% แแแแแแแงแแแ. แ แแขแแ? แแกแ, แแแ แแแ แ แแแจแ, แแก แแแขแแ แคแแแกแ, แกแแแฆแแช 380-แแแแ แฌแแแแแแแ, แแแแแแ แแแแ แแแฌแแแแก แแฎแแแแก แฉแแแแแ. แแแแ แแช, แแก แแแแขแ แแฅแชแแ แจแแฅแแแแแแ แฃแแแแฃแ แแกแแ แแแแแแ แแ แแชแแกแแแแกแแแแก, แแแแแแแแแขแแแแก แแฆแจแคแแแแแฃแแ แฎแแแแฎแแแ แแแแแงแแแแแแกแแแแก แแ แกแฎแแ แซแแแแแ แแแแจแแแแแแแแแ แกแแฌแแ แแแก แฎแ แแแแแแกแแแแก. แแแแก แจแแกแแฎแแ, แ แแ แฆแแ แก, แแกแแแ แ แแแแ แช Airbus AXNUMX แคแ แแ / แฌแแแแฌแแแจแ, แฉแแแ แแ แแคแแ แก แแแขแงแแแ.
แคแ แแฎแแแแ, แกแแ แแแจแแขแแ แจแแแซแแแแ แแแแแ แแแแแแ แแแแก 30 แฌแแแแแ แแแแแแแแแแ

- SQL Server แแแขแแแ แแชแแแก แกแแ แแแ แ - แแก แแแฎแแแแแ แแแแแแแงแแแแ แฉแแแแก แจแแแแแ แแแฅแขแฃแ แแแแแแแแจแ. แกแแแแแแแแแแจแ: แฉแแแ แฃแแแ แแแงแแแแแ SQL Server-แก แแ แ แแขแแแฆแแช แแ แแแแแแแ แฃแแ แแฅแแแแ แแ แแแแแแแงแแแแ แแแกแ ETL แแแกแขแ แฃแแแแขแแแ. แแแกแจแ แงแแแแแคแแ แ แแแ แแแ: แแแขแแ แคแแแกแแช แแแแแแแ แแ แแ แแแ แแกแแก แแแแแ แแจแแแ... แแแแ แแ แแก แแ แแ แแก แแแแแแ, แ แแก แแแแแช แฉแแแ แแแแงแแแ แก แแ แแแ แแแฃแแ แแ แแแฃแฅแขแแแ, แแฐ, แแ แ แแแแกแแแแก. แแแกแ แแแ แกแแ
dtsx(แ แแแแแแช แแ แแก XML แแแแแซแแแแ แจแแ แฌแงแแฃแแ แจแแแแฎแแแแ) แจแแแแแซแแแ, แแแแ แแ แ แ แแแ แ แแฅแแก? แ แแก แแขแงแแแ แแแแแแแแแก แแแแแขแแก แจแแฅแแแแแ, แ แแแแแแช แแแแแแขแแแก แแกแแแแ แชแฎแ แแแก แแ แแ แกแแ แแแ แแแแ แแแแ แแแ? แแแแฎ, แ แ แแกแแ, แแฅแแแแ แกแแฉแแแแแแแแ แแแแ แแชแ แชแแแแแแ แฉแแแแแแ แแแแ, แแแฃแกแแก แฆแแแแแแ แแแญแแ แแ. แแแแ แแ แแก แแแแแแแแแ แฃแคแ แ แแแแฃแ แแ แแแแแแงแฃแ แแแ:
แฉแแแ, แ แ แแฅแแ แฃแแแ, แแแซแแแแแ แแแแแกแแแแแก. แกแแฅแแ แแ ะฟะพััะธ แแแแแแ แแแแแแแฌแแ แ SSIS แแแแแขแแก แแแแแ แแขแแ แแแ ...
โฆแแ แแแ แ แแฎแแแ แกแแแกแแฎแฃแ แ แแแแแแ. แแ Apache Airflow-แแ แแแแแกแฌแ แ แแแกแแ.
แ แแแแกแแช แแแแแ แแแแ, แ แแ ETL แแ แแชแแกแแก แแฆแฌแแ แแแแแแแ แแแ แขแแแ แแแแแแแก แแแแแ, แฃแแ แแแแ แแ แแชแแแแแแแ แกแแฎแแ แฃแแแกแแแ. แแกแ แฎแแแแแแ แแแแแชแแแแ แแแแแแแแแก แแแ แกแแแแ แแ แแแแกแฎแแแแแแแแ แแ แแกแแแแ แแแแแชแแแแ แแแแแแแ แแ แแแแแ แกแขแ แฃแฅแขแฃแ แแก แแฅแแแ แชแฎแ แแแแแแก แแ แ แกแแแแแแแจแ แแแแแขแแแ แแแแแแแก แแแแแก แกแแแแแฎแ แแแฎแแ แแ แแแแฎแแแแ แแ แแ 13 แแแฃแแแแ แแแ แแแแ.
แแแแกแขแแ แแก แแฌแงแแแ
แแฃ แแแแแฌแงแแแ แแแแแแแแ แกแแแแแจแแ แแแฆแก แแ แแ แแแกแแฃแแ แแ แแฅ แกแ แฃแแแแ แชแฎแแ แ แแฆแแชแแแแแ, แ แแแแ แแชแแ Airflow-แแก, แแฅแแแ แแแแ แแ แฉแแฃแแ แแแแแชแแแแ แแแแแก, Celery-แแก แแ แแแแแแจแ แแฆแฌแแ แแแ แกแฎแแ แจแแแแฎแแแแแแแก แแแงแแแแแ.
แ แแแ แแแฃแงแแแแแแแแ แแแแแฌแงแแ แแฅแกแแแ แแแแแขแแแ, แแแแฎแแขแ แแกแแแแ docker-compose.yml แ แแแแแจแแช:
- แ แแแแฃแ แแ แแแฌแแแ แฐแแแ แแก: แแแแ แแแ, แแแแกแแ แแแ แ. แงแแแแแแ แแกแแแ แขแ แแแแแแก แแฅ แแแแฎแฃแ แแก แแแแชแแแแแแก แแแแแขแแ แแแแแกแแแแก (แ แแแแแ แแก แฃแแแ แฉแแกแแฃแแแ
apache/airflow:1.10.10-python3.7, แแแแ แแ แฉแแแ แฌแแแแแฆแแแแแ แแ แแแ แ) - PostgreSQL, แ แแแแแจแแช Airflow แฉแแฌแแ แก แแแแแก แกแแ แแแกแแก แแแคแแ แแแชแแแก (แแแแ แแแแก แแแแแชแแแแแก, แจแแกแ แฃแแแแแก แกแขแแขแแกแขแแแแก แแ แ.แจ.), แฎแแแ Celery แแแแแจแแแแก แแแกแ แฃแแแแฃแ แแแแแแแแแแก;
- Redis, แ แแแแแแช แจแแแกแ แฃแแแแก แกแแแฃแจแแ แแ แแแแ แก Celery-แกแแแแก;
- แแแแฎแฃแ แแก แแฃแจแ, แ แแแแแแช แแแแแแแแแแแก แฃแจแฃแแแ แจแแกแ แฃแแแแแจแ แแฅแแแแ แแแแแแแแฃแแ.
- แกแแฅแแฆแแแแแจแ
./dagsแฉแแแ แแแแแแแขแแแ แฉแแแแก แคแแแแแแก dags-แแก แแฆแฌแแ แแ. แแกแแแ แแแงแแแแแ แคแ แแแแก แแ แแก, แแแแขแแ แแ แแ แแก แกแแญแแ แ แงแแแแแ แแแชแแแแแแแแก แจแแแแแ แแแแแ แแแกแขแแแ แแแแแแแ แแแ.
แแแแแแ แ แแแแแแแก, แแแแแแแแแแจแ แแแแ แแ แแ แแก แกแ แฃแแแ แแแฉแแแแแแ (แแกแ, แ แแ แแ แแแฎแแแก แขแแฅแกแขแแก แแแแแแซแฃแ แแแ), แแแแ แแ แกแแแฆแแช แแแ แจแแชแแแแแแ แแ แแชแแกแจแ. แกแแแฃแจแแ แแแแแก แกแ แฃแแ แแแแแแแแแแ แจแแแแซแแแแ แแฎแแแแ แกแแชแแแจแ .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerแจแแแแจแแแแแ:
- แแแแแแแแชแแแก แแฌแงแแแแกแแก แแแแฌแแแแ แแแงแ แแแแแแแ แชแแแแแ แแแแฏแก - แแฃแชแแแแแแแ แจแแแแแฌแแแ. แจแแแซแแแแ แจแแแก แชแฎแแแ แแแแจแ แกแฎแแ แแ แแคแแ แ แแญแแ แแแก.
- แฐแแแ แแก แแแแแแแก แงแแแแ แแแ แแแแขแ แ แฎแแแแแกแแฌแแแแแแ แแ แ แแฎแแแแ แแแจแแแแแแ
airflow.cfg, แแ แแแแ แแแ แแแแก แชแแแแแแแแก แกแแจแฃแแแแแแ (แแแแแแแแแ แแแแก แฌแงแแแแแแ), แ แแแแแแแช แแ แแแแแแ แแแกแแ แแแแแ. - แแฃแแแแ แแแแ, แแก แแ แแ แแก แแแแ แฌแแ แแแแแแกแแแแก: แแ แแแแแ แแฎ แแ แแแแแงแแแ แแฃแแแกแชแแแ แแแแขแแแแแ แแแแ, แแ แจแแแแฌแฃแฎแ แฃแกแแคแ แแฎแแแแ. แแแแ แแ แแ แแแแแแแแ แฉแแแแ แแฅแกแแแ แแแแแขแแขแแ แแแแกแแแแก แจแแกแแคแแ แแกแ แแแแแแแแฃแ แ.
- แฒฉแแแแแจแแ:
- dag แกแแฅแแฆแแแแ แฎแแแแแกแแฌแแแแแ แฃแแแ แแงแแก แ แแแแ แช แแแแแแแแแแแกแแแแก, แแกแแแ แแฃแจแแแแแแกแแแแก.
- แแแแแ แแฎแแแ แแแกแแแ แแฎแแ แแก แงแแแแ แแแแแแแแแแแก - แแกแแแ แงแแแแ แฃแแแ แแงแแก แแแแแกแขแแแแ แแแฃแแ แแแแฅแแแแแแ, แ แแแแแแกแแช แแฅแแ แแ แแคแแแ แแ แแฃแจแแแ.
แแกแ, แแฎแแ แแก แแแ แขแแแแ:
$ docker-compose up --scale worker=3แแแก แจแแแแแ แ แแช แงแแแแแคแแ แ แแแแแ แแแแ, แจแแแแซแแแแ แแแแแฎแแแแ แแแ แแแขแแ แคแแแกแแแก:
- แฐแแแ แ:
- แงแแแแแแ:
แซแแ แแแแแ แชแแแแแแ
แแฃ แแฅแแแ แแแ แแคแแ แ แแแแแแ แงแแแแ แแ "แแแแจแ", แแแจแแ แแฅ แแ แแก แแแแแ แแแฅแกแแแแแ:
- Scheduler - แงแแแแแแ แแแแจแแแแแแแแแ แแแซแ Airflow-แจแ, แ แแแแแแช แแแแแขแ แแแแแก, แ แแ แ แแแแขแแแ แแแแ แก แแฃแจแแแแแ แแ แแ แ แแแแแแแแ: แแแแแขแ แแแแแก แแแแ แแแก, แแแแแแฎแแแแก แแแแแแแแแแก, แแฌแงแแแก แแแแแแแแแแก.
แแแแแแแ, แซแแแ แแแ แกแแแแจแ แแแก แฐแฅแแแแ แแ แแแแแแแแ แแแฎแกแแแ แแแแกแแแ (แแ แ, แแ แ แแแแแแแ, แแ แแแแ แแแแแแแ) แแ แแแแแแแแ แแแแแก แแแ แแแแขแ แ แแแแคแแแฃแ แแชแแแจแแช แแ แแแ แฉแ
run_duration- แแแกแ แแแแแขแแแ แแแแก แแแขแแ แแแแ. แแแแ แแ แแฎแแ แงแแแแแคแแ แ แแแ แแแแแ. - DAG (aka "dag") - "แแแแแ แแฃแแ แแชแแแแฃแ แ แแ แแคแแแ", แแแแ แแ แแกแแแ แแแแแแ แขแแแ แ แแแแแแแแ แแแแแแแแก แแขแงแแแก, แแแแ แแ แกแแแแแแแแแแจแ แแก แแ แแก แแแแขแแแแแ แ แแแแชแแแแแแกแแแแก, แ แแแแแแแช แฃแ แแแแ แแฅแแแแแแแ แแ แแแแแแแแแ (แแฎ. แฅแแแแแ) แแ แแแแแขแแก แแแแแแแ SSIS-แจแ แแ Workflow Informatica-แจแ. .
แแแฆแแแแก แแแ แแ, แจแแแซแแแแ แแ แกแแแแแแแก แฅแแแแแแแแ, แแแแ แแ, แกแแแแ แแฃแแแ, แแแ แแแ แแแแแฆแฌแแแ.
- DAG Run - แแแแชแแแแแแแแฃแแ dag, แ แแแแแกแแช แแแแญแแแ แกแแแฃแแแ แ
execution_date. แแแแแ แแแแแก แแแแ แแแแแก แจแแฃแซแแแแ แแแ แแแแแฃแ แแ แแแฃแจแแแ (แ แ แแฅแแ แฃแแแ, แแฃ แแแแแแแแแแ แแแแแแแขแแแขแฃแ แแ แแแแแแแ). - แแแแ แแขแแ แ แแ แแก แแแแแก แแแฌแแแแแ, แ แแแแแแแช แแแกแฃแฎแแกแแแแแแแแ แแ แแแ แแแแแ แแขแฃแแ แแแฅแแแแแแแก แจแแกแ แฃแแแแแแ. แแ แกแแแแแก แกแแแ แขแแแแก แแแแ แแขแแ แ:
- แแฅแชแแแฉแแแแ แกแแงแแแ แแแแแแ
PythonOperator, แ แแแแแกแแช แจแแฃแซแแแ แแแแแกแแแแ แ (แแแแแแฃแ แ) แแแแแแแก แแแแแก แจแแกแ แฃแแแแ; - แแแแแ แแชแฎแแ, แ แแแแแแช แแแแแกแชแแแก แแแแแชแแแแแก แแแแแแแแแ แแแแแแแ, แแแฅแแแ,
MsSqlToHiveTransfer; - แกแแแกแแ แ แแแแ แแก แแฎแ แแ, แแก แกแแจแฃแแแแแแก แแแแชแแแ แ แแแแแ แแแ แแแแฎแแแแแ แแ แจแแแแแแแ แแแแแก แจแแแแแแแ แจแแกแ แฃแแแแ, แกแแแแ แแแแแแแ แแ แแแฎแแแแ.
HttpSensorแจแแฃแซแแแ แแแแกแแแฆแแ แฃแแ แกแแแแแแ แฌแแ แขแแแแก แแแงแแแแ แแ แ แแแแกแแช แกแแกแฃแ แแแแ แแแกแฃแฎแ แแแแแแแ, แแแแฌแงแแ แแแแแชแแแGoogleCloudStorageToS3Operator. แชแแแแแกแแแงแแแ แ แแแแแแ แแแแแฎแแแก: โแ แแขแแ? แงแแแแแแแ แแแแก แจแแแแแ, แแฅแแแ แจแแแแซแแแแ แแแแแแแแ แแแแแแ แแแ แแแ แแแแแ แแแแ แแขแแ แจแ! โ แแ แจแแแแแ, แแแแกแแแแก, แ แแ แแ แแแแฎแฃแ แแก แแแแชแแแแแแก แแฃแแ แจแแฉแแ แแแฃแแ แแแแ แแขแแ แแแแ. แกแแแกแแ แ แแฌแงแแแ, แแแแฌแแแแก แแ แแแแแแ แแแแแแแแ แแชแแแแแแแแแ.
- แแฅแชแแแฉแแแแ แกแแงแแแ แแแแแแ
- แกแแแฃแจแแ - แแแแแแ แแ แแแฃแแ แแแแ แแขแแ แแแ, แแแแฃแ แฉแแแแแ แขแแแแกแ, แแ แแแแแแ แแแฃแแแ แแ แแแ แแแแแแแแแก แ แแแแจแ.
- แแแแแแแแแก แแแแแแแแ - แ แแแแกแแช แแแแแ แแแฃแ แแ แแแแแแแแแแแ แแแแแฌแงแแแขแ, แ แแ แแ แ แแงแ แแแแชแแแแแแก แแ แซแแแแจแ แแแแแแแแ แจแแแกแ แฃแแแแแ-แแฃแจแแแแแแ (แแแแแแแ, แแฃ แแแงแแแแแ
LocalExecutorแแ แแแกแขแแแชแแฃแ แ แแแแแซแแก แจแแแแฎแแแแแจแCeleryExecutor), แแก แแแแญแแแก แแแ แแแแขแแฅแกแขแก (แแแฃ แชแแแแแแแแก แแ แแแแแแแแ - แจแแกแ แฃแแแแแก แแแ แแแแขแ แแแ), แแคแแ แแแแแก แแ แซแแแแแแแแก แแ แจแแแแแฎแแแก แจแแแแแแแแก แแ แแแ แแแแแแแก แแแ.
แฉแแแ แแฅแแแแ แแแแแแแแแแก
แแแ แแแ แ แแแจแ, แแแแแ แแแแแแงแแ แฉแแแแ แชแแแแก แแแแแแ แกแฅแแแ แแ แจแแแแแ แฃแคแ แ แแ แฃแคแ แ แฉแแแฃแฆแ แแแแแแแแ แแแขแแแแแก, แ แแแแแ แแแงแแแแแ แ แแแแแแแแ แแ แแขแ แแแแแแฃแ แแแแแฌแงแแแขแแแแแแแก.
แแกแ แ แแ, แแแกแ แฃแแแ แขแแแแกแ แคแแ แแแ, แแกแแแ แแแคแ แแกแ แแแแแแงแฃแ แแแ:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)แแแแแ แแแแแ แแแแแ:
- แแแ แแแ แ แแแจแ, แฉแแแ แจแแแแแแขแแแ แกแแญแแ แ แแแแแแก แแ แแแแแ แ แแฆแแช;
sql_server_ds- แ แแแแแแแList[namedtuple[str, str]]แแแแจแแ แแแแก แกแแฎแแแแแแ Airflow Connections-แแแ แแ แแแแแชแแแแ แแแแแแแ, แกแแแแแแแช แแแแฆแแแ แฉแแแแก แคแแ แคแแขแแก;dag- แฉแแแแ แแแแแก แแแแชแฎแแแแแ, แ แแแแแแช แแฃแชแแแแแแแ แฃแแแ แแงแแกglobals()แแแ แแ Airflow แแแ แแแแแแก. แแฃแแแ แแกแแแ แฃแแแ แแฅแแแก:- แ แ แฅแแแ
orders- แจแแแแแ แแก แกแแฎแแแ แแแแแฉแแแแแ แแแ แแแขแแ แคแแแกแจแ, - แ แแ แแก แแแฃแจแแแแแก แ แแ แแแแแกแแก แจแฃแแฆแแแแแแ,
- แแ แแก แฃแแแ แแแฃแจแแแก, แแแแฎแแแแแแ แงแแแแ 6 แกแแแแจแ (แแแแชแ แ แแแญแแแแกแแแแก แแฅ แแแชแแแแ
timedelta()แแแกแแจแแแแแcron- แฎแแแ0 0 0/6 ? * * *, แแแแแแแแ แแแแแ แแกแแแแก - แแแแแแฅแแ แแแกแฌแแแก@daily);
- แ แ แฅแแแ
workflow()แแแแแแ แกแแฅแแแก แแแแแแแแแก, แแแแ แแ แแ แ แแฎแแ. แแ แแ แแแกแแแแก, แฉแแแ แฃแแ แแแแ แแแแแแแขแแแ แฉแแแแก แแแแขแแฅแกแขแก แแฃแ แแแแจแ.- แแฎแแ แแ แแแแแแแแแแแก แจแแฅแแแแก แแแ แขแแแ แแแแแ:
- แฉแแแ แแแแแแแแ แ แฉแแแแก แฌแงแแ แแแแจแ;
- แแแแชแแแแแแแชแแ
PythonOperator, แ แแแแแแช แจแแแกแ แฃแแแแก แฉแแแแก แแฃแแกworkflow(). แแ แแแแแแแฌแงแแแ แแแแแแแแแก แฃแแแแแแฃแ แ (แแแแแก แคแแ แแแแแจแ) แกแแฎแแแแก แแแแแแแแ แแ แแแแแ แแแแแก แแแแแ. แแ แแจแprovide_contextแแแแแก แแฎแ แแ, แแแแแขแแแแ แแ แแฃแแแแขแแแก แฉแแแกแฎแแแก แคแฃแแฅแชแแแจแ, แ แแแแแกแแช แฉแแแ แงแฃแ แแแฆแแแแ แแแแ แแแแแ**context.
แฏแแ -แฏแแ แแแแ แกแฃแ แแกแแ. แ แแช แแแแแฆแแ:
- แแฎแแแ แแแ แขแงแแ แแแ แแแขแแ แคแแแกแจแ,
- แแกแ แแแฎแแแแ แ แแแแแแแแ, แ แแแแแแช แจแแกแ แฃแแแแแ แแแ แแแแแฃแ แแ (แแฃ แแแแก แกแแจแฃแแแแแแก แแซแแแแ Airflow, Celery แแแ แแแแขแ แแแ แแ แกแแ แแแ แแก แแแชแฃแแแแ).
แแกแ, แแแแฅแแแก แแแแฎแแแ.

แแแ แแแแแแกแขแแแแ แแแก แแแแแแแแแแฃแแแแแแก?
แแ แงแแแแแคแ แแก แแแกแแแแ แขแแแแแแแ, แแ แฉแแแแแฅแ docker-compose.yml แแแแฃแจแแแแแ requirements.txt แงแแแแ แแแแแซแแ.
แแฎแแ แแก แแแฅแ แ:

แแแชแ แแกแคแแ แ แแแแแ แแขแแแ แแ แแก แแแแแแแแแก แแแกแขแแแชแแแแ, แ แแแแแแแช แแแแฃแจแแแแแฃแแแ แแ แแคแแแแก แแแแ .
แฉแแแ แชแแขแแก แแแแแแแแแ, แแแแแแแแแแก แแกแ แฃแแแแแ แแฃแจแแแ:

แแฌแแแแแแแแ, แ แ แแฅแแ แฃแแแ, แฌแแ แแแขแแแแ แแแแกแ แฃแแแก แกแแแฃแจแแ. แฌแแแแแแ แแ แช แแฃ แแกแ แฌแแ แแแขแแแฃแแแแ แแ แแแ.
แกแฎแแแแ แจแแ แแก, แฉแแแแก แกแแฅแแฆแแแแแแ แแ แแ แแก แกแแฅแแฆแแแแ
./dags, แแ แแ แแก แกแแแฅแ แแแแแแชแแ แแแแฅแแแแแก แจแแ แแก - แงแแแแ แแแ แขแงแแ แแแแกgitแฉแแแแก Gitlab-แแ แแ Gitlab CI แแแ แชแแแแแก แแแแแฎแแแแแแก แแแแฅแแแแแแ แแแแ แแแแแแแแกแแกmaster.
แชแแขแ แ แแ แงแแแแแแแก แจแแกแแฎแแ
แกแแแแ แแฃแจแแแ แฉแแแแก แกแแฌแแแแ แแก แฃแ แขแงแแแแ, แแแแแฎแกแแแแ แแแแแ แแ แแ แแแกแขแ แฃแแแแขแ, แ แแแแแกแแช แจแแฃแซแแแ แ แแฆแแชแแก แฉแแแแแแ - แงแแแแแแ.
แแแ แแแแแแ แแแแ แแ แแฃแจแแแ แแแแแซแแแแก แจแแกแแฎแแ แจแแแแฏแแแแแแแ แแแคแแ แแแชแแแ:

แงแแแแแแ แแแขแแแกแแฃแ แ แแแแ แแ แแแแชแแแแแแ, แ แแแแแแแช แแฃแจแแแแแแแ:

แงแแแแแแ แแแกแแฌแงแแแ แแแแ แแ แฉแแแแ แแ แแแแ แแก แกแขแแขแฃแกแแ:

แงแแแแแแ แแแแแแ แแแแ แแแ แแแแแแแแแก แกแขแแขแฃแกแแก แแ แแคแแแแแ แแ แแแแ แจแแกแ แฃแแแแแก แแ แ:

แแแขแแแ แแฃแแก แแขแแแ แแแแ
แแกแ แ แแ, แงแแแแ แแแแแแแแ แจแแกแ แฃแแแแฃแแแ, แจแแแแซแแแแ แแแญแ แแแแแ แฌแแแงแแแแแ.

แแ แแงแ แแแแ แ แแแญแ แแแ - แแแ แแฃ แแ แแแแแแแก แแแแ. Airflow-แแก แกแฌแแ แแ แแแแแงแแแแแแก แจแแแแฎแแแแแจแ, แกแฌแแ แแ แแก แแแแแ แแขแแแ แแแฃแแแแแแก แแแแแ, แ แแ แแแแแชแแแแแ แแแแแแแแแ แแ แแแแแแ.
แแฅแแแ แฃแแแ แฃแงแฃแ แแ แแฃแ แแแแก แแ แแแแแขแแแ แแแ แแแชแแแแ แแแแแแแแแก แจแแแแฎแแแแแแ.
แแแแแกแแแแ แแแแแ แแขแแ แแแฌแแแแฃแแแแแ แฉแแแ แแแแฎแแแ แฉแแแแแแแก แฎแแแแแกแแฌแแแแ แแแฅแแแแแแแแก:

แจแแแแซแแแแ แแแฆแแ แแ แแแแกแฃแคแแแแแ แแแชแแแฃแแ. แแแฃ, แฉแแแ แแแแแแฌแงแแแแ, แ แแ แแฅ แ แแฆแแช แแแ แแแฎแแ แฎแแ แแ แแแแแ แแแกแขแแแชแแแก แแแแชแแแ แแแแแแ แแแแ แแแแ.

แแแแแแแ, แ แแ แแแฃแกแแ แแแแก แแแแแแแแ แงแแแแ แฌแแแแแ แแแแแ แแขแแ แแ แแ แแก แซแแแแแ แฐแฃแแแแฃแ แ - แแแแก แแ แแแแแ Airflow-แกแแแ. แแฃแแแแ แแแแ, แฉแแแ แแแแฅแแก แแแกแแแ แแแ แแแแแแแฃแ แแแแก แแแ แแฆแ: Browse/Task Instances

แแแแแ แแแแ แฉแแแ แงแแแแแคแแ แ แแ แแแ แแฃแแแ แแ แแแแแแแแ แฃแแแ แแฃแแแ, แแแแฌแแแแฃแแแ แกแฌแแ แแฃแแฅแขแแ:

แแแกแฃแคแแแแแแแก แจแแแแแ แฉแแแแ แขแแฅแกแแแ แแกแ แแแแแแงแฃแ แแแ (แแกแแแ แฃแแแ แแแแแ แแแแ แแแแก แแแแแจแแแแก):

แแแแจแแ แแแ, แแแแแแแ แแ แกแฎแแ แชแแแแแแแ
แแ แแ แแแแแแฎแแแแ แจแแแแแ DAG-แก, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""ะะพัะฟะพะดะฐ ั
ะพัะพัะธะต, ะพััะตัั ะพะฑะฝะพะฒะปะตะฝั"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
ะะฐัะฐั, ะฟัะพััะฟะฐะนัั, ะผั {{ dag.dag_id }} ััะพะฝะธะปะธ
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]แงแแแแแ แแแแกแแ แแแแแแแ แแแแแ แแจแแก แแแแแฎแแแแ? แแก แแ แแก แแกแแ แแก: แแ แแก แฌแงแแ แแแแแก แกแแ, แกแแแแแแแช แฃแแแ แแแแฆแแ แแแแแชแแแแแ; แแ แแก แกแแ, แกแแแแช แแแแแแแกแแแแ; แแ แแแแแแแฌแงแแแ แแแ แแแแ, แ แแแแกแแช แงแแแแแคแแ แ แแแฎแแ แแ แแแคแฃแญแแ (แแแ แแ, แแก แฉแแแแแ แแ แแ แแก, แแ แ).
แแแแแ แแแแแ แแแแแแฎแแแแ แคแแแแก แแ แแแแแแฎแแแแ แแฎแแ แแฃแแแแแแ แแแแแแแก:
from commons.operators import TelegramBotSendMessage- แแ แแคแแ แ แแแแจแแแก แฎแแแก แจแแแแแฅแแแ แกแแแฃแแแ แ แแแแ แแขแแ แแแ, แ แแแแช แแแกแแ แแแแแแ Unblocked-แแ แจแแขแงแแแแแแแแแแก แแแกแแแแแแแแ แแแขแแ แ แจแแคแฃแแแแ. (แแ แแแแ แแขแแ แแ แแแฌแแ แแแแแแ แฅแแแแแ แแแกแแฃแแ แแแ);default_args={}- dag-แก แจแแฃแซแแแ แแ แแ แแ แแแแแ แแ แแฃแแแแขแแแแก แแแแ แชแแแแแ แแแก แงแแแแ แแแแ แแขแแ แแ;to='{{ var.value.all_the_kings_men }}'- แแแแtoแฉแแแ แแ แแแแฅแแแแ แแงแแ แ แแแแแ แแแฃแแ, แแแแ แแ แแแแแแแฃแ แแ แแแแแ แแ แแแฃแแ Jinja-แก แแแแแงแแแแแแ แแ แชแแแแแ แแ.แคแแกแขแแก แกแแแ, แ แแแแแแช แแ แคแ แแฎแแแแ แฉแแแแAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS- แแแแ แแขแแ แแก แแแจแแแแแก แแแ แแแ. แฉแแแแก แจแแแแฎแแแแแจแ, แฌแแ แแแ แฃแคแ แแกแแแแกแแแ แแแคแ แแแแแก แแฎแแแแ แแ แจแแแแฎแแแแแจแ, แแฃ แงแแแแ แแแแแแแแแแฃแแแแ แแแแฃแจแแแแ แฌแแ แแแขแแแแ;tg_bot_conn_id='tg_main'- แแ แแฃแแแแขแแแconn_idแแแแแฆแแ แแแแจแแ แแก ID-แแแ, แ แแแแแแจแแช แฉแแแ แแฅแแแแAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- Telegram-แจแ แจแแขแงแแแแแแแแแ แแแคแ แแแแแแ แแฎแแแแ แแ แจแแแแฎแแแแแจแ, แแฃ แแ แกแแแแแก แแแแชแแแแแ;task_concurrency=1- แฉแแแ แแแ แซแแแแแ แแ แแ แแแแชแแแแก แ แแแแแแแแ แแแแชแแแแก แแ แแแ แแฃแ แแแจแแแแแก. แฌแแแแแฆแแแแ แจแแแแฎแแแแแจแ, แฉแแแ แแแแแฆแแแ แ แแแแแแแแ แแ แแแ แแฃแแแ แแแจแแแแแกVerticaOperator(แแ แ แแแแแแแก แฃแงแฃแ แแแก);report_update >> [email, tg]- แงแแแแVerticaOperatorแแแแฎแแแแ แ แฌแแ แแแแแแกแ แแ แจแแขแงแแแแแแแแแแก แแแแแแแแแกแแก, แ แแแแ แแชแแ:

แแแแ แแ แ แแแแแ แจแแแขแงแแแแแแแแ แแแแ แแขแแ แแแก แแฅแแ แแแจแแแแแก แแแแกแฎแแแแแแฃแแ แแแ แแแแแ, แแฎแแแแ แแ แแ แแแฃแจแแแแแก. แฎแแแแแก แฎแแแจแ แงแแแแแคแแ แ แแแแแแแแ แแแแฃแแแฃแ แแ แแแแแแงแฃแ แแแ:

แ แแแแแแแแ แกแแขแงแแแก แแแขแงแแ แแแแก แจแแกแแฎแแ แแแแ แแแแ แแ แแแแ แแแแแแ แแแ - แชแแแแแแแ.
แแแแ แ แแ แแก Jinja แฉแแแแชแแแแแ, แ แแแแแกแแช แจแแฃแซแแแ แจแแชแแแแแก แกแฎแแแแแกแฎแแ แกแแกแแ แแแแแ แแแคแแ แแแชแแ แแแแ แแขแแ แแก แแ แแฃแแแแขแแแจแ. แแแแแแแแแ, แแกแ:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} แแแคแแ แแแแแแแ แแแแขแแฅแกแขแฃแ แ แชแแแแแแก แจแแแแแ แกแแแแ execution_date แคแแ แแแขแจแ YYYY-MM-DD: 2020-07-14. แกแแฃแแแแแกแ แแแฌแแแ แแ แแก แแก, แ แแ แแแแขแแฅแกแขแฃแ แ แชแแแแแแแ แแแแแแ แแแฃแแแ แแแแแ แแขแฃแแ แแแแชแแแแก แแแแแแแแแ (แแแแแ แแขแ แฎแแแแแก แฎแแแจแ) แแ แแแแแขแแแ แแแแกแแก, แฉแแแแชแแแแแแก แแแแแแ แแแคแแ แแแแแแแ แแแแแ แแแแจแแแแแแแแแแ.
แแแแแญแแแฃแแ แแแแจแแแแแแแแแแก แแแฎแแ แจแแกแแซแแแแแแแ Rendered แฆแแแแแแก แแแแแงแแแแแแ แแแแแแฃแ แแแแแแแแแก แแแกแขแแแชแแแแ. แแกแแ แแแแแแแแ แฌแแ แแแแก แแแแแแแแแ:

แแกแ แ แแ, แแแแแแแแแก แจแแกแ แฃแแแแแกแแก แจแแขแงแแแแแแแแก แแแแแแแแ:

แฃแแฎแแแกแ แฎแแแแแกแแฌแแแแแ แแแ แกแแแกแแแแก แฉแแจแแแแแฃแแ แแแแ แแแแแก แกแ แฃแแ แกแแ แฎแแแแแกแแฌแแแแแแ แแฅ:
แฃแคแ แ แแแขแแช, แแแแแแแขแแแแก แแแฎแแแ แแแแ แฉแแแ แจแแแแแซแแแ แแแแแแแชแฎแแแแ แกแแแฃแแแ แ แแแแ แแแแ, แแแแ แแ แแก แกแฎแแ แแแแแแแ.
แแแ แแ แฌแแแแกแฌแแ แแแแกแแแฆแแ แฃแแ แแแแแแแแกแ, แฉแแแ แจแแแแแซแแแ แจแแแชแแแแแ แฉแแแแ แชแแแแแแแแก แแแแจแแแแแแแแแ (แแก แฃแแแ แแแแแแแงแแแ แแแแแ แแแชแแแฃแ แแแแจแ). แจแแแฅแแแแ แจแแแแแ Admin/Variables แ แแแแแแแแ แ แแ:

แงแแแแแคแแ แ แ แแช แจแแแแซแแแแ แแแแแแงแแแแ:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')แแแแจแแแแแแแ แจแแแซแแแแ แแงแแก แกแแแแแ แ, แแ แแกแแแ แจแแแซแแแแ แแงแแก JSON. JSON-แแก แจแแแแฎแแแแแจแ:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}แฃแแ แแแแ แแแแแแงแแแแ แแแ แกแแกแฃแ แแแแ แแแกแแฆแแแแกแแแ: {{ var.json.bot_config.bot.token }}.
แกแแขแงแแแกแแขแงแแแ แแแขแงแแ แแ แ แกแแขแงแแแก แแ แแแฉแแแแแ แแ แ แกแแ แแแจแแขแก แแแแจแแ แแแ. แแฅ แงแแแแแคแแ แ แแแแแแแขแแ แฃแแแ: แแแแ แแแ Admin/Connections แฉแแแ แแฅแแแแ แแแแจแแ แก, แแแแแขแแแ แฉแแแแก แจแแกแแแแก/แแแ แแแก แแ แฃแคแ แ แแแแแ แแขแฃแ แแแ แแแแขแ แแแก. แฒแแแแแ แแ:

แแแ แแแแแ แจแแแซแแแแ แแงแแก แแแจแแคแ แฃแแ (แฃแคแ แ แกแแคแฃแซแแแแแแแ, แแแแ แ แแแแฃแแแกแฎแแแแ), แแ แจแแแแซแแแแ แแแแแขแแแแ แแแแจแแ แแก แขแแแ (แ แแแแ แช แแ แแแแแแแแ tg_main) - แคแแฅแขแแ, แ แแ แขแแแแแแก แกแแ แแแแแแ แแแฃแแแ Airflow-แแก แแแแแแแแจแ แแ แแ แจแแแซแแแแ แแแคแแ แแแแแแก แกแแฌแงแแก แแแแแแจแ แแแฎแแแแ แแก แแแ แแจแ (แแฃ แฃแชแแ แแ แแแแซแแแแ แ แแแ, แจแแแแกแฌแแ แแ), แแแแ แแ แแ แแคแแ แ แจแแแแแจแแแก แแ แแแแขแแแแก แแแฆแแแแจแ แแฎแแแแ แกแแฎแแแ.
แแฅแแแ แแกแแแ แจแแแแซแแแแ แแแแแแแแ แ แแแแแแแแ แแแแจแแ แ แแแแแ แกแแฎแแแแ: แแ แจแแแแฎแแแแแจแ, แแแแแแ BaseHook.get_connection(), แ แแแแแแช แแแแซแแแแก แแแแจแแ แแแก แกแแฎแแแแ, แแแแแชแแแก แจแแแแฎแแแแแแ แ แแแแแแแแ แกแแฎแแแแแแ (แฃแคแ แ แแแแแแฃแ แ แแฅแแแแแแ Round Robin-แแก แแแแแแแแ, แแแแ แแ แแแแแ แแก Airflow แแแแแแแแแ แแแแก แกแแแแแกแแ แแแแขแแแแ).
แชแแแแแแแ แแ แแแแจแแ แแแ, แ แ แแฅแแ แฃแแแ, แแแแแ แ แแแกแขแ แฃแแแแขแแแแ, แแแแ แแ แแแแจแแแแแแแแแแ, แ แแ แแ แแแแแ แแแ แแแแแแกแ: แแฅแแแแ แแแแแแแแแก แ แแแแ แแแฌแแแแแก แแแแฎแแแ แแแแแ แแแแจแ แแ แ แแแแ แแแฌแแแแแก แแซแแแแ Airflow-แก แจแแกแแแแฎแแ. แแ แแแก แแฎแ แแ, แแแกแแฎแแ แฎแแแแแแ แแแแจแแแแแแแแก แกแฌแ แแคแแ แจแแชแแแ, แแแแแแแแแ, แกแแคแแกแขแ แงแฃแแ, แแแขแแ แคแแแกแแก แกแแจแฃแแแแแแ. แแแแ แแก แแฎแ แแ, แแก แแแแแ แแแฃแกแแก แแแฌแแแแฃแแแแแแ แแแแ แฃแแแแแ, แ แแแแแกแแแแแช แฉแแแ (แแ) แแแแแแแแ แแแแแก แแแฆแฌแแแ.
แแแแจแแ แแแแแ แแฃแจแแแแ แแ แ-แแ แแ แแแแชแแแแ แแแแแแแ. แแแแแแแ, แกแแฐแแแ แ แแแแแแแก แแแแแแแ แแ แแก แแแกแแแ แแฎแแ แแก แกแแ แแแกแแแแแ แแ แแแแแแแแแแแแแแ แแแแแแจแแ แแแแก แฌแแ แขแแแแแ. แฒแแแแแแแแ, JiraHook แแแฎแกแแแก แแแแแแขแก แฉแแแแแแแก แฏแแ แแกแแแ แฃแ แแแแ แแแแแกแแแแก (แจแแแแซแแแแ แแแแชแแแแแแก แแแแแขแแแ แฌแแ แแ แฃแแแ) แแ แแแฎแแแ แแแแ SambaHook แจแแแแซแแแแ แแแแงแแแแ แแแแแแแแ แแแ แคแแแแ smb- แฌแแ แขแแแ.
แแแ แแแแฃแแ แแแแ แแขแแ แแก แแแ แฉแแแ
แแ แฉแแแ แแแแฃแแฎแแแแแแ แแแแก แงแฃแ แแแแก, แแฃ แ แแแแ แแแแแแแแ TelegramBotSendMessage
แแแแ commons/operators.py แ แแแแฃแ แแแแ แแขแแ แแแ:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)แแฅ, แแกแแแ แ แแแแ แช แงแแแแแคแแ แ Airflow-แจแ, แงแแแแแคแแ แ แซแแแแแ แแแ แขแแแแ:
- แแแแแแแแ แแแแแ แแแฆแแแฃแแ
BaseOperator, แ แแแแแแช แแฎแแ แชแแแแแแก แฐแแแ แแก แแแแแแแก แกแแแชแแคแแแฃแ แ แแแแแแแแ แแแแแก (แจแแฎแแแแ แแฅแแแแก แแแกแแแแแแแก) - แแแแแชแฎแแแแแฃแแ แแแแแแ
template_fields, แ แแแแแจแแช Jinja แแซแแแก แแแแ แแแแก แแแกแแแฃแจแแแแแแแ. - แแแแฌแงแ แกแฌแแ แ แแ แแฃแแแแขแแแ
__init__()แกแแญแแ แแแแแก แจแแแแฎแแแแแจแ แแแแงแแแแ แแแแฃแแแกแฎแแแแ แแแ แแแแขแ แแแ. - แแ แช แฌแแแแแ แแก แแแแชแแแแแแแชแแ แแแแแแแฌแงแแ.
- แแแฎแกแแแแ แจแแกแแแแแแกแ แแแแแแ
TelegramBotHookแแแกแแแ แแแแฆแ แแแแแแขแแก แแแแแฅแขแ. - แแแแแคแแ แฃแแ (แฎแแแแฎแแแ) แแแแแแ
BaseOperator.execute(), แ แแแแแกแแช Airfow แแขแ แแแแแแก, แ แแแแกแแช แแแแ แแขแแ แแก แแแจแแแแแก แแ แ แแแแแแแ - แแแกแจแ แฉแแแ แแแแแแฎแแ แชแแแแแแ แแแแแแ แแแฅแแแแแแแก, แแแแแแแแฌแงแแแแ แจแแกแแแ. (แฉแแแ แจแแแแแแแ แ, แกแฎแแแแ แจแแ แแก, แแแ แแแแแstdoutะธstderr- แฐแแแ แแก แแแแแแ แแแแแญแ แแก แงแแแแแคแแ แก, แแแแแแแ แจแแแแแฎแแแแก, แกแแญแแ แแแแแก แจแแแแฎแแแแแจแ แแแจแแแก.)
แแแแฎแแ แ แ แแแแฅแแก commons/hooks.py. แคแแแแแก แแแ แแแแ แแแฌแแแ, แแแแแ แแแฃแญแแ:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientแแ แช แแ แแแชแ, แ แ แแแฎแกแแ แแฅ, แฃแแ แแแแ แแฆแแแแจแแแ แแแแจแแแแแแแแ แแฃแแฅแขแแแก:
- แฉแแแ แแแฆแแแ แแแแแแแแ แแแแแก, แแคแแฅแ แแแ แแ แแฃแแแแขแแแแ - แฃแแแขแแก แจแแแแฎแแแแแจแ แแก แแฅแแแแ แแ แแ:
conn_id; - แกแขแแแแแ แขแฃแแ แแแแแแแแแก แแแแแแแฎแแ: แแ แจแแแแฆแฃแแ แแแแ
get_conn(), แ แแแแแจแแช แแแฆแแ แแแแจแแ แแก แแแ แแแแขแ แแแก แกแแฎแแแแ แแ แฃแแ แแแแ แแแฆแแ แแแแงแแคแแแแแแกextra(แแก แแ แแก JSON แแแแ), แ แแแแแจแแช แแ (แฉแแแ แแแกแขแ แฃแฅแชแแแก แแแฎแแแแแ!) แแแแ Telegram-แแก แแแขแแก แแแขแแแก:{"bot_token": "YOuRAwEsomeBOtToKen"}. - แแ แแฅแแแ แแแแแแแแก แฉแแแแ
TelegramBot, แแแกแชแแก แแแก แแแแแ แแขแฃแแ แแแจแแแ.
แฒกแฃแ แแก แแ แแก. แแฅแแแ แจแแแแซแแแแ แแแแฆแแ แแแแแแขแ แแแฃแญแแแแ แแแแแงแแแแแแ TelegramBotHook().clent แแ TelegramBotHook().get_conn().
แแ แคแแแแแก แแแแ แ แแแฌแแแ, แ แแแแแจแแช แแแแแแแ แแแแ แแจแแคแฃแแแแก Telegram REST API-แกแแแแก, แ แแ แแแแแ แแ แแแแแแขแแแแ แแ แแ แแแแแแแกแแแแก sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))แกแฌแแ แ แแแ แแ แแก แงแแแแแคแ แแก แแแแแขแแแ:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- แแแแแแแขแจแ แฉแแแแ แกแแฏแแ แ แกแแชแแแจแ แแ แแแแชแแ แฆแแ แฌแงแแ แแก.
แกแแแแ แแ แงแแแแแคแแ แก แแกแฌแแแแแแแแ, แฉแแแแ แแแฎแกแแแแแแก แแแแแฎแแแแแแ แฌแแ แแแขแแแแ แฉแแแแ แแ แแ แแ แฎแแ แจแแชแแแแแก แจแแขแงแแแแแแแ แแแแแแแแแแแแ. แแแแแ แแ แจแแแแแแฌแแ, แแ แแกแฌแแ แแ แแฃ แแ แ...

แ แแฆแแช แแแขแแฎแ แฉแแแแก แแแแจแ! แแแแก แแ แแแแแแแ? แแฃแกแขแแ!
แแแแแแแก แแแแ แแ?
แแ แซแแแ, แ แแ แ แแฆแแช แแแแแแ แฉแ? แแขแงแแแ SQL Server-แแแแ แแแ แขแแแแจแ แแแแแชแแแแแแก แแแแแขแแแแก แแแแแแ แ แแ แแแ แ แแแฆแ แแ แแแแแแแ แแแแแแแแ แแแซแแ แแแ!
แแก แกแแกแแกแขแแแ แแงแ แแแแแแแแแแ แแฃแแ, แแ แฃแแ แแแแ แแแแแฌแแ แแฅแแแแแแแก แ แแแแ แขแแ แแแแแแแแแแก แแแจแแคแแ แ. แแฎแแ แจแแแแซแแแ แฃแคแ แ แจแแ แก แฌแแฎแแแแ.
แฉแแแแ แแแแแ แแกแแแ แแงแ:
- แแแแแแแ
- แแแแชแแแแแแก แแแแแ แแ แแแ
- แแแฎแแ แ แ แแแแแแแ แงแแแแแคแแ แ
- แแแแแแญแแ แกแแกแแแก แแแแ แแแ แจแแแกแแแแก
- แแแแฆแแ แแแแแชแแแแแ SQL แกแแ แแแ แแแแ
- แฉแแแแ แแแแแชแแแแแ แแแ แขแแแแจแ
- แจแแแแ แแแแ แกแขแแขแแกแขแแแ
แแกแ แ แแ, แแแแกแแแแก, แ แแ แแก แงแแแแแคแแ แ แแแแฅแแแแแแก, แแ แแแแแแแแ แแแขแแ แ แแแแแขแแแ แฉแแแแก docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyแแฅ แฉแแแ แแแ แแแ:
- แแแ แขแแแ, แ แแแแ แช แแแกแแแแซแแแ
dwhแงแแแแแแ แแแแฃแแแกแฎแแแแ แแแ แแแแขแ แแแแ, - SQL แกแแ แแแ แแก แกแแแ แแแกแขแแแชแแ,
- แฉแแแ แแแแกแแแ แแ แฃแแแแแกแแแแแแก แแแแแชแแแแ แแแแแแก แแแ แแแแฃแแ แแแแแชแแแแแแ (แแ แแแแแแ แจแแแแฎแแแแแจแ แแ แจแแแแกแฌแแแแแ
mssql_init.py!)
แฉแแแ แแแฌแงแแแ แงแแแแ แแแ แแก แแแแแ แฃแคแ แ แ แแฃแแ แแ แซแแแแแแก แแแฎแแแ แแแแ, แแแแ แ แฌแแแ แแ แแก:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3แแก, แ แแช แฉแแแแแ แกแแกแฌแแฃแแแ แ แแแแแแแแแ แแ แจแแฅแแแ, แจแแแแซแแแแ แแแแแแงแแแแ แแแแแ Data Profiling/Ad Hoc Query:

แแแแแแ แแ แแก แแ แแฉแแแแ แแแแแแขแแแแกแแแก
แแแแฃแจแแแแแ ETL แกแแกแแแแ แแ แแแแแแแแแ, แแฅ แงแแแแแคแแ แ แขแ แแแแแแฃแ แแ: แฉแแแ แแแแแแแแ แแแแแก, แแ แแก แแแกแจแ แแแจแแแ, แงแแแแแคแแ แก แแแฎแแแแ แแแแขแแฅแกแขแแก แแแแแฏแแ แแแ แแ แแฎแแ แแแแก แแแแแแแแ:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passแฒแ แ แแแแแแ แจแแแแ แแแแ แฉแแแแ แแแแแชแแแแแ แฉแแแแ แแกแแแฎแแแแ แ แแแแแแแแแ. แแแแแ แแแแแแแแแ แแก แซแแแแแ แแ แแแ แแขแแแแแฃแแ แฎแแแแแแก แแแฎแแแ แแแแ:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- แฐแฃแแแก แแแฎแแแ แแแแ แแแฆแแแ Airflow-แแแ
pymssql- แแแแแแจแแ แแแ - แแแแแ แจแแแชแแแแแ แแแแฎแแแแแจแ แจแแแฆแฃแแแ แแแ แแฆแแก แกแแฎแแ - แแก แฉแแแแแแแก แคแฃแแฅแชแแแก แจแแแแแแแก แซแ แแแแก แแแแ .
- แแแแแแแก แฉแแแแก แแแแฎแแแแแก
pandasแแแ แแแแแแฆแแแกDataFrame- แแแแแแแแจแ แแแแแแแแแแแแ.
แแ แแแงแแแแ แฉแแแแชแแแแแแก
{dt}แแแแฎแแแแแก แแแ แแแแขแ แแก แแแชแแแแ%sแแ แ แแแแขแแ, แ แแ แแแ แแขแ แแแแแฅแแ แแแ , แแ แแแแ แแแแขแแpandasแแแ แฃแแแแแแแแแpymssqlแแ แกแ แแแแแแก แฃแแแแแกแแแแแกparams: Listแแฃแแชแ แแแก แแแแแแแแแ แกแฃแ แกtuple.
แแกแแแ แแแแแแแแแกแฌแแแแ, แ แแ แแแแแแแแแ แpymssqlแแแแแฌแงแแแขแ แแฆแแ แแแแญแแ แ แแแกแแแแก แแฎแแ แ แแ แแ แแ แแแแแฎแแแแpyodbc.
แแแแฎแแ, แ แแแ แแแกแแแแ Airflow-แแ แฉแแแแ แคแฃแแฅแชแแแแแก แแ แแฃแแแแขแแแ:

แแฃ แแแแแชแแแแแ แแ แแ แแก, แแแจแแ แแแแ แซแแแแแแก แแแ แ แแ แแฅแแก. แแแแ แแ แแกแแแ แฃแชแแแฃแ แแ แจแแแกแแแแก แฌแแ แแแขแแแฃแแแ แแแฉแแแแ. แแแแ แแ แแก แแ แแ แแก แจแแชแแแแ. แ-แ-แแฐ แ แ แแฅแแ?! แแ แแ แ แ:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException แแขแงแแแก Airflow-แก, แ แแ แจแแชแแแแแแ แแ แแ แแก, แแแแ แแ แฉแแแ แแแแแแขแแแแแ แแแแแแแแแก. แแแขแแ แคแแแกแก แแ แแฅแแแแ แแฌแแแแ แแ แฌแแแแแ แแแแแ แแขแ, แแ แแแแ แแแ แแแกแคแแ แ.
แแแแแ แแแแแแงแแ แแ แฉแแแแ แแแแแชแแแแแ แแ แแแแแ แกแแแขแ:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])แแแ แซแแ
- แแแแแชแแแแ แแแแ, แกแแแแแแแช แแแแฆแแ แจแแแแแแแแ,
- แฉแแแแ แแแขแแแ แแแก แกแแกแแแก ID (แแก แแแแกแฎแแแแแแฃแแ แแฅแแแแ แงแแแแแ แแแแชแแแแกแแแแก),
- แฐแแจแ แฌแงแแ แแแแ แแ แจแแแแแแแก ID - แแกแ, แ แแ แกแแแแแแ แแแแแชแแแแ แแแแแจแ (แกแแแแช แงแแแแแคแแ แ แแ แ แชแฎแ แแแจแแ แฉแแกแแฃแแ) แแแฅแแแแแก แฃแแแแแแฃแ แ แจแแแแแแแก ID.
แ แฉแแแ แแแแ แแแแแฏแ: แฉแแแกแฎแแ แงแแแแแคแแ แ แแแ แขแแแแจแ. แแ, แฃแชแแแฃแ แแ แกแแแแแ แแกแ, แแแแก แแแแแแแแแก แแ แ-แแ แแ แงแแแแแแ แกแแแแฎแแแแ แแแ แแ แแคแแฅแขแฃแ แ แแแแ CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- แฉแแแ แแแแแแแแแ แกแแแชแแแแฃแ แแแแฆแแแก
StringIO. pandasแแแแแแแ แแแแงแแแแแก แฉแแแแDataFrameแกแแฎแแCSV- แฎแแแแแ.- แแแแแ แแแแฎแกแแแ แแแแจแแ แ แฉแแแแก แกแแงแแแ แแ แแแ แขแแแแกแแแ แแแฃแญแแ.
- แแฎแแ แแ แแแฎแแแ แแแแ
copy()แแแแแแแแแ แฉแแแแ แแแแแชแแแแแ แแแ แแแแแ แแแ แขแแแแจแ!
แแซแฆแแแแกแแแ แแแแฆแแแ แ แแแแแแ แฎแแแ แแงแ แจแแแกแแแฃแแ แแ แกแแกแแแก แแแแแฏแแ แก แแแฃแแแแแแ, แ แแ แงแแแแแคแแ แ แฌแแกแ แแแจแแ:
session.loaded_rows = cursor.rowcount
session.successful = Trueแฒกแฃแ แแก แแ แแก.
แแแงแแแแแกแแก แฉแแแ แแฅแแแแ แกแแแแแแ แคแแ แคแแขแแก แฎแแแแ. แแฅ แแ แแแแแชแ แฉแแแก แแแแก แแแขแแ แ แแแแฅแแแ:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)แแ แแแงแแแแ
VerticaOperator()แแ แแฅแแแ แแแแแชแแแแ แแแแแก แกแฅแแแแก แแ แชแฎแ แแแก (แแฃ แฃแแแ แแ แแ แกแแแแแก, แ แ แแฅแแ แฃแแแ). แแแแแแ แแ แแแแแแแแแแฃแแแแแแแก แกแฌแแ แแ แแแฌแงแแแ:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadแจแแฏแแแแแ
- แแแ แแ, - แแฅแแ แแแขแแ แ แแแแแแ, - แแ แ, แแฎแแ
แแแ แฌแแฃแแแแฃแแ แฎแแ , แ แแ แแ แแแ แงแแแแแแ แกแแจแแแแแ แชแฎแแแแแ แขแงแแจแ?
แฏแฃแแแ แแแแแแแกแแแ, แแ แแคแแแ
แแคแแฅแ แแ, แแ แแ แฉแแแก แแแแแแแแก แ แแ แแแฅแแแแแก แแแแแฃแ แกแ: แแแ แกแฌแ แแคแแ แจแแฅแแแแก แแ แแแแฌแงแแแก ETL แแ แแชแแกแก แแฃแแแแแ: แแกแแแ แแแแแแแแ SSIS-แแ แแ แแแฃแกแแ แแ แแ Airflow-แแ... แแ แจแแแแแ แฉแแแ แแกแแแ แจแแแแแแ แแแ แแแแแแก แกแแแแ แขแแแแก... แแแแแ, แแแแแ แแแแแแแแฎแแแแแ, แ แแ แงแแแแ แคแ แแแขแแ แแแแแแแ แชแฎแแ!
แแฃ แชแแขแ แฃแคแ แ แกแแ แแแแฃแแแ, แแแจแแ Apache Airflow - แแ แแชแแกแแแแก แแฆแฌแแ แแ แแ แแแ แแแแก แแแแแก แกแแฎแแ - แจแแแกแ แฃแแ แฉแแแ แกแแฅแแ แแแแ แ แฃแคแ แ แแแแคแแ แขแฃแแ แแ แกแแกแแแแแแแ.
แแแกแ แจแแฃแแฆแฃแแแแ แแแคแแ แแแแแ, แ แแแแ แช แแแแแแแขแแแแก, แแกแ แแแกแจแขแแแฃแ แแแแกแแแแ แแแแ แแแแแแแแก แแแแแกแแแ แแกแแ, แแแซแแแแ แจแแกแแซแแแแแแแแก แแแแแแงแแแแ Airflow แแแแฅแแแก แแแแแกแแแแ แกแคแแ แแจแ: แแแแแชแแแแแแก แจแแแ แแแแแแก, แแแแแแแแแแกแ แแ แแแแฃแจแแแแแแก แกแ แฃแ แชแแแแจแแช แแ, แแฃแแแแช แ แแแแขแแแแก แแแจแแแแแกแแก (แแแ แกแแ, แแฃแ แกแ).
แแแฌแแแ แกแแแแแแ, แแแแแแแแ แแ แแแคแแ แแแชแแ
แ แแแ แฉแแแ แจแแแแแ แแแแ แแฅแแแแแแแก
start_date. แแแแฎ, แแก แฃแแแ แแแแแแแแ แแแ แแแแแ. แแแแแก แแแแแแ แ แแ แแฃแแแแขแแก แแแจแแแแแแstart_dateแงแแแแ แแแแแก. แแแแแแ, แแฃ แแแฃแแแแแแstart_dateแแแแแแแแ แ แแแ แแฆแ แแschedule_interval- แแ แ แแฆแแก, แแแจแแ DAG แแแแฌแงแแแ แฎแแแ แแ แ แฃแแแ แแก.start_date = datetime(2020, 7, 7, 0, 1, 2)แแ แแ แแแแแ แ แแ แแแแแแ.
แแแกแแแ แแแแแแจแแ แแแฃแแแ แแแแแ แแ แแ แแแจแแแแแก แจแแชแแแแ:
Task is missing the start_date parameter, แ แแช แงแแแแแแ แฎแจแแ แแ แแแฃแแแแแแก แแแแแ, แ แแ แแแแแแแฌแงแแแ แแแแแแแก แแแแ แแขแแ แแแ แแแแแแจแแ แแแ.- แงแแแแ แแ แ แแแแฅแแแแแ. แแแแฎ, แแ แแแแแแ (แแแแแ Airflow แแ แฉแแแแ แกแแคแแ แ), แแ แแแ แกแแ แแแ แ, แแ แแ แแคแแแ แแ แแฃแจแแแ. แแ แแแฃแจแแแ แแแแแช. แแแแ แแ แแ แแแ แแแแแแแแแแแจแ, แกแแ แแแกแแแแกแแแแก แแแแแแแแแแแก แ แแแแแแแแ แแแแแแ แแ แแ แ แแแแกแแช PostgreSQL-แแ แแแแฌแงแ แแแแแฅแกแแ แ แแแแแ แแแ 20 ms-แแก แแแชแแแแ 5 แฌแแแจแ, แฉแแแ แแแแฆแแ แแแ แแ แฌแแแแงแแแแแ.
- แแแแแแฃแ แ แแฆแแแกแ แฃแแแแแแ. แแแแฎ, แฉแแแ แแกแแ แแแกแแ แแกแฎแแแแแ แ แแ แฃแแแ แฃแคแกแแ แฃแแแก แแแ แแก แแแแแแแ. LocalExecutor แแฅแแแแ แกแแแแแ แแกแ แแงแ แฉแแแแแแแก, แแแแ แแ แแฎแแ แแ แแ แแแแแคแแ แแแแแ แแแแแแฃแ แแ แแ แแฃแจแแแ แแ แฉแแแ แแแแ แ แจแ แแแ แแแแแแฌแแแก CeleryExecutor-แจแ แแแแแกแแกแแแแแแ. แแ แแแแก แแแแแแแแกแฌแแแแแแ, แ แแ แแฅแแแ แจแแแแซแแแแ แแแกแแแ แแฃแจแแแแ แแ แ แแแแ แแขแแ, แแ แแคแแ แ แแแจแแแ แฎแแแก, แแแแแแงแแแแ Celery แแฃแแแแช แกแแ แแแ แแ, แ แแแแแแช "แ แ แแฅแแ แฃแแแ, แแ แแกแแแแก แจแแแ แฌแแ แแแแแแจแ, แแฃแแฌแ แคแแแแ!"
- แแแแแฃแงแแแแแแแแ แฉแแจแแแแแฃแแ แฎแแแกแแฌแงแแแแ:
- แแแแจแแ แ แแแแกแแฎแฃแ แแแแก แกแแ แแแคแแแแขแแแแก แจแแกแแแแฎแแ,
- SLA แแแแแขแ แแแ แฃแแแกแฃแฎแแก แแแแแแแแแแก, แ แแแแแแแช แแ แแฃแแแ แแ แจแแกแ แฃแแแ,
- xcom แแแขแแแแแแชแแแแแแก แแแชแแแแกแแแแก (แแ แแแฅแแ แแแขแแแแแแชแแแแแ!) dag แแแแชแแแแแก แจแแ แแก.
- แคแแกแขแแก แแแ แแขแแ แแแแแงแแแแแ. แแแ, แ แ แแแฅแแ? แแแงแแแแแฃแแ แแงแ แกแแแแแแแแแชแแ แแแชแแแฃแ แแแแแแแแแก แงแแแแ แแแแแแ แแแแกแแแแก. แแฎแแ แฉแแแก แกแแแฃแจแแ Gmail-แก แแฅแแก >90 แแแแกแ แแ.แฌแแ แแแ Airflow-แแแ แแ แแแ แคแแกแขแแก แแญแแแ แฃแแ แก แแแแแแก แแ แแแ แแฃแแแ 100-แแ แแแขแแก แแฆแแแแแ แแ แฌแแจแแแแ.
แแแขแ แฎแแ แแแแแแ:
แแแขแ แแแขแแแแขแแแแชแแแก แฎแแแกแแฌแงแแแแ
แแแแกแแแแก, แ แแ แฃแคแ แ แแแขแแ แแแแฃแจแแแ แแแแแ แแ แแ แ แฎแแแแแแ, Airflow-แ แแแแแแแแแแ แแก:
- - แฏแแ แแแแแ แแฅแแก แแฅแกแแแ แแแแแขแแขแแ แแก แกแขแแขแฃแกแ, แ แแช แแ แฃแจแแแก แฎแแแก แแฃแจแแแแแจแ. แแแกแแแ แแ แแแ, แแฅแแแ แจแแแแซแแแแ แแ แ แแฎแแแแ แแแแฆแแ แแแคแแ แแแชแแ dags แแ แแแแชแแแแแแก แจแแกแแฎแแ, แแ แแแแ แจแแแฉแแ แแ/แแแแฌแงแแ dag, แจแแฅแแแแ DAG Run แแ pool.
- - แแ แแแแแ แแแกแขแ แฃแแแแขแ แฎแแแแแกแแฌแแแแแแ แแ แซแแแแแแก แกแขแ แแฅแแแจแ, แ แแแแแแแช แแ แ แแฎแแแแ แแแฃแฎแแ แฎแแแแแแ WebUI-แแก แกแแจแฃแแแแแแ แแแแแกแแงแแแแแแแ, แแ แแแแ แกแแแ แแแ แแ แแ แกแแแแแก. แฒแแแแแแแแ:
backfillแกแแญแแ แแ แแแแชแแแแแแก แจแแแแฎแแแแแแแก แแแแแขแแแ แแแ.
แแแแแแแแแ, แแแแแแแแ แแแแแแขแแแแกแแแ แแ แแฅแแแก: โแจแแ แแ, แแแฎแแแแแ, 1-แแแ 13 แแแแแ แแแแ แแแแแชแแแแแจแ แกแแกแฃแแแแ แแแฅแแก! แแแแกแฌแแ แ, แแแแกแฌแแ แ, แแแแกแฌแแ แ, แแแแกแฌแแ แ!โ แแ แจแแ แฎแแ แแกแแแ แฐแแแ:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- แแแแแก แกแแ แแแกแ:
initdb,resetdb,upgradedb,checkdb. run, แ แแแแแแช แกแแจแฃแแแแแแก แแแซแแแแ แจแแแกแ แฃแแแ แแ แแ แแแกแขแแแชแแแก แแแแแแแแ แแ แแแแขแแแแ แงแแแแ แแแแแแแแแแฃแแแแแก แฅแฃแแแแ. แฃแคแ แ แแแขแแช, แจแแแแซแแแแ แแแกแ แแแจแแแแLocalExecutor, แแแจแแแแช แแ, แแฃ แแฅแแแ แแแฅแแ แแแแฎแฃแ แแก แแขแแแแแ.- แแแแฅแแแก แแแแแแก แแแแแแแก
test, แแฎแแแแ แแแแแแจแแช แแ แแคแแ แก แฌแแ แก. connectionsแญแฃแ แแแแแ แแแแจแแ แแแแก แแแกแแแ แแแ แจแแฅแแแแก แกแแจแฃแแแแแแก แแซแแแแ.
- - แฃแ แแแแ แแฅแแแแแแแก แกแแแแแแ แแซแแแ แแแ, แ แแแแแแช แแแแแฃแแแแแแแ แแแแแแแขแแแแกแแแแก แแ แแ แ แแแกแจแ แแแขแแ แ แฎแแแแแแ. แแแแ แแ แแแ แแแแจแแแก แฎแแแก
/home/airflow/dags, แแแแฅแแชแipythonแแ แแแแฌแงแ แแ แแฃแแแแ? แแฅแแแ แจแแแแซแแแแ, แแแแแแแแแ, แงแแแแ แแแแจแแ แแก แแฅแกแแแ แขแ แจแแแแแแ แแแแแ:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Airflow แแแขแแแแแแชแแแแแแก แแแแแกแแแ แแแแแแจแแ แแแ. แแ แแ แแแ แฉแแแ แแแกแแ แแแฌแแ แแก, แแแแ แแ แแแแแแแแแก แแแแแแแ แแแแแแแก แแแฆแแแ แกแฎแแแแแกแฎแแ แกแแแชแแคแแแฃแ แ แแแขแ แแแแแแกแแแแก แจแแแซแแแแ แแแแ แแ แฃแคแ แ แกแฌแ แแคแ แแ แแแ แขแแแ แแงแแก, แแแแ แ แ แแแแแแแ API-แแก แแแจแแแแแแ.
แแแฅแแแ, แ แแ แฉแแแแ แงแแแแ แแแแแแแแ แแ แแ แแก แฃแซแแฃแ แ, แแแแ แแ แแกแแแ แแแแฏแแ แจแแแซแแแแ แแแแชแแก แแ แแก แแแ แแแแฃแ แแ. แแแแ แแ แ แแแแแแแแ แแแแแแ แแแ แฃแแแ แกแแแญแแแ แแ แกแแญแแ แ แแฅแแแแ แจแแแแฌแแแแ.
แฃแคแ แแฎแแแแแ SQL-แก!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
แแแขแแ แแขแฃแ แ
แแ แ แ แแฅแแ แฃแแแ, แแแ แแแแ แแแ แแแฃแแ Google-แแก แแแแแจแแแแแแแ แแ แแก Airflow แกแแฅแแฆแแแแแก แจแแแแแ แกแ แฉแแแ แกแแแแจแแแแแแแแ.
- - แ แ แแฅแแ แฃแแแ, แแคแแกแแแแ แฃแแแ แแแแแฌแงแแ. แแแแฃแแแแขแแชแแ, แแแแ แแ แแแ แแแแฎแฃแแแแก แแแกแขแ แฃแฅแชแแแแก?
- - แแแ แแ, แแแแแช แฌแแแแแแฎแแ แจแแแฅแแแแแแแแก แ แแแแแแแแแชแแแแ.
- - แแแแแแแแแ: แแแแฎแแแ แแแแแก แแแขแแ แคแแแกแ แกแฃแ แแแแแจแ
- - แซแแ แแแแแ แชแแแแแแ แแแ แแแ แแ แแก แแฆแฌแแ แแแ, แแฃ (แแแฃแแแแแแแแ!) แฉแแแแแ แ แแฆแแช แแแ แแแแแ.
- - แแแแแ แกแแฎแแแแซแฆแแแแแแ แฐแแแ แแก แแแแแแแก แแแแกแขแแ แแก แแแกแแงแแแแแแแ.
- - แแแแฅแแแก แแแแแ แกแแแแขแแ แแกแ แกแขแแขแแ, แแแ แแ, แแแแแ, แแแขแ แคแแ แแแแแแแ แแ แแแแแแแ แแแแแแแแแแ.
- โ แกแแแแ แแกแแแ แแ แแแ แแฃแจแแแแแก แจแแกแแฎแแ.
- - แแแแชแแแแแแก แฃแแแแแแแแแก แจแแกแแฎแแ, แแแ แแฆแแก แแแชแแแแ ID-แแ แฉแแขแแแ แแแ, แขแ แแแกแคแแ แแแชแแ, แคแแแแแก แกแขแ แฃแฅแขแฃแ แ แแ แกแฎแแ แกแแแแขแแ แแกแ แ แแ.
- - แแแแชแแแแแแก แแแแแแแแแแฃแแแแแแ แแ Trigger Rule, แ แแแแแแช แแ แแฎแแแแ แฌแแ แกแฃแแจแ แแฆแแแแจแแ.
- - แ แแแแ แแแแแแแแฎแแ แแแ แแแแฃแแ โแกแแแฃแจแแแแแ, แ แแแแ แช แแก แแงแ แแแแแแแแแโ แแแแ แแแจแ, แฉแแขแแแ แแแ แแแแแ แแฃแแ แแแแแชแแแแแ แแ แแแแแแแแแแ แแ แแแ แแขแแขแแแ.
- - แกแแกแแ แแแแแ SQL แแแแฎแแแแแแ Airflow แแแขแแแแแแชแแแแแแกแแแแก.
- - แแ แแก แกแแกแแ แแแแแ แแแแงแแคแแแแแ แแแ แกแแแแแฃแ แ แกแแแกแแ แแก แจแแฅแแแแก แจแแกแแฎแแ.
- โ แกแแแแขแแ แแกแ แแแแแ แฉแแแแฌแแ แ แแแแแชแแแแ แแแชแแแแ แแแแกแแแแก AWS-แแ แแแคแ แแกแขแ แฃแฅแขแฃแ แแก แแจแแแแแแก แจแแกแแฎแแ.
- - แแแแ แชแแแแแฃแแ แจแแชแแแแแแ (แ แแแแกแแช แแแแแ แฏแแ แแแแแ แแ แแแแฎแฃแแแแก แแแกแขแ แฃแฅแชแแแแก).
- - แแแแฆแแแแ, แ แแแแ แแ แแแแแแ แฎแแแฎแก แแแ แแแแแแก แจแแแแฎแแแก, แแฃแแชแ แจแแแแซแแแแ แฃแแ แแแแ แแแแแแงแแแแ Connections.
- - แแแแแแชแแขแฃแ แ DAG แแแแแแแกแแแแ แแแแ, แแแแขแแฅแกแขแแก แแแแแแแแแ แคแฃแแฅแชแแแแจแ, แแกแแ แแแแแแแแแแฃแแแแแแแก แจแแกแแฎแแ แแ แแกแแแ แแแแชแแแแแแก แแแจแแแแแก แแแแแขแแแแแแก แจแแกแแฎแแ.
- - แแแแแงแแแแแแก แจแแกแแฎแแ
default argumentsะธparamsแจแแแแแแแแจแ, แแกแแแ แชแแแแแแแจแ แแ แแแแจแแ แแแจแ. - - แแแแแแ แแแแก แจแแกแแฎแแ, แแฃ แ แแแแ แแแแแแแแ แแแแแแแแแแ Airflow 2.0-แแกแแแแก.
- - แแแแแ แแแซแแแแแแฃแแ แกแขแแขแแ แฉแแแแ แแแแกแขแแ แแก แแแแแแแแแแก แจแแกแแฎแแ
docker-compose. - - แแแแแแแฃแ แ แแแแชแแแแแ แจแแแแแแแแแกแ แแ แแแแขแแฅแกแขแแก แแแแแแแกแแแแ แแแแแก แแแแแงแแแแแแ.
- - แกแขแแแแแ แขแฃแแ แแ แแแ แแแแฃแแ แจแแขแงแแแแแแแแแ แคแแกแขแแ แแ Slack-แแ.
- - แแแแจแขแแแแแก แแแแชแแแแแ, แแแแ แแแแ แแ XCom.
แแ แกแขแแขแแแจแ แแแแแงแแแแแฃแแ แแแฃแแแแ:
- - แฉแแแแชแแแแแแก แแแแแแ แฎแแแแแกแแฌแแแแแแ แจแแแแแแแแจแ แแแแแกแแงแแแแแแแ.
- โ แฎแจแแ แ แจแแชแแแแแแ แแแคแแแแก แจแแฅแแแแกแแก.
- -
docker-composeแแฅแกแแแ แแแแแขแแแแกแแแแก, แแแแแ แแแแกแแแแก แแ แกแฎแแ. - - แแแแแแแก แจแแคแฃแแแ Telegram REST API-แกแแแแก.
แฌแงแแ แ: www.habr.com




