á°ááᣠáĽá á˛ááľáŞ áááŞáᎠáá - á¨Vezet á¨áŠáŁááŤáá˝ áĄáľá á¨áľááłá ááá á¨ááἠáááá˛áľá˘
ᨠETL áá°áśá˝á ááááááľ áľá á ááľ á áľá°áá ááŁáŞáŤ áĽáááááłáá - Apache Airflow. ááá áá á¨á á¨á áá°áľ á áŁá áááἠáĽá ááá-áĽá áľááá á áá¨á áá°áśá˝ ááľáĽ áŁááłá°áá á áĽááá áááá¨áąáľ áááŁá ááá áá áááááá áá°áśá˝á á á¨ááá áááá áĽá á ááááá¸áá áá¨áłá°á áŤáľáááááłáá˘
áĽá á á, áĽá áĽáť á ááááá, áá á°áá á áłá: ááŽááŤá áĽá áŽáľ, á á˝á áłá áá˝ áĽááłáá˝ áĽá áááŽá˝ á ááľ.
á á¨á áá°áľ/ááŞáá˛áŤ áŽáááľ á¨áááá áá ááá á˛áŤá°áá áĽááá áá á¨ááŤáŠáľ ááá
áááŤ
ááá˘áŤ ááá ááá ᣠá°ááŁáŤá (áĽá áľáá˝ á˛ááŞ) ááááľáá ááĽá (áĽá ááĽááľá) áááľá°á áá°áĽá°áĽ áá°á¨áłá á˝áá°-ááłáŚá˝ áľáŤáá˝á áĽáááĽáŤáá áľá á á ᣠáľáá˝ á¨á°áŤááá áĽáááááá áááááśá˝, ááá ááá˝ áĽá ááá˝ á°ááááŽá˝ áĽá áŚááŹá°áá áá°áá°á ááľááľ áá? áá ááá
á¨áá¨á¨áťá ááá, ááŁááť áĽá áá¨á ááŁááťáá˝
ááá˘áŤ
Apache á¨á á¨á áá°áľ áá áĽáá° ááá ááá˘
- á Python á°áá
- áĽáŠ á¨á áľá°áłáłáŞ ááá á á áŁ
- ááá°áá°á áá áá°á á¨áá˝á
- á¨á°áťá áĽáť áĽá á¨á°á°áŤá áá ááá áá°ááŤáŠ ááááá˝ áááľá (á¨áŤáľ á ááľ áĽáá°á°áťáá) ááá˘
- áá°áĽ á ááá áá˝áá˝ áá áľáŤáá˝á áááľ áĽá áááŁá á (áĽá á´á᪠/ áŠá áááľáľ áĽá á ááá ááá áľáááłá)
- ᨠPython áŽáľ áááťá áĽá ááá¨áłáľ á áŁá ááá á áá á°áááá á¨áľáŤ áá°áľ áľáááľ
- áĽá áááąáá ááá á áŤááľ áĽá á á¤áľ ááľáĽ á¨á°á°áŠ ááááá˝á á áá áá áááááá á¨ááἠááł áĽá á¤áá ááá˝á áĽááľ á áĽááľ á¨áááááľ á˝ááł (áá á á áŁá ááá áá)á˘
Apache á¨á á¨á áá°áľá áĽáá°áá¨á°áá áĽáá áááá-
- áá¨áá á¨á°ááŤáŠ áááŽá˝ áĽáá°á áľáŁáá (áĽá á¨SQL Server áĽá PostgreSQL ááłááá˝áŁ á¨á°ááŤáŠ á¤áá ááá˝ á¨á áááŹá˝á ááľáŞááľáŁ 1C áĽááłá) á DWH áĽá ODS (Vertica áĽá Clickhouse á áá)á˘
- áá áŤá
á á¨áá
cron
á ODS áá á¨ááἠáá áá¨áŞáŤ áá°áśá˝á á¨áááá¨á áĽá áĽááá¸ááá á¨áá¨áłá°á.
áĽáľá¨ á áἠáá áľá¨áľáŁ ááááłá˝á 32 áŽá áĽá 50 áᢠáŤá áŁáá á ááľ áľáá˝ á áááá á°á¸áááᢠá á á¨á áá°áľ ááľáĽ áá áá°áŤááĄ-
- ᨠ200 áśá (á áĽáááą áľáŤáá˝á á¨áááá áľ á¨áľáŤ áá°áśá˝)
- á áĽáŤááłááą á á ááŤá 70 á°ááŁáŤáľ,
- áá áááŤáááľ ááááŤá (á á°á¨ááŞá á á ááŤá) á á°ááľ á ááľ áá.
áĽá áĽáá´áľ áĽáá°á°áá ᣠá¨áá á áłá˝ áĽá˝ááá ᣠáá á áá á¨ááááłáá ᨠßber-á˝ááá áĽáááááá-
áśáľáľ ááá SQL á°áá¨áŽá˝ á á áĽáŤááłááłá¸á 50 á¨ááἠááłáá˝ - á¨á ááľ ááŽáááľ ááłááá˝ áĽáá°á á°á á°á¨á°áá¸á, á°ááłáłá ááá á á áá¸á (á ááá áŚáł áááľ ááťáá, mua-ha-ha), áá á áááľ áĽáŤááłááłá¸á á¨áľáĽáá á áá á¨áĽ á áá¸á (áĽáá° áĽáľá áá, á¨áá áá á°áá á¨áĽ) áľá áá° ááááá áááľ ááᣠáá˝áá). áĽá á¨á áááááľ ááľáŽá˝á (ááá á ááááᣠá¨ááá áłáłá¤áᣠá¨á˘á˛á¤á á°ááŁá ááłáááŤ) á áá¨á áááĄá ááľá°á á áá ááľ áá° á¨áá˛áŤ áĽááá¨ááŤááá˘
áĽáááľ!
ááá ááá ᣠá°ááŁáŤá (áĽá áľáá˝ á˛ááŞ)
ááááľáá ááĽá (áĽá ááĽááľá)
áááš áľáá
á˛áá áĽá áĽá ááá áá áአSQL
-schik á á ááľ á¨áŠá˛áŤ á˝ááťáŽ ááľáĽ áŁáĽá á¨ááááľá áááľ ááłáŞáŤáá˝á á áá áá á¨á˘á˛á¤á áá°áśá˝á á áá ááĽá°ááá˘
- Informatica á¨ááá ááá¨á - áĽá
á á áŁá á¨á°áľáá áľááᾠᣠáĽá
á á áŁá áá¤áłá ᣠá áŤáą áááľáá ᣠá¨áŤáą áľáŞáľá˘ 1% á á
áá áĽááá áĽáá áá¨áááá á°á áááŠá˘ ááá? á°á
á ᣠá ááááŞáŤ ᣠáá
á áááὠᣠᨠ380 ááš ááᎠᣠá á áĽááŻá˝á áá áŤá ááĽáŻáᢠá ááá°á á°á¨áᣠáá
ááá¨á á¨á°áá°áá áĽá
á á áŁá ááá ááá áá°áśá˝áŁ áᥠá áŤá áááś áĽá
á áá áááá áĽá ááá˝ á áŁá á áľááá ááá-á¨áľáá
áľ-ááłááŤáá˝ ááᢠáľá ááŞá ᣠáĽáá° á¤ááŁáľ AXNUMX / áááľ ááá ᣠááá á ááá á˘
áá áááᣠá á˝á áłá áá˝ áĽááł á¨30 áááľ á áłá˝ á¨áá á°áá˝á á áľááš áááł áá˝ááá˘
- SQL á áááá áá
á°áľ á áááá - áá
áá ááłá˝áá á ááľáŁá˝á á ááŽáááľ áá°áśá˝ ááľáĽ á°á á
ááá áłáᢠá°á
áᣠá áĽáááąáĄ á áľááľáá SQL Server áĽáá ááááᣠáĽá á áá ááአá¨á˘á˛á¤á ááłáŞáŤáášá á ááá áá ááááŤáłá á ááááᢠá ááľáĄ áŤáá ááá áá áĽáŠ áá: áááąá á ááááš áἠáá, áĽá á¨áá°áą áááŁáá˝ ... áá ááá
á áá°áá á¨áśááľáá áááśá˝á á¨áááá°á, áŚá
, ááá
á áá°áá. áľáŞáľ áŤáľáááľ
dtsx
(áá á á¤ááľá¤áá¤á áá á áááá˝ á save áá á¨á°áá áŤá¨á) áĽá áĽáá˝áááᣠáá ááĽáĄ áááľá áá? á ááśáá˝ á¨ááá አá°áá á¨áŚá˝á á¨á ááľ á áááá áá° áá á¨áááľáľ á¨á°ááŁá ááŹá ááľáŤáľáľ? á á ᣠáľááľ áᜠᣠá áááŤá˝ áŁáľá á¨ááłááľ á ááŤáŠá á á á ááľá¨á á¨á፠áááĽáŤáŽá˝ áááľááᢠáá á áĽááá áááľ á¨á áá áá˝á áááľáá
á áĽááἠáá፠ááááśá˝á áááá áá áᢠááłá áĽááłá áŤá á á áŤáą áá° á°áťá á¨SSIS áĽá á áááŹá°á ááŁ...
... áĽá á¨á፠á á˛áľ áĽáŤ á ááá. áĽá Apache á¨á á¨á áá°áľ á áአáá á°á¨á°áá˘
á¨á˘á˛á¤á áá°áľ ááááŤáá˝ ááá á¨áááá áŽáľ áááá¸áá áłáá áá°áľáł á áá¨áááŠáᢠá¨áá¨á áĽá¨áśá˝ á¨á°áá¨áŠáľ áĽá á¨á°á¨ááááľ á áá ááááľ áá áĽá á ááľ áá á ááá á áŤáá¸áá á°áá á¨áŚá˝ á ááśáá˝ á¨ááá አá¨ááἠááłáá˝ áá° á ááľ á˘áá ááá°áľ á á ááľ áĽá á°áŠá ááá áááľ 13 âáľááŞáá˝ ááľáĽ á¨áááá áŽáľ ááłá áááá˘
áááľá°á áá°áĽá°áĽ
áá ááá áá áá áááľ á áááá ᣠáĽá áĽáá áá ááá ááá á¨áá áááŽá˝ áĽááłáááááᣠáááłá á¨á á¨á áá°áľáŁ á¨áá¨áĄáľ áłáłá¤áᣠá´á᪠áĽá ááá˝ á ááľá¨áŤáá˝ ááľáĽ á¨á°áááš ááłáŽá˝á˘
áá˛áŤáá áá¨áŤáá˝á áĽááááŤáá, áĽá ááľá á ááŁá docker-compose.yml
á¨áľáá ááľáĽ:
- á áľááá áĽáááł á¨á á¨á áĽáá
áľáá´: áááááĽá, Webserver. á¨á´á᪠áľáŤáá˝á ááá¨áłá°á á á áŁá áĽá፠áá˝á¨á¨á¨áŤá (ááááŤáąá á áľááľá áá° ááľáĽ áľáá°áá ááá˘
apache/airflow:1.10.10-python3.7
áá ááá á áá°ááá) - PostgreSQLá¨á á¨á áá°áľ á¨á áááááľ áá¨ááá (á¨áááááĽá á ááŁáĽ áá¨á ᣠá¨á áááá áľáłá˛áľá˛áᾠᣠááá°) á¨áá˝áá áľ áĽá á´á᪠á¨á°á ááá á°ááŁáŤáľá ááááľ áŤá°ááá á˘
- Redisáá´áá¨áŞ á°ááŁá á°áá áá á¨ááŤáááá;
- á¨á°áἠá°áŤá°á, áá á á°ááŁáŤáľá á ááĽáł á áááá áá á¨áá°ááŤ.
- áá° á áá
./dags
ááááťá˝áá á¨áłááľ ááá፠áá áĽáá¨ááŤáá ᢠá á á¨áŤ áá ááá°áłá, áľááá á¨áĽáŤááłááą ááľáá áľ á áá áááá ááá ááá á áŤáľáááá.
á á ááłááľ áŚáłáá˝, á ááłáááš ááľáĽ áŤáá áŽáľ áá á áá á ááłáá (á˝ááá áááá¨ááá ), ááá áá á¨áá áŚáł á áá°áą ááľáĽ á°áľá°áŤááá. á¨á°áá á¨áľáŤ áŽáľ ááłááá˝ á áá áŤáááŤá ááľáĽ áááá
https://github.com/dm-logv/airflow-tutorial .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- broker
ááľáłááťáá˝
- á á
áá
áą áľáĽá°áŁ ááľáĽ, á á áĽááá á áłááá ááľá áá á°ááľááťáá
puckel / docker-á¨á á¨á áá°áľ - áĽáąá áá¨áááĽáá áĽááá á áááᢠááááŁáľ á á áááľá ááľáĽ áá ááá ááá á áŤáľááááľá. - ááá á¨á á¨á áá°áľ á
áá
áśá˝ á¨ááááľ á á áŠá áĽáť á áá°ááá˘
airflow.cfg
, ááá áá á á áŤáŁá˘ á°ááááŽá˝ (áááá˘áá˝ ááľáá áááŁá) áĽá á á°ááŽá á°á á ááŤáá. - á á°ááĽáŽ ᣠááááľ ááá á áá°áá-áá áĽáŹ á¨áἠááśá˝ á áĽá ááŤáŁáá˝ áá á ááľáááĽáŠá ᣠáá°á áááľ á ááľá¸áá¨ááᢠááá áá áááŤáŞááťá˝á á¨ááľáááá á ááľá°ááá á á°á¨ááá˘
- á áľáłááľ á áľááł:
- á¨áłá á ááá ááááąá áááááĽá á ááŞá áĽá áá áŤá°ááš á°á°áŤá˝ ááá á áá áľá˘
- á ááá á¨áśáľá°á ááá á¤á°-ááťáááľ ááá á°ááłáłá áá - ááá á áá áááááĽá áĽá á á áŤá°áá˝ áá˝áá˝ áá ááŤá á ááŁá¸áá˘
á°á áᣠá áá ááá áááĄ-
$ docker-compose up --scale worker=3
ááá ááá á¨á°ááł á áá á¨áľá á áááá˝á áá¨áľ áá˝áá-
- á¨á á¨á áĽáá
áľáá´:
http://127.0.0.1:8080/admin/ - á á áŁáĄ
http://127.0.0.1:5555/dashboard
áá°á¨áłá á˝áá°-ááłáŚá˝
á áĽááá áá âáłááľâ ááľáĽ ááá ááá áŤáá°á¨áłá ᣠá áá áááá áááľ áĽáá á ááĄ-
- áááááĽá - á á á¨á áá°áľ ááľáĽ á áŁá á áľáááá á áᾠᣠáŽáŚáśá˝ á ááá¨á áĽáá°áá አᣠáĽá á°á á ááááá á áááŁá á: á¨áá á°ááłáá áááŁá áŤá ᣠááááá˝á áŤáťá˝áá ᣠá°ááŁáŤáľá ááááŤáá˘
á á á ááá ᣠá á áŽá áľáŞáśá˝ ᣠáĽáą á¨ááľáłááľ á˝áá áá á¨á áľ (á á ᣠá¨áááłáľ á˝áá á áá°áá ᣠáá ááá°áľ) áĽá á¨á ááľ áááŞáŤá á áá áŽá˝ ááľáĽ áĽááłá áááˇáá˘
run_duration
- áĽáá°áá á¨ááááá áľ ááᢠá áá áá ááá ááá á°á á ááᢠ- DAG (á ááá "áłá") - "ááĽá°á á á˛ááá ááŤá", ááá áá áĽáá˛á
ááááą ááş ááĽááľ á°áá˝ ááááŤá, ááá áá á áĽáááą áĽááľ á ááľ ááľá°ááĽá á¨áááĽáŠ á°ááŁáŤáľ ááŤáŁ áá (á¨áá
á áłá˝ áááá¨áą) ááá á SSIS ááľáĽ áŤáá á¨áĽá
á á ááá áĽá á¨áľáŤ áá°áľ Informatica .
á¨áłááľ á á°á¨ááŞáŁ á ááá áááľ áłáá˝ ááአáá˝ááᣠááá áá áĽá á á áĽááá á áá°ááľáŁá¸ááá˘
- DAG áŠáŤ - á¨áááť áłá, áĽáąá á¨áŤáą á¨á°áá°á
execution_date
. á°ááłáłáŠ áłááŤááľ á áľáአáá°áŠ áá˝áá (á°ááŁáŽá˝áá á ááŤáŤ áŤá°á¨ááľ)ᢠ- áľáá¨á á ááľ á¨á°áá°á á°ááŁá ááá¨ááá áááááľ áŤáá¸á á¨áŽáľ áááĽáŤáŽá˝ áá¸áᢠáśáľáľ á áááľ áŚááŹá°áŽá˝ á ááĄ-
- áĽááááĽáá° á¨áĽá á°ááłá
PythonOperator
áááááá (á¨áá°áŤ) á¨áááá áŽáľ ááá˝á á¨áá˝á; - áááááá¨áá á¨áŚáł áá° áŚáł á¨ááŤááááŁ
MsSqlToHiveTransfer
; - áłáłá˝ á áá á áŠá, á ááľ ááľá°áľ áĽáľáŞá¨á°áľ áľá¨áľ ááá˝ áĽáá˛á°áĄ ááá á°á¨ááŞáá á¨áłáá á áááá áĽáá˛áááą áŤáľá˝áááłá.
HttpSensor
á¨á°ááá¸áá á¨áá¨á¨áť ááĽáĽ ááłáĽ áá˝áá, áĽá á¨ááááá ááá˝ á˛á áĽá , ááááŠá ááááŠGoogleCloudStorageToS3Operator
. á áŤá á áĽááŽáĄ- âááá? á°ááá á áŚááŹá°áŠ ááľáĽ áľáááážá˝á ááľá¨á áá˝áá! áĽá á¨á፠ᣠá¨á°áá áá á áŚááŹá°áŽá˝ áá á¨á°ááŁá áááłáá ááááááľá˘ áłáłáš á¨áááĽáá áá¨áŤ á ááľ ááááŤáᣠáááľáťá áĽá áááłáá˘
- áĽááááĽáá° á¨áĽá á°ááłá
- á°ááŁá - á¨áłáá áŚááŹá°áŽá˝ ááá ááááľ ááááľ áłááá áĽá á¨áłá áá á°áŤááá áá° á°ááŁá á°á¨á á¨á áĽááá á˘
- á¨á°ááŁá ááłá - á á ááá áĽá
áľ á ááŞá á á áááá-á áŤá°áá˝ áá á°ááŁáŤáľá áá° áŚáááľ áááá ááá áĽáá°áá á˛ááľá (á áŚáłá áá ᣠá¨á°á áááá áľ)
LocalExecutor
ááá á ááłáŠ áá áá° áŠá ááľááá ááááľCeleryExecutor
) ááĽááą á ááľ áááľáŁá (áááľá á¨á°ááááŽá˝ áľáĽáľáĽ - á¨á áááá áááŞáŤáá˝) ᣠá¨áľááá ááá á¨áĽáŤá á áĽááśá˝á áŤá°áá áĽá áŤá áŤá áá¸ááá˘
áľáŤáá˝á áĽáááĽáŤáá
á ááááŞáŤáŁ á¨áśáťá˝áá á á ááá áĽá áľ áĽáááááᣠá¨ááŤá áá° ááááŽáš á¨á áá áĽáá áááᣠááááŤáąá á ááłááľ ááá áŤááá áááľááá˝á áĽáá°ááĽáŤááá˘
áľááá ᣠá áŁá ááá á áá ááአᣠáĽáá˛á ááááą áłá áĽáá°áá áááľáá
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
áĽáľá˛ áĽáááá¨áľ
- á ááááŞáŤ, á áľááá á¨áááľá áá˘áľ áĽááľááŁáá áá ááá;
sql_server_ds
ááList[namedtuple[str, str]]
á¨á á¨á áá°áľ áááááśá˝ áááááśá˝ áľáá˝ áĽá áłá áá˝áá á¨ááááľáľáŁá¸á á¨ááἠááłáá˝ áá;dag
- á¨ááľ ááľáĽ ááá áŤáá áľ á¨á áłá ááľáłáááŤglobals()
á áá áá፠á¨á á¨á áá°áľ á áŤááááᢠáłá áĽáá˛á áááľ á áá áľáĄ-- áľá áá áá
orders
- áá áľá á áľá á áááá˝ ááľáĽ ááłáŤá ᣠ- á¨ááá áľáááľ áá áĽáŠá áááľ ááᎠáĽáá°áá ፠áŁ
- áĽá á ᨠ6 á°ááą ááŽáĽ á áá áľ (á áá
áááł áá ááŤáŤ á°áá˝)
timedelta()
á°ááŁáááľ áŤáácron
- ááľáá0 0 0/6 ? * * *
, ááľáá˝ á áŞá - áĽáá° á áááá˝@daily
);
- áľá áá áá
workflow()
áááá áľáŤ áá°áŤá, á áá áá á áá°áá. áá ááᣠá ááłá˝áá áá° áááἠááľáłááťá ááľáĽ áĽááľááŁáááá˘- áĽá á áá á°ááŁáŽá˝á á¨ááá á ááá á áľááľ-
- á áááŽáťá˝á áĽááŽáŁáá;
- ááľááá
PythonOperator
, áá á á¨áĽáá dummy áŤáľáá˝ááworkflow()
. á¨á°ááŁáŠá áአ(á áłá ááľáĽ) áľá ááĽááľ áĽá áľáŤášá áĽáŤáą áá°á á áááąá˘ áŁáá˛áŤprovide_context
á áááš, á°á¨á᪠ááááŽá˝á áá° á°ááŁá áŤááłá, á áĽááá á°á á áá áĽáá°á áľáŁáá**context
.
áá ááᣠ፠áĽáť ááᢠáŤáááááĄ-
- á áľá á áááá˝ ááľáĽ á á˛áľ áłá áŁ
- á áľáአá¨áá¨ááá á ááľ ááś á°áŠá á°ááŁáŤáľ (á¨á á¨á áá°áľ ᣠá¨á´á᪠á áá áśá˝ áĽá á¨á áááá á á á á¨ááá°)á˘
á°á á ᣠááŁá áááľ ááťááá˘
áĽááášá áá ááááá?
áá
á áá ááá ááááá ááľá¤ ááŁá docker-compose.yml
ááááŁá á requirements.txt
á ááá á áááá˝ áá.
á áá á ááˇááĄ-
ááŤáŤ áŤáŹáá˝ á áá á°ááłá á¨áá¨ááá á¨á°ááŁá ááłááá˝ áá¸áá˘
áľáá˝ áĽáá áĽáááᣠá°ááŁáŽáš á á°áŤá°áá˝ á°ááľá°áááĄ-
á á¨ááá´ááš áĽááἠáá, áĽáŤá¸áá á á°áłáŤ áááł á á áááá. áá ááá á áŁá áľáŹáłá á áá°áá.
á áááŤá˝á áá á áááłá˝á áá ááá á áá á¨ááá˘
./dags
, á áá˝áá˝ ááŤá¨á ááá áááłá°á á¨áá - ááá áłáá˝ á°áá°áágit
á áĽá Gitlab ááᣠáĽá Gitlab CI á˛áááą ááááá˝á áá° áá˝áá˝ áŤá°áŤáŤáá˘master
.
áľá á á ᣠáľáá˝
á°áŤá°ááš á¨áĽáá ááááá¨á áĽá¨á°ááą áłáᣠá ááľ ááá ááŤáłá¨á á¨áá˝á áá ááłáŞáŤ áĽááľáłááľ - á á áŁá˘
á á áŤá°á á áááá˝ áá áá áá፠áá¨á áŤáá á¨ááááŞáŤá áá˝áĄ-
áá° áĽáŤ á¨ááą á°ááŁáŤáľ áá á áŁá ááááá áá˝áĄ-
á¨á°ááá˝á áááł áá á áŁá á á°ááş á¨ááá áá˝áĄ-
á áŁá áĽáŠá áá˝ á¨á°ááŁá áááł ááŤáá˝ áĽá á¨á áááá áááŤá¸á áááĄ-
á¨á°áŤááá áĽáááááá
áľááá , ááá á°ááŁáŤáľ á°á¨ááááá, á¨áá°ááľá ááá°áľ áá˝áá.
áĽá áĽá ááľááá - á á ááľ ááá á áá ááááŤáľá˘ á¨á á¨á áá°áľ áľáááá á á áááá á á°ááá¨á° áĽááá áŤáŹáá˝ áá¨áá á áĽááá áááľ áĽááłáá°á¨á° áŤááááłáá˘
áááἠááľáłááťáá áááá¨áľ áĽá á¨áá°ááľá á¨á°ááŁá ááłááá˝á áĽáá°áá ááľááá áŤáľáááááłáá˘
á ááááá áŤáŹ áá á á á ááľá¨á ááĽá á¨ááááľá áľáááśá˝ áĽááŤáááĄ-
ááľá°á á¨áá°ááá á á˝áł ááľá¨á áľá˝ááá ᢠáŤá áááľ á ááľ ááá áĽá፠áĽááłáá°áłáŤ áĽáá¨áłááá, áĽá á°ááłáłá ááłá á°ááŁá áá° áááááĽá á ááŞá áááłá.
á ááá áá áŤáŹáá˝ á ááłááľ áá
á ááľá¨á á áŁá á°áĽá á áĽááłááá ááá˝ áá - áá
á¨á á¨á áá°áľ á¨ááá áĽáá á áá°áá. á á°ááĽáŽáŁ á¨á
áá á¨áŤá˝ ááŁáŞáŤáá˝ á áááĄ- Browse/Task Instances
áááá ááá á á ááľ áá áĽááááĽá áá° áᎠáĽáá°áá áĽááľáááᣠáľáááááá ááĽá á á áŤáľáááĄ-
áŤá¸áą á áá áłáá˛ááťá˝á áá á áááľáá (á¨áá áááááĽá á áľáááá á áľááľáá áĽá¨á á á áá¸á)
áááááśá˝, ááá ááá˝ áĽá ááá˝ á°ááááŽá˝
á¨áááĽááá DAG ááááá¨áľ ááá á áá áá, update_reports.py
:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""ĐĐžŃпОда Ń
ĐžŃĐžŃио, ĐžŃŃĐľŃŃ ĐžĐąĐ˝ĐžĐ˛ĐťĐľĐ˝Ń"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
ĐĐ°ŃĐ°Ń, ĐżŃĐžŃŃпаКŃŃ, ĐźŃ {{ dag.dag_id }} ŃŃОниНи
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
ááá á°á á¨áŞáááľ ááťáťáŤ á áľááá? áá áĽáá°áá áĽáˇ ááľ: áá¨ááá á¨á¨áľ ááááľ áĽáá°áá˝á áááŽá˝ áááá á á; á¨ááááĽá áľ áááá á á; ááá ááá á˛á¨á°áľ ááá á˛á°á á áá°ááľáá á áááą (á°á á ᣠáá áľá áĽá á áá°áá ᣠá áááá)á˘
áĽáá°áá áá° ááá áĽáááľ áĽá á á˛áľ ááá˝ áŤááá áááŽá˝á áĽáááĄ-
from commons.operators import TelegramBotSendMessage
- áá° áŤáá°á¨áá¨á áááááśá˝ áááá áľáá˝ áá á á፠á ááľáŤáľ á¨áŤáłá˝áá áŚááŹá°áŽá˝ áĽááłáá ፠á¨áá¨áááá ááá ááá á¨ááᢠ(áľááá áŚááŹá°á á¨áá á áłá˝ á¨á áá áĽáááááŤáá);default_args={}
- áłá á°ááłáłá ááááŽá˝á áááá áŚááŹá°áŽá˝ áá°áŤá¨áľ áá˝áá áŁto='{{ var.value.all_the_kings_men }}'
- ááľáto
áĽá áááľ áŽáľ á ááá¨ááᣠááá áá á á°áááá á¨ááᨠááá áĽá á¨á˘áááá˝ áááá áŤáá á°áááá ááᣠáá á á áĽááá áŤáľááŁááľAdmin/Variables
;trigger_rule=TriggerRule.ALL_SUCCESS
- áŚááŹá°áŠá ááááá áááł. á áĽá áááł, á°áĽáłá¤á áá° á áááš á¨áá áá ááá áĽááá˝ á¨á°áŠ áĽáť áá á á°áłáŤ áááł;tg_bot_conn_id='tg_main'
- ááááŽá˝conn_id
á¨ááááĽáŤá¸áá á¨áááááľ ááłáááŤáá˝á á°áá áAdmin/Connections
;trigger_rule=TriggerRule.ONE_FAILED
- á á´áááŤá ááľáĽ áŤá áááĽááśá˝ á¨áá áŠáľ á¨áá°á á°ááŁáŤáľ áŤá áĽáť áá á˘task_concurrency=1
- á¨á ááľ á°ááŁá á ááŤáł á¨á°ááŁá ááłááá˝á á á ááľ áá ááľáááá áĽáá¨áááááᢠá áá áá፠á¨áĽááá˝á á á ááľ áá ááľááá áĽáááááVerticaOperator
(á á ááľ á á¨á´á áá áááá¨áľ);report_update >> [email, tg]
- áááVerticaOperator
á°áĽáłá¤áá˝á áĽá áááááśá˝á áááá áá°áŁá አáŁ
ááá áá á áłáá áŚááŹá°áŽá˝ á¨á°ááŤáŠ á¨ááľáááŞáŤ áááłáá˝ áľááá¸á á ááľ áĽáť áá á¨áá°áŤáᢠá áá áĽá᳠ᣠááá ááá á áľááš ááľáá áááľáá
áľá áĽááľ áááľ áĽáááŤáá áááŽáá˝ áĽá áá°ááťá¸á - á°ááááŽá˝.
áááŽáá˝ á¨á°ááŤáŠ á áá áá¨ááá˝á áá° áŚááŹá°á ááááŽá˝ á¨áá°áŠ á¨ááá áŚáł áŤá˘áá˝ áá¸áᢠáááłáᣠáĽáá°áá áĄ-
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }}
áá° á¨á ááľ á°áááá áááśá˝ áá°áá execution_date
á á
áš áá YYYY-MM-DD
: 2020-07-14
. á áŁá áĽáŠá ááá á¨á ááľ á°ááááŽá˝ á á ááľ á¨á°áá°á á°ááŁá ááłá áá á°á¸ááá¨áá (á áá áĽááł ááľáĽ áŤá áŤáŹ) áĽá áĽáá°áá á˛ááá áŚáł áŤá˘ááš áá° á°ááłáłá áĽá´áśá˝ áá°ááá˘
á¨á°áá°áĄáľ áĽá´áśá˝ á áĽáŤááłááą á¨á°ááŁá ááłá áá á¨á°á°áŤáá ááá á áá áá ááłáŠ áá˝ááᢠá°áĽáłá¤ á¨ááá á°ááŁá áĽáá°áá áááĄ-
áĽá áľááá áááĽááľ á ááá á°ááŁá áá-
áá
áἠááá áľáŞáľ á áĽáŽ á¨á°á°áŠ áááŽáá˝ áá áááá áĽáá
ááááá˘
á¨áá á á áá á á°á°áŞáá˝ áĽáá á¨áŤáłá˝áá áááŽáá˝ ááá áĽáá˝áááᣠáá ፠áá áłáŞá ááá˘
á áľááľá á¨á°ááášáľ áááŽá˝ á á°á¨á᪠á¨áĽáá á°ááááŽá˝ áĽá´áśá˝á áá°áŤáľ áĽáá˝ááá (áá
á á¨áá
á áá áŁáá áŽáľ ááľáĽ á áľááľá á°á áááŠá áľ)ᢠááľáĽ áĽááá á Admin/Variables
áááľ áááŽá˝áĄ-
áá áá á¨áá˝ááľ ááá ááá:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')
áĽá´áą scalar ááá JSON ááá áá˝ááᢠá JSON ááłááĄ-
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}
áá°ááááá ááá á¨áááľá°áá ááááľ áĽáť áá áááĄ- {{ var.json.bot_config.bot.token }}
.
á áĽáŹá á ááľ áá áĽáááŤáá áĽá áľá á ááľ á
á˝á áłá áá˝ áĽááł á áłááťááᢠáááááśá˝. ááá ááá áĽáá
á¨ááááŞáŤ á°á¨á áá: á ááš áá Admin/Connections
áááááľ áĽáááĽáŤáá ᣠá¨ááá˘áŤ / á¨ááá áááťá˝áá áĽá á¨á áá á¨á°áá°á áááŞáŤáá˝á áĽá፠áĽáá¨ááŤáá ᢠáá áĽáá°áá
:
á¨ááá ááá˝ áááľá á áá˝áá (á¨ááŁáŞá á á áá á á°ááĽ) ᣠááá á¨áááááľ á áááľá áá°á áá˝áá (áĽá áĽááłá°á¨ááŠáľ) tg_main
) - áĽáááłá áá á¨ááááśáš áááá á á¤á áá°áľ áá´áá˝ ááľáĽ á¨á°á áá¨á¨ áĽá áá° ááá áŽáśá˝ ááľáĽ áłááᣠáá°á á áá˝áá (á áľáááľ á¨áá ááá ááá áŤáá°á¨áአáĽáŁááá áŤááá) áá ááŹá˛áśá˝á áĽááłááá á¨ááŤáá°á ááá ááá á¨áá ᢠáľá.
á á°ááłáłá áľá áĽá áááááśá˝á ááľá¨á áá˝áá-á áá
áááł, áá´á BaseHook.get_connection()
á áľá á¨ááŤáááá, áá°á áá á áááá° á¨á ááŤáł áľáá˝ (áŽáááľ áŽá˘áá ááľáŤáľ á¨á áá ááááŤáłá áááá, ááá áá á á á¨á áá°áľ ááá˘áá˝ á
áá áá áĽáá°áá).
á°ááááŽá˝ áĽá áááááśá˝ á áĽááá áááľ áĽáŠ ááłáŞáŤáá˝ áá¸á ᣠáá áááá ááááŁáľ á áľááá áá-á¨áľáášá á¨áá°áśá˝áá áááá˝ á áŽáą ááľáĽ áŤá¨ááťá ᣠáĽá á¨áľááš áááá˝ áá á¨á áá°áľ ááá¨ááť áĽáá°áá°áĄá˘ á á ááľ á áŠá áĽá´áąá á ááĽááľ ááááἠááš ááá áá˝áá, áááłá, á¨ááľáł áłáĽá, á UI. á áá á áŠá, áá á ááá áá° á¨ááłááľ á á áł ááááľ áá, áĽá (áĽá) áááľááá°á áááá.
á¨áááááśá˝ áá ááľáŤáľ á ááą á°ááŁá ááᢠááá ááá˝. á á á ááá á¨á á¨á áá°áľ ááá ááá˝ á¨áśáľá°á ááá á áááááśá˝ áĽá á¤á°-ááťáááľ áá ááááááľ ááĽáŚá˝ áá¸áᢠáááłáᣠJiraHook
á¨á፠áá áĽááľáááá á°áá áá áá¨ááłá (á°ááŁáŤáľá áá° áá áĽá áá° ááľ ááááłááľ áá˝áá) áĽá á áĽáá SambaHook
á¨á áŤáŁá˘ áááá áá° áá ááŤá áá˝ááᢠsmb
- ááĽáĽ.
áĽá áŚááŹá°áá áá°áá°á
áĽá áĽáá´áľ áĽáá°á°á°áŤ ááá¨áľ á°ááá ááᢠTelegramBotSendMessage
áŽáľ commons/operators.py
á¨áĽááá°áá áŚááŹá°á áá;
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
áĽáá ᣠáá á á á¨á áá°áľ ááľáĽ áĽááłá ááá áááᣠááá ááá á áŁá ááá ááá˘
- á¨á°áá¨á°á
BaseOperator
áĽááľ á¨á á¨á áá°áľ-á°áŽá áááŽá˝á á¨áá°áá á (á¨áĽá¨ááľ áááá áááá¨áą) - á¨áłáá ááľáŽá˝
template_fields
ááá áááááŁá á áááŽáá˝á á¨ááááá áľá˘ - áľáááá ááááŽá˝á á ááá
áˇáá˘
__init__()
, á áľááá áá á˛áá ááŁáŞáá˝á áŤááá. - áľá á áľá á áŤáľ á ááááá á áá¨áłááá˘
- á°ááłá ááá ááá á¨ááˇáá˘
TelegramBotHook
á¨áĽáą á¨á°áá á ááá á°ááĽáá. - á¨á°áťá¨ (á¨á°ááá¸) áá´
BaseOperator.execute()
áŚááŹá°áŠá ááááá ááá á˛á°ááľ á¨áľáá á¤ááá áááá ááŁá - á áĽáą ááľáĽ áááŁáľá á áááłáľ áááá á°ááŁá áĽáá°ááĽáŤáá. (á áááŤá˝á áá á áľááá ááĽá°áástdout
иstderr
- á¨á á¨á áá°áľ áááá ááá áŤá áá ᣠá ááŤáá áááł áá áááá ᣠá áľááá á áááá áľ áá áŤá áá¸ááá˘)
áŤááá áĽáá commons/hooks.py
. á¨ááá á¨ááááŞáŤ áááᣠááá áá áŤáąáĄ-
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
áĽáá áá ááĽáŤáŤáľ áĽááłááĽá áĽááłá á ááá áᣠá áľááááášá ááĽáŚá˝ áĽáť á áľá°ááŤáááĄ-
- áĽá áĽááááłáá ᣠáľá ááááŽáš á áľáĄ - á á áĽááááš ááłáŽá˝ á ááľ áááá
conn_id
; - áá°á á áá´áá˝á áááᥠáĽáŤá´á ááľá¤á ááá˘
get_conn()
, á áá ááľáĽ á¨áááááľ áááŞáŤáá˝á á áľá á áááá áĽá áááá áĽáť á ááááextra
(áá á¨JSON ááľá áá)ᣠáĽá (á áŤá´ áááŞáŤ áá°á¨áľ!) á¨á´áááŤá áŚáľ ááľáá°áŤ á áľáááĽáŠáĄ{"bot_token": "YOuRAwEsomeBOtToKen"}
. - á¨áĽáá ááłá áĽááĽáŤáá
TelegramBot
, á¨á°áá°á ááááľ á ááľá áľ.
ááźá áá. á áá áá á°áá áá á¨ááá á ááááľ áá˝ááᢠTelegramBotHook().clent
ááá TelegramBotHook().get_conn()
.
áĽá á¨ááá ááá°á ááá ᣠáá´áááŤá REST á¤áá á á¨áááᎠáá á
á፠á¨áá°áŤá ᾠᣠá°ááłáłá ááááá°áľ python-telegram-bot
sendMessage
.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))
áľááááá ááááľ áááá áá¨á áá-
TelegramBotSendMessage
,TelegramBotHook
,TelegramBot
- á á°á°áŞá ááľáĽ ᣠá¨á áἠáá¨ááť ááľáĽ áŤáľáᥠáĽá ááááľ ááá ááľáĄáľá˘
áá á áá áĽáŤá áá áłá á¨á ááᣠááťáťáŤ á á°áłáŤ áááł áłááłáŤ áááˇá áĽá á á°áᥠáá á¨áľá á°áľ áááĽááľ ááŠáᢠáľá á°áľ áááá ááá¨áľ áĽáááŤáá...
á áĽá áśá
ááľáĽ á¨áá ááá á°á°á á¨! áľáá áĽáá á¨áá á¨á ááá á ááá á¨á? á áľááá!
ááľááľ áá?
á¨áá ááá áĽáá°áááá áá°áááłá? á¨SQL á°áá¨á áá° áŹáá˛áŤ ááááá áá á¨áᣠáááľáá á¨áá ááľáś á¨áááą áᣠááŤáł!
áá á á¨áááŤá áľáááľ áá á°áĽá á¨á°ááá áá áᣠá ááá á ááłááľ á¨áááľ á ááŁáŚá˝á áááłáľ áá á¨áĽáᢠá áá á¨á áá áááľ áá˝áá.
áĽá áłá˝á áá áá ááĄ-
- áłá á áľáá
- á°ááŁáŤáľá ááá á
- ááá ááá áĽáá´áľ ááá áĽáá°áá áááá¨áą
- áááááľ á¨ááá-áá ááĽáŽá˝á áááľáĄ
- á¨SQL á áááá ááἠáŤáá
- áááĽá áá° Vertica áŤáľááĄ
- áľáłá˛áľá˛ááľá áá°áĽáľáĄ
áľááá
ᣠáá
áá áá áááľáŹáľáŁ á áĽá áá áľáá˝ áá᪠á áľáááŤáá docker-compose.yml
:
áśá¨á-á áááĽá.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.py
áĽá፠áĽáááłáá-
- Vertica áĽáá° á áľá°ááá
dwh
á áŁá á ááŁáŞ á áá áśá˝ ᣠ- áśáľáľ á ááŁááὠᨠSQL á áááá áŁ
- á áááá ááľáĽ á¨ááἠááłáá˝á á á ááłááľ áá¨ááá˝ áĽááááá (á ááá áááł á áááá¨áą
mssql_init.py
!)
á¨áŁááá áá á á áá áľáá˝ á á°ááłá°á áľáĽáá á ááłáá áááá áááŤá áááŽá˝ áĽááľáááŤááá˘
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
á¨áĽá á°á áá áŤáśááá á¨áá á¨á ᣠááĽáá áá áá áá˝ááᢠData Profiling/Ad Hoc Query
:
ááá ááá áá°ááłáá˝ ááłá¨áľ á áá°áá
áá á áĽáŤáŤ á¨á˘á˛á¤á ááá áááá˝ á ááááá ᣠáĽá፠ááá ááá ááá áá-áá á¨áąá áĽáá°áŤáá ᣠá áĽáą ááľáĽ ááááľ á á ᣠáááá ááá á¨á ááľ á áľá°áłáłáŞ áá áĽáá ááááá ᣠáĽá á áá áá áá áĽáá°áááá
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
ááá áá.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
ááá á°ááˇá á¨áĽáá ááἠá°áĽáľáĽ á¨á ááľ ááś á°áŠá á á¨á´áááťá˝á. á áŁá áľááá á ááá¸á ááľááŽá˝ áĽááłáł áá áá áĽááľááá-
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
- á ááá á áĽááłáł á¨á á¨á áá°áľ áĽááááá
pymssql
- áááááľ - á áĽáŤáá ááľáĽ á áá ááá áĽááłá áĽáá°áŤ - á á áĽááľ áá°áŠ áá° á°ááŁáŠ ááŁááá˘
- áĽáŤááŤá˝áá ááááĽ
pandas
áá áŤááááDataFrame
- ááá°ááą áá á ááá.
ááľá áĽá¨á°á ááአááá˘
{dt}
á¨áĽáŤá áááŞáŤ ááá%s
áĽá áá áááşáŽ áľáááአáłááá áľápandas
áááá á áá˝áápymssql
áĽá á¨áá¨á¨áťáá áŤáá¸áŤáľáłáparams: List
áĽáą á áĽáááľ á˘ááááá˘tuple
.
áĽáá˛áá ááá˘áá áἠáá ápymssql
áĽáąá áááá°áá áá°á áĽá ááááá ááá á áá ááá˘pyodbc
.
á¨á á¨á áá°áľ á¨á°ááŁáŤá˝áá ááááŽá˝ á áá áĽáá°ááá áĽáááá¨áľáĄ-
ááá ááἠá¨áá, á¨á፠áááá á ááá áááł á¨ááá. ááá áá ááááľá á á°áłáŤ áááł ááá á áĽáááł ááá áá. áá áá áľá á°áľ á áá°áá. á -á á -á á ᣠáá áá°á¨á?! áĽá áá áĽáá°áá áĽáááĄ-
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException
ááá áľá
á°áśá˝ áĽáá°áá áá á¨á áá°áľ áááá¨áá ᣠáá á°ááŁáŠá áĽááááááᢠá ááááš á á¨ááá´ ááá áá áŤáŹ á ááá¨áá, áá áŽá.
áááŁá˝áá áĽááá¨áá á ááŤáł áááśá˝:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])
á¨ááłááá-
- áľáĽááá á¨áá°áľáá áľ áłáłá¤ááŁ
- á¨ááá ááá áááŤá˝á ááłáá፠(á¨á°áᨠááááᢠááĽáŤááłááą á°ááŁá),
- á¨ááአáĽá á¨áľááá ááłáá፠áá˝ - á áá¨á¨áťá á¨ááἠááł (ááá ááá á á ááľ á á¨á´á ááľáĽ á¨áááľá áľ) áአá¨áľááá ááłáá፠á áá á˘
á¨áá¨á¨áťá á°á¨á áááŤá: áááá ááá áá° áŹáá˛áŤ áŤááľáą. áĽáᣠá áááá áááłáŁ áá á áááľá¨á á áŁá á áľá°áá áĽá áááŁá á¨áá ááááśá˝ á ááą á CSV á áŠá áá!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
- áአá°ááŁá áĽá¨á°áŤá ááá˘
StringIO
. pandas
á á°áááľ á¨áĽáá áŤáľáááŁáá˘DataFrame
áCSV
- ááľááŽá˝.- á¨á°ááłá áŹáá˛áŤ áá áááááľá á ááá á áĽááááľá˘
- áĽá á áá á áĽááłáł
copy()
áááŁá˝áá á ááĽáł áá° á¨áá˛áŤ áááŠ!
áá áŤá á ááľááŽá˝ áĽáá°á°áá á¨ážáአáĽáááľáłáá áĽá áááá-ááá á áľá°áłáłáŞ ááá ááá á°á á áĽáá°áá áĽááááŤááá˘
session.loaded_rows = cursor.rowcount
session.successful = True
ááźá áá.
á á˝áŤá áá, á¨áłáááá á°áá á áĽá áĽáááĽáŤáá. áĽáá ááŤá´ áľáá˝ áá˝á áááľáŠááĄ-
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
áĽá¨á°á ááአááá˘
VerticaOperator()
á¨ááἠááł ááľá áĽá á áá á¨áĽ áĽááĽáŤáá (á áĽááἠáĽááą á¨áá). ááá ááá áĽáááášá á áľááá áááááľ áá-
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> load
áá ááá
- á°á á, - áľááš á áἠá á, - á áá°áá, á áá
áĽá á áŤáŤ ááľáĽ á áŁá á áľá᪠áĽááľáł áĽáá°ááአáĽááá á áááľ?
áá፠áśáááľá°á ᣠááŠáá
áĽá áĽáá°ááľá á áĽáá áŁáá°á¨áŚáź ááľáľá á˘áŤááĽááᥠáá á ááĽááľ á¨á˘á˛á¤áá áá°áľ á¨áŁáś á¨áááĽá áĽá á¨ááááᥠáĽááą á SSIS áĽá á ááłááľ áĽá áĽá á¨á á¨á áá°áľ áá...áĽáá á¨áĽááá áááááľ áĽáááťá˝áŤáá ááᣠá ááá á á áŁáŤ áĽáá°áá°á áľáŁá¸á á¨ááľáľáá áááľááá!
áľáá˝ á¨á áá á áá ááá á¨áá, á¨ááŤá Apache Airflow - áá°áśá˝á á ááŽááŤá áŽáľ ááá á áááá˝ - áĽáŤáŹá á áŤá. áĽá á¨á áá ááš áĽá á áľá°áłá˝á˘
á ááľáĄ áŤáá°áá°á extensibility, áááąá á°á°áŞáá˝ áĽá scalability áŤááá áááŁá á ááá, á ááááá á áŤáŁá˘ áááľ ááťáá á¨á á¨á áá°áľ ááá áá áĽáľá áá°áĽááłá: áĽááłá, á¨áá°áĽá°áĽ, á¨áááááľ áĽá ááἠáá°áľ áá áá°áľ ááľáĽ, áŽáŹáśá˝á (áá° áááľ, ᨠáŽááľ)á˘
á¨áá¨á¨áťá ááá, ááŁááť áĽá áá¨á
áá áá° á¨á°á á°áĽááá á áá°á á°ááŤ
start_date
. á áᣠáá á áľááľá á¨á áŤáŁá˘ áľááľáł ááᢠá áśá áá áááástart_date
ááá áŤáááᢠá á ááŠáŁ á¨ááášstart_date
á¨á áá áá, áĽáschedule_interval
- á ááľ áá, á¨ááŤá DAG áá ááá áá°á áĽá ááááŤá.start_date = datetime(2020, 7, 7, 0, 1, 2)
áĽá ááá á°á¨á᪠á˝ááŽá˝ á¨áá.
á¨áĽáą áá á¨á°áŤáŤá áá á¨á ááľ áá áľá á°áľ á ááĄ-
Task is missing the start_date parameter
, áá á áĽááá áá á¨áłá áŚááŹá°á áá ááŤáŤáá áĽáá°á¨áą áŤááááłá.- ááá á á ááľ áá˝á áá. á á ᣠáĽá áá°á¨áśá˝ (á¨á á¨á áá°áľ áĽáŤáą áĽá á¨áĽá á˝áá) ᣠáĽá á¨áľá á áááá ᣠáĽá á¨áá á°ááł á á᪠ᣠáĽá á°áŤá°áá˝á˘ áĽá áĽáá˛áŤáá á°ááˇá. ááá áá á¨áá á áá á¨á áááááśá˝ áĽááľ áĽá¨á¨áᨠáĽá PostgreSQL ᨠ20 ms ááá á 5 á°á¨ááľ ááľáĽ áá ááá ááá˝ ááľá áľ á˛ááá, ááľá°á áá°áľáá.
- á¨á áŤáŁá˘ á áľááá. á áá áá á áአáá á°ááá áá, áĽá ááľááá áá° áá°á áŤá á°áá°áá. á¨á áŤáŁá˘ á áľááá áĽáľáŤáá á á áááááᣠá áá áá á˘áŤááľ á¨á ááľ á°áŤá°á áá á¨ááá°áá áľ áá ááᣠáĽá áá° CeleryExecutor ááááľ á ááá¨á ááľáŤáľ á ááĽáᢠáĽá á á ááľ áá˝á áá á¨áĽáą áá á áĽáŽ ááľáŤáľ á¨ááťáá á ááťá á´ááŞá á á áááአáá áĽááłá á¨áá áá á¨ááŤááľááľ ááá á¨áá ᣠáá á âá áĽááἠᣠá áĽááṠᣠá ááŤá˝ áá° áááľ á áááľá!â
- áŤááá áá á áĽáŽááἠááłáŞáŤáá˝:
- áááááśá˝ á¨á áááááľ ááľááááśá˝á ááá¨áá¸áľ áŁ
- SLA ááá á á°ááą ááá°á አáĽáŤáá˝ ááá˝ ááľá áľ áŁ
- xcom áááłáłáł áááἠ(á ááŠáľ ááłáááĽ!) á áłá á°ááŁáŤáľ ááŤá¨áá˘
- á¨á°áĽáłá¤ á áááŁáĽ áá ááᢠá°á á, áá áááľ áĽá˝ááá? áááá á¨áá°á á°ááŁáŤáľ áľáááážá˝ ááááŤáá˝ á°ááá á°ááᢠá áá á¨áĽá áľáŤ Gmail>90k á˘áááá˝ á¨á á¨á áá°áľ á ááᣠáĽá á¨áľá ááá á ááá á á ááľ áá á¨100 á áá ááááłáľ áĽá ááá°á¨á ááá°á á ááááá˘
á°á¨á᪠ááĽááśá˝:
Apache á¨á á¨á áá°áľ ááľááśá˝
á°á¨á᪠á ááśáá˛á ááłáŞáŤáá˝
á áĽáá˝á áłááá á ááá ááłá˝á áĽááľáá°áŤ á¨á á¨á áá°áľ áá áá á ááá áśááááĄ-
ᨠREST ᤠá á á - áĽáą á ááá á¨áá¨áŤ á°á¨á á áá, áĽáąá áĽááłáá ፠á áŤáá°áá. á áĽáą á ááŤáááľ áľá áłááľ áĽá á°ááŁáŤáľ áá¨áá áĽáť áłááá áááá / áááá, DAG Run ááá áááł ááá á áá˝áá.CLI - áĽá ááłáŞáŤáá˝ á áľááá ááľáአá áŠá á WebUI ááá áá á¨ááááš áĽáť áłááá á á á ááá á¨áááá áá¸áᢠáááłá:backfill
á¨á°ááŁá áááłáá˝á áĽáá°áá áááľááá áŤáľááááá˘
áááłáᣠá°ááłáá˝ ááĽá°á áĽáá˛á á ááĄ- âáĽá á áá° áá°áᣠá¨ááá᪠1 áĽáľá¨ 13 áŁáá áá¨á áá áľááá á¨áá˝ ááá á áá ! á áľá°áŤááᣠá áľá°áŤááᣠá áľá°áŤááᣠá áľá°áŤáá! áĽá áĽááľá áĽáá°áá á áááľ áἠáááľ:airflow backfill -s '2020-01-01' -e '2020-01-13' orders
- á¨áá á¨áľ á áááááľ;
initdb
,resetdb
,upgradedb
,checkdb
. run
, áá á á ááľ ááłá á°ááŁáá áĽáá˛áŤáŤááą áĽá á ááá áĽááááśá˝ áá áĽááłá áá¤áľ áĽáá˛áŤáᥠáŤáľá˝áááłá. á á°á¨ááŞá, á á áŠá áááľ áá˝ááLocalExecutor
á¨á´áá¨á áááľá°á á˘ááááľá.- á áŁá á°ááłáłá ááá áŤá°ááá
test
, áĽáť á°áá á¤á ááľáĽ ááá á áá˝áá. connections
á¨á áááą áĽá áááááśá˝á áááá á áŤáľá˝ááá˘
áááśá á¤.á.á á. - áááááá˝ á¨áłá°á áĽá á áľáá˝ áĽáá˝ á¨ááááĽá á¨áááľáŽá á¨áááááľ ááááľá˘ áá áĽááłáááľ áá á¨áá¨ááá˘/home/airflow/dags
ᣠáŠáĄipython
áĽá áááŁá¨á ááá? áááłá áááá áááááśá˝ á áá¨á°áá áŽáľ áá° áá ááá áá˝áááĄfrom airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d)
- á¨á á¨á áá°áľ ááłáłáłá¤á áá á áááááľ ááᢠáĽáá˛á˝áááľ á ááááá ááá áá áá°ááŤáŠ áአáááŞáŤáá˝ á¨á°ááŁá áááłáá˝á ááááľ á¨ááááá á¤áá ááá˝ á¨á áá ááŁá áĽá ááá ááá áá˝ááá˘
áĽáá á ááá á°ááŁáŽáťá˝á áááá á áá°áá ᣠáá á ááłááľ áá áááľá áá˝áá ᣠáĽá áá á¨á°ááá° ááᢠááá áá áĽááľ áĽááłáá˝ ááľááá á á áŤáŁáŞ áá¸á, áĽá ááá°á˝ á áľááá áááá.
á°á ááá SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
ááŁááťáá˝
áĽá á áĽáááĽáŁ ᨠGoogle áááŁáľ á¨ááááŞáŤááš á áľá á áááá˝ á¨á á¨á áá°áľ á ááá áááśá˝ á¨áááŁáśá˝ áá¸áá˘
Apache á¨á á¨á áá°áľ á°ááľ - áĽááἠáá, á¨á˘áŽá áááá á ááĽá. á°ááśá˝ ᣠáá áááŞáŤáášá áá áŤááŁá¸áá?ááἠáááśá˝ - á°á á, á˘áŤááľ á¨ááŁáŞáá˝ á¨á°á°áĄáľá áááŽá˝ áŤááĽáĄ.á¨á á¨á áá°áľ áŠá á - á áŁá á áá-á áľááá˝ ááľáĽ á¨á°á áá á áááá˝á¨ Apache Airflow ááá á áá° ááłáŚá˝á áá¨áłáľ - áá°á¨áłá á áá°-ááłáŚá˝ á á°áἠá°áĽáŤáá°áá, á¨áá (á áľáááľ!) á¨áĽá á¨áá ááá á áááŁá á.á¨á˛áŤáááá áĽáá â á¨á á¨á áá°áľ á áááá/áááľá°á áĽáá´áľ áĽáá°áááᣠáááŞáŤ - á¨á á¨á áá°áľ áááľá°á ááááá á áá áááŞáŤá˘Apache á¨á á¨á áá°áľ á áááľ áá á áááľ áá - á¨áá áá°á á°ááłáłá áłá˘ á˝áá, ááááŁáľ á°á¨á᪠formalism, áĽá áĽááľ ááłááá˝ á áľá°áá.Apache á¨á á¨á áá°áľ á á´á᪠á°áŤá°áá˝ áá áľáŤáá˝á áĽáá´áľ áĽáá°ááŤá°áŤá - á¨á´áá¨áŞ áá á áá°áŁá á áľáááĽáŤáľ.á á ááź á¨á á¨á áá°áľ ááľáĽ DAG ááťá ááἠáááśá˝ - áľá á°ááŁáŤáľ á á á ᣠá¨áá ááá ááłáá፠ááŤá ᣠáááἠᣠá¨ááá á áááá áĽá ááá˝ á áľá°áłá˝ áááŽá˝á˘á Apache á¨á á¨á áá°áľ ááľáĽ áĽáááá˝á ááľá°áłá°á - á ááá áá áĽáť á¨á ááľáŠáľ á¨á°ááŁáŽá˝ áĽááááľ áĽá ááľáá˝ á°ááĽá˘á¨á á¨á áá°áľáĄ- á¨áĽááľá DAG á¨áááááĽá ááᣠá áŁá á˛áá - á áá áááááĽá ááľáĽ á ááłááľ "áĽáá°áłá°á á áá°áŤá" áĽáá´áľ áá¸áá áĽáá°ááťá, á¨á ááá ááἠááŤá áĽá á áľá፠ááľá áľ.á Apache Airflow á áá á¨SQL áá ááá˝ - á áá á¨SQL áĽáŤááá˝ áá á¨á áá°áľ ááłáłáłá˘á Apache Airflow á¨áľáŤ áá°áśá˝á ááłá á áááአ- áĽá áłáłá˝ áľáááá á á áá ááá á áá˘Fetchr Data Science Infra á AWS áá á¨ááŹáľáś áĽá á¨á á¨á áá°áľ áá ááááŁáľ - á AWS ááłáł áłáááľ áá á¨á° áááľ áľáááááŁáľ á áľá°áłá˝ á áá ááľáłááťá˘á¨á á¨á áá°áľ DAGáá˝á á˛áłá¨á 7 á¨á°áááą áľá á°áśá˝ - á¨á°áááą áľá á°áśá˝ (á ááľ á°á á ááá áááŞáŤáá áłáŤáἠá˛áá).Apache Airflow á áá áá á¨ááá áá áŤá¨ááš áĽá ááľá¨áą - ááá áĽááłá áááááśá˝á áĽáť áá áá á¨ááľá˝á á˘ááá á°áá˝ á¨ááá ááá˝á áĽáá´áľ áĽáá°ááŤá¨ááťá ááá áá áá˘á¨áá áŚá Python áĽá Apache á¨á á¨á áá°áľ - áľáá á¨DAG ááľá°áááᣠááá°-á˝áá á°ááŁáŤáľá ááŁáᣠáĽáá°áá áľá áĽááá˝ áĽá áĽáá˛áá á¨á°ááŁá á ááá áľá ááááá˘á¨á á¨á áá°áľáĄ áĽáá áŤááłáá á áá áááŽá˝áŁ áá´áá˝ áĽá ááἠáááśá˝ - áľá á á ááádefault arguments
иparams
á á áĽááľ, áĽáá˛áá á°ááááŽá˝ áĽá áááááśá˝.á¨á á¨á áá°áľ áááááĽáá ááľá°ááá áĽá áľ á ááŞá áá á¨á áá°áľ 2.0 áĽáá´áľ áĽá¨á°ááá áĽááłá áłáŞáá˘Apache Airflow ᨠ3 Selery á áŤá°áá˝ áá á áśáá°á-áŽááá - á¨áĽáá ááá áá° ááľáĽ áľááá°ááŤáľ áľáá˝ áá áŤááá áľ ááŁáĽádocker-compose
.á¨á á¨á áá°áľ á ááľ á áá áá 4 á°ááŁáŤáľá áá á¨á˝ - á áĽááśá˝á áĽá á ááľ ááľá°áááá á áá áá á°áááá á°ááŁáŤáľá˘á á á¨á áá°áľ ááľáĽ áŤá á¨áľá á°áľ ááłáááŤáá˝ - áá°á á áĽá áĽá ááłáááŤáá˝ á ááľáł áĽá á Slacká˘á¨á á¨á áá°áľ á áá° áĽááľáĄ ááľáĽáľáĽ DAGs áŤá ááŤáá˝ - á¨á áááŤá áľáŤáá˝, áááŽáá˝ áĽá XCom.
áĽá á á áááš ááľáĽ áĽá á áá á¨áá á áááá˝-
ááᎠááŁááť - á á áĽááľ ááľáĽ áĽá á áá á¨ááá áŚáłáá˝ áŤá˘áá˝á˘á¨á°áááą ááĽááśá˝-á¨á á¨á áá°áľ - áłáá˝á á˛ááĽáŠ á¨á°áááą áľá á°áśá˝.puckel/docker-á¨á á¨á áá°áľ: Docker Apache á¨á á¨á áá°áľ -docker-compose
ááá¨áŤ, ááá¨á áĽá áááá˝á.python-telegram-bot/python-telegram-botáĄ- áĽáᢠá¨ááľá˝ááá áá á á፠á ááá á°áá - á¨áááá áá á á፠áá´áááŤá REST APIá˘
ááá: hab.com