Hi, I'm Dmitry Logvinenko, Data Engineer in the Analytics Department of the Vezet group of companies.
I'm going to tell you about a wonderful tool for developing ETL processes: Apache Airflow. Airflow is so versatile and multifaceted, though, that it's worth a closer look even if you don't deal with data flows but do need to launch processes periodically and monitor their execution.
And yes, I won't just tell, I'll also show: there is plenty of code, screenshots and recommendations ahead.
What you usually see when you google the word Airflow / Wikimedia Commons
Contents
- Introduction
- Main part, practical (and a little theoretical)
- Why us (and you)
- Assembling the cluster
- Basic concepts
- Generating tasks
- A little about Flower
- Loading the underloaded
- Connections, hooks and other variables
- Parsing a custom operator
- Shall we pour?
Introduction
Apache Airflow is just like Django:
- written in Python,
- has a great admin panel,
- is extensible without limit,
- only better, and it was made for entirely different purposes, namely (as mentioned above):
  - running and monitoring tasks on an unlimited number of machines (as many as Celery/Kubernetes and your conscience allow);
  - dynamic workflow generation from Python code that is very easy to write and to read;
  - connecting any databases and APIs with each other, using both ready-made components and home-made plugins (which is extremely simple).
We use Apache Airflow like this:
- we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into DWH and ODS (we have Vertica and Clickhouse);
- as an advanced cron that launches the data consolidation processes on the ODS and also keeps an eye on how they run.
Until recently our needs were covered by one small server with 32 cores and 50 GB of RAM. In Airflow it runs:
- more than 200 DAGs (essentially workflows into which we stuffed tasks),
- with 70 tasks in each on average,
- and all this goodness starts (also on average) once an hour.
I'll write below about how we scaled out. For now, let's define the über-problem we are going to solve:
There are three source SQL Servers, each with 50 databases. They are instances of one project, so they have the same structure (almost everywhere, mua-ha-ha), which means each has an Orders table (fortunately, a table with that name can be pushed into any business). We take the data, add service fields (source server, source database, ETL task ID) and naively dump it into, say, Vertica.
Let's go!
Main part, practical (and a little theoretical)
Why us (and you)
When the trees were big and I was a simple SQL-schik at a Russian retailer, we cobbled together ETL processes, aka data flows, with the two tools available to us:
- Informatica Power Center: an extremely sprawling system, extremely productive, with its own hardware and its own versioning. I used, God forbid, 1% of its capabilities. Why? Well, first of all, its interface, straight out of the '80s, put mental pressure on us. Secondly, this contraption is designed for extremely fancy processes, furious component reuse and other very-important-enterprise tricks. As for the fact that it costs about as much as the wing of an Airbus A380 per year, we'll say nothing.
Careful: this screenshot may slightly hurt people under 30
- SQL Server Integration Server: we used this comrade for our intra-project flows. Well, really: we already use SQL Server, and it would be somehow unreasonable not to use its ETL tools. Everything in it is good: the interface is pretty, and there are progress reports... But that's not why we love software products, oh no, not for that. You can version it as dtsx (which is XML with nodes shuffled on every save), but what's the point? And what about making a task package that drags a hundred tables from one server to another? Yes, a hundred: your index finger will fall off after twenty of them from clicking the mouse button. But it definitely looks more fashionable:
Of course, we looked for ways out. It even almost came to a home-grown SSIS package generator...
...and then a new job found me. And Apache Airflow caught up with me there.
When I found out that ETL process descriptions are plain Python code, I could have danced for joy. That's how data flows became versionable and diffable, and pouring tables with a single structure from hundreds of databases into one target became a matter of Python code on one and a half or two 13-inch screens.
Assembling the cluster
Let's not turn this into a complete kindergarten by discussing entirely obvious things here, like installing Airflow, your chosen database, Celery and the other matters described in the docs.
So that we can start experimenting right away, I sketched a docker-compose.yml in which:
- we actually raise Airflow: Scheduler and Webserver. Flower also spins there to monitor Celery tasks (because it has already been pushed into apache/airflow:1.10.10-python3.7, and we don't mind);
- PostgreSQL, into which Airflow writes its service information (scheduler data, execution statistics, etc.) and in which Celery marks completed tasks;
- Redis, which acts as the task broker for Celery;
- a Celery worker, which does the actual execution of tasks;
- the ./dags folder, where we will put our files with DAG descriptions. They are picked up on the fly, so there's no need to juggle the whole stack after every sneeze.
In some places the code in the examples is not shown in full (so as not to clutter the text), and in others it is modified along the way. Complete working code examples can be found in the repository https://github.com/dm-logv/airflow-tutorial.
docker-compose.yml
version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base
    environment:
      <<: *airflow-config
      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8
      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
    depends_on:
      - airflow-db
      - broker
    command: >
      -c " sleep 10 &&
      pip install --user -r /requirements.txt &&
      /entrypoint initdb &&
      (/entrypoint webserver &) &&
      (/entrypoint flower &) &&
      /entrypoint scheduler"
    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base
    environment:
      <<: *airflow-config
    command: >
      -c " sleep 10 &&
      pip install --user -r /requirements.txt &&
      /entrypoint worker"
    depends_on:
      - airflow
      - airflow-db
      - broker
Notes:
- In assembling the composition I largely relied on the well-known puckel/docker-airflow image. Be sure to check it out: maybe you need nothing else in your life.
- All Airflow settings are available not only through airflow.cfg but also through environment variables (thanks to the developers), which I maliciously took advantage of.
- Naturally, it's not production-ready: I deliberately didn't put heartbeats on the containers and didn't bother with security. But I did the minimum suitable for our experimenters.
- Note that:
  - the DAG folder must be accessible to both the scheduler and the workers;
  - the same goes for all third-party libraries: they must all be installed on the machines running the scheduler and the workers.
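The environment-variable mechanism mentioned in the notes follows the AIRFLOW__{SECTION}__{KEY} naming convention. As a rough illustration of how the variables in x-airflow-config map back onto airflow.cfg sections and keys, here is a small helper. It is not Airflow's own code, and the function name airflow_overrides is made up for this sketch.

```python
def airflow_overrides(environ):
    """Group AIRFLOW__{SECTION}__{KEY} variables into {section: {key: value}}.

    Illustrative only (hypothetical helper): Airflow itself works the other
    way round, building the variable name from a (section, key) pair.
    """
    prefix = "AIRFLOW__"
    result = {}
    for name, value in environ.items():
        if not name.startswith(prefix):
            continue  # unrelated variable such as PATH
        # "CORE__SQL_ALCHEMY_CONN" -> ("CORE", "__", "SQL_ALCHEMY_CONN")
        section, sep, key = name[len(prefix):].partition("__")
        if not sep or not key:
            continue  # not a well-formed override
        result.setdefault(section.lower(), {})[key.lower()] = value
    return result


if __name__ == "__main__":
    sample = {
        "AIRFLOW__CORE__EXECUTOR": "CeleryExecutor",
        "AIRFLOW__CELERY__BROKER_URL": "redis://broker:6379/0",
        "PATH": "/usr/bin",
    }
    print(airflow_overrides(sample))
    # {'core': {'executor': 'CeleryExecutor'}, 'celery': {'broker_url': 'redis://broker:6379/0'}}
```

Only the first double underscore after the prefix separates section from key, which is why SQL_ALCHEMY_CONN survives intact.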
Well, now it's simple:
$ docker-compose up --scale worker=3
Once everything is up, you can look at the web interfaces:
- Airflow: http://127.0.0.1:8080/admin/
- Flower: http://127.0.0.1:5555/dashboard
Basic concepts
If you didn't understand anything about all these "dags", here is a short glossary:
- Scheduler: the most important uncle in Airflow, who makes sure the robots do the hard work rather than a person. It watches the schedule, updates DAGs and launches tasks.
  In older versions it had memory problems (no, not amnesia, leaks), and the legacy parameter run_duration, its restart interval, even survived in the configs. But now everything is fine.
- DAG (aka "dag"): a "directed acyclic graph". That definition tells few people much, but in fact a DAG is a container for tasks that interact with each other (see below), an analogue of a Package in SSIS or a Workflow in Informatica.
  Besides DAGs there can also be subdags, but we most likely won't get to them.
- DAG Run: an initialized DAG that is assigned its own execution_date. DAG runs of the same DAG can work in parallel (provided you made your tasks idempotent, of course).
- Operator: a piece of code responsible for performing one specific action. There are three types of operators:
  - action, like our beloved PythonOperator, which can execute any (valid) Python code;
  - transfer, which carries data from place to place, say, MsSqlToHiveTransfer;
  - sensor, which lets you react to an event or hold back the rest of the DAG until it happens. HttpSensor can poll a given endpoint and, once the desired response arrives, start the GoogleCloudStorageToS3Operator transfer. An inquisitive mind will ask: "Why? You can do retries right in the operator!" Well, so as not to clog the task pool with suspended operators: the sensor starts, checks, and dies until the next attempt (at least in mode='reschedule'; in the default poke mode it keeps its slot between checks).
- Task: operators of any type, once declared and attached to a DAG, are promoted to the rank of task.
- Task instance: when the general-scheduler decides it's time to send tasks into battle on the worker-performers (right on the spot if we use LocalExecutor, or to a remote node in the case of CeleryExecutor), it assigns them a context (i.e., a set of variables: the execution parameters), expands command or query templates, and puts them in the pool.
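To make "directed acyclic graph" a bit more concrete: a task only starts once its upstream tasks are done, which amounts to walking the graph in topological order. The sketch below is Kahn's algorithm in plain Python, not Airflow's scheduler. The function name is hypothetical, and the edges mimic the report_update >> [email, tg] shape used later in the article.

```python
from collections import deque


def topological_order(edges):
    """Return tasks in an order that respects every upstream >> downstream edge.

    edges maps a task to the list of its downstream tasks.
    Raises ValueError if the graph has a cycle, i.e. it is not a DAG.
    """
    # Collect every node and count its incoming edges
    nodes = set(edges)
    for downstream in edges.values():
        nodes.update(downstream)
    indegree = {node: 0 for node in nodes}
    for downstream in edges.values():
        for node in downstream:
            indegree[node] += 1

    # Start from tasks without upstream dependencies (sorted for determinism)
    ready = deque(sorted(n for n in nodes if indegree[n] == 0))
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for child in sorted(edges.get(node, [])):
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)

    if len(order) != len(nodes):
        raise ValueError("cycle detected: this graph is not a DAG")
    return order


if __name__ == "__main__":
    graph = {
        "city_orders": ["email_success", "telegram_fail"],
        "daily_orders": ["email_success", "telegram_fail"],
    }
    print(topological_order(graph))
    # ['city_orders', 'daily_orders', 'email_success', 'telegram_fail']
```

The "acyclic" part matters. If a task depended on itself through a chain of other tasks, there would be no valid order, and the function refuses the graph.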
Generating tasks
First, let's outline the general scheme of our DAG. Then we'll dive deeper and deeper into the details, because we use some non-trivial solutions.
So, in its simplest form, such a DAG looks like this:
from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))


def workflow(**context):
    print(context)


for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)
Let's figure it out:
- first, we import the necessary libs and a few other things;
- sql_server_ds is a List[namedtuple[str, str]] with the names of connections from Airflow Connections and the databases from which we will take our table;
- dag is the declaration of our DAG, which must be in globals(), otherwise Airflow won't find it. The DAG also needs to be told:
  - its name, orders, which will then appear in the web interface;
  - that it works starting from midnight on July 8;
  - and that it should run roughly every 6 hours (tough guys can use a cron string such as 0 */6 * * * here instead of timedelta(); the less cool can use a preset like @daily);
- workflow() will do the main job, but not now. For now it just dumps our context into the log;
- and now the simple magic of creating tasks:
  - we loop over our sources;
  - we initialize a PythonOperator that will execute our dummy workflow(). Don't forget to give the task a name that is unique within the DAG and to tie it to the DAG itself. The provide_context flag, in turn, pours additional arguments into the function, which we carefully collect with **context.
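One detail about schedule_interval and start_date is worth keeping in mind. In Airflow 1.10 the run stamped with a given execution_date covers the interval that starts at that moment, and it is triggered only once that interval has ended. Below is a plain-Python sketch for the orders DAG, with timedelta(hours=6) from 2020-07-08 00:00. The helper first_runs is hypothetical.

```python
from datetime import datetime, timedelta


def first_runs(start_date, interval, count):
    """List (execution_date, earliest_trigger_time) pairs for the first runs.

    Mirrors the Airflow 1.10 convention: the run stamped with execution_date X
    covers [X, X + interval) and is triggered once that interval has ended.
    """
    runs = []
    execution_date = start_date
    for _ in range(count):
        runs.append((execution_date, execution_date + interval))
        execution_date += interval
    return runs


if __name__ == "__main__":
    for stamp, fires in first_runs(datetime(2020, 7, 8, 0), timedelta(hours=6), 3):
        print(stamp.isoformat(), "->", fires.isoformat())
    # 2020-07-08T00:00:00 -> 2020-07-08T06:00:00
    # 2020-07-08T06:00:00 -> 2020-07-08T12:00:00
    # 2020-07-08T12:00:00 -> 2020-07-08T18:00:00
```

So the first run of orders logged with 2020-07-08 00:00 only actually starts around 06:00 that day.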
That's all for now. What we got:
- a new DAG in the web interface,
- one and a half hundred tasks that will run in parallel (if the Airflow and Celery settings and the server capacity allow).
Well, almost got it.
Who will install the dependencies?
To simplify the whole thing, I made docker-compose.yml process requirements.txt on all nodes.
Now it's off:
Gray squares are task instances processed by the scheduler.
We wait a bit, and the workers snap up the tasks:
The green ones, of course, finished their work successfully. The red ones, not so much.
By the way, on our prod there is no ./dags folder and no synchronization between machines: all DAGs live in git on our Gitlab, and Gitlab CI distributes updates to the machines on merge into master.
A little about Flower
While the workers are thrashing our dummies, let's recall another tool that can show us something: Flower.
The very first page, with summary information on the worker nodes:
The most intense page, with the tasks that went to work:
The most boring page, with the status of our broker:
The brightest page, with task status graphs and their execution times:
Loading the underloaded
So, all the tasks have finished, and you can carry off the wounded.
And there were quite a few wounded, for one reason or another. When Airflow is used properly, these very squares mean that the data definitely did not arrive.
You need to look at the log and restart the fallen task instances.
By clicking on any square, we'll see the actions available to us:
You can take the fallen one and Clear it. That is, we forget that something failed there, and the same task instance goes back to the scheduler.
Clearly, doing this with the mouse for every red square is not very humane, and it's not what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances
Let's select everything at once and reset it to zero by clicking the right item:
After cleaning, our taxis look like this (they're already waiting for the scheduler to schedule them):
Connections, hooks and other variables
It's time to look at the next DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Good sirs, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
        Vasya, wake up, we dropped {{ dag.dag_id }}
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]
Has everyone ever done a report update? Here it is again: there's a list of sources to get the data from; there's a list of where to put it; don't forget to honk when everything has happened or broken (well, that isn't about us, no).
Let's go through the file again and look at the new obscure stuff:
- from commons.operators import TelegramBotSendMessage: nothing stops us from making our own operators, which we took advantage of by writing a small wrapper for sending messages to the Unblocked One (Telegram). We'll talk more about this operator below;
- default_args={}: a DAG can hand out the same arguments to all of its operators;
- to='{{ var.value.all_the_kings_men }}': the to field is not hardcoded but generated dynamically with Jinja from a variable holding a list of emails, which I carefully put into Admin/Variables;
- trigger_rule=TriggerRule.ALL_SUCCESS: the condition for starting the operator. In our case the email flies to the bosses only if all dependencies completed successfully;
- tg_bot_conn_id='tg_main': conn_id arguments accept the IDs of connections we create in Admin/Connections;
- trigger_rule=TriggerRule.ONE_FAILED: messages to Telegram fly off only if there are fallen tasks;
- task_concurrency=1: we forbid running several instances of one task at the same time. Otherwise we could get several VerticaOperators running simultaneously (looking at the same table);
- report_update >> [email, tg]: all the VerticaOperators converge on sending the email and the message, like this:
But since the notifier operators have different launch conditions, only one of them will fire. In the Tree View everything looks a little less obvious:
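The way the two notifiers split the work can be modelled in a few lines. This is a deliberately simplified sketch, not Airflow's trigger-rule logic. The helpers are hypothetical, and the sketch assumes all upstream report tasks have already finished.

```python
SUCCESS = "success"
FAILED = "failed"
UPSTREAM_FAILED = "upstream_failed"
SKIPPED = "skipped"


def all_success(upstream_states):
    """Simplified TriggerRule.ALL_SUCCESS: every parent succeeded."""
    return all(state == SUCCESS for state in upstream_states)


def one_failed(upstream_states):
    """Simplified TriggerRule.ONE_FAILED: at least one parent failed."""
    return any(state in (FAILED, UPSTREAM_FAILED) for state in upstream_states)


def notifiers_to_run(upstream_states):
    """Which notifier from update_reports.py would fire for these report states."""
    fired = []
    if all_success(upstream_states):
        fired.append("email_success")
    if one_failed(upstream_states):
        fired.append("telegram_fail")
    return fired


if __name__ == "__main__":
    print(notifiers_to_run([SUCCESS] * 5))               # ['email_success']
    print(notifiers_to_run([SUCCESS, FAILED, SUCCESS]))  # ['telegram_fail']
```

The two conditions are mutually exclusive, so exactly one notifier (or none, e.g. when something was skipped) fires. In real Airflow, ONE_FAILED can fire as soon as one parent fails, without waiting for the rest.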
I'll say a few words about macros and their friends, variables.
Macros are Jinja placeholders that can substitute various useful information into operator arguments. For example, like this:
SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} expands to the contents of the context variable execution_date in the format YYYY-MM-DD: 2020-07-14. The best part is that context variables are nailed to a specific task instance (a square in the Tree View), so on a restart the placeholders expand to the same values.
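Here is a plain-Python approximation of what {{ ds }} and a few of its siblings turn into. date_macros and render_naive are hypothetical helpers. The first reproduces the documented formats of ds, ds_nodash, yesterday_ds and tomorrow_ds. The second is a crude string replacement standing in for Jinja, with no filters and no expressions.

```python
from datetime import datetime, timedelta


def date_macros(execution_date):
    """Compute a few of Airflow's built-in date macros for one task instance."""
    ds = execution_date.strftime("%Y-%m-%d")
    return {
        "ds": ds,
        "ds_nodash": ds.replace("-", ""),
        "yesterday_ds": (execution_date - timedelta(days=1)).strftime("%Y-%m-%d"),
        "tomorrow_ds": (execution_date + timedelta(days=1)).strftime("%Y-%m-%d"),
    }


def render_naive(template, values):
    """Very naive stand-in for Jinja: replaces '{{ name }}' literally, nothing more."""
    for name, value in values.items():
        template = template.replace("{{ " + name + " }}", value)
    return template


if __name__ == "__main__":
    macros = date_macros(datetime(2020, 7, 14, 6))
    print(render_naive("WHERE payment_dtm::DATE = '{{ ds }}'::DATE", macros))
    # WHERE payment_dtm::DATE = '2020-07-14'::DATE
```

Because the values are derived from execution_date alone, re-running the same task instance produces the same query. That is exactly the property described above.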
The substituted values can be viewed with the Rendered button on each task instance. This is how it looks for the task that sends the email:
And for the task that sends the message:
The complete list of built-in macros for the latest available version is in the macros reference of the Airflow documentation.
Moreover, with plugins we can declare our own macros, but that's another story.
Besides the predefined things, we can substitute the values of our own variables (I already used this in the code above). Let's create a couple of them in Admin/Variables:
Now they can be used anywhere:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')
The value can be a scalar, or it can be JSON. In the case of JSON:
bot_config
{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}
just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
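Under the hood, var.json.bot_config.bot.token means: parse the variable's value as JSON, then walk the keys one by one. Here is a plain-Python sketch of that lookup, using the bot_config value with the token written as a JSON string. resolve_json_path is a hypothetical helper, not Airflow's API.

```python
import json

BOT_CONFIG = """
{
    "bot": {"token": "881hskdfASDA16641", "name": "Verter"},
    "service": "TG"
}
"""


def resolve_json_path(raw, path):
    """Parse `raw` as JSON and follow a dotted path such as 'bot.token'."""
    node = json.loads(raw)
    for key in path.split("."):
        node = node[key]  # a wrong key raises KeyError
    return node


if __name__ == "__main__":
    print(resolve_json_path(BOT_CONFIG, "bot.token"))  # 881hskdfASDA16641
```

Note that the value has to be valid JSON in the first place: an unquoted token such as 881hskdfASDA16641 would make json.loads fail before any key is looked up.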
I'll say literally one word and show one screenshot about connections. Everything is elementary here: on the Admin/Connections page we create a connection and put our logins/passwords and more specific parameters there. Like this:
Passwords can be encrypted (more thoroughly than the default), or you can leave out the connection type (as I did for tg_main). The list of types is hardwired into Airflow's models and cannot be extended without getting into the source code (if I suddenly failed to google something, please correct me), but nothing stops us from getting credentials just by name.
You can also make several connections with the same name. In that case the method BaseHook.get_connection(), which fetches connections by name, returns a random one of the namesakes (it would be more logical to do Round Robin, but let's leave that on the conscience of the Airflow developers).
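The difference between the two selection strategies is easy to show in plain Python. Both functions and the replica names are hypothetical. pick_random only imitates the behaviour described above and is not Airflow's code.

```python
import itertools
import random

# Hypothetical namesake connections registered under one conn_id
REPLICAS = ["mssql-a", "mssql-b", "mssql-c"]


def pick_random(candidates, rng=random):
    """Any namesake may come back on any call, as described for get_connection()."""
    return rng.choice(candidates)


def round_robin(candidates):
    """Hand namesakes out strictly in turn, wrapping around at the end."""
    return itertools.cycle(candidates)


if __name__ == "__main__":
    rr = round_robin(REPLICAS)
    print([next(rr) for _ in range(4)])  # ['mssql-a', 'mssql-b', 'mssql-c', 'mssql-a']
    print(pick_random(REPLICAS, random.Random(0)))
```

Random choice only balances load on average. Round Robin guarantees it, but it needs shared state (the iterator), which is awkward across many independent worker processes.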
Variables and Connections are certainly cool tools, but it's important not to lose the balance between which parts of your flows you keep in the code itself and which you hand over to Airflow for storage. On the one hand, it can be convenient to quickly change a value, for example a mailing address, through the UI. On the other hand, that is still a return to the mouse click we (I) wanted to get rid of.
Working with connections is one of the jobs of hooks. In general, Airflow hooks are the points where it plugs into third-party services and libraries. For example, JiraHook opens a client for us to interact with Jira (you can move tasks back and forth), and with SambaHook you can push a local file to an smb share.
Parsing a custom operator
And now we've come close to seeing how TelegramBotSendMessage is made.
The code of commons/operators.py with the operator itself:
from typing import Union

from airflow.models import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot


class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)
Here, as with everything else in Airflow, everything is very simple:
- we inherited from BaseOperator, which implements quite a few Airflow-specific things (have a look at your leisure);
- we declared template_fields, the fields in which Jinja looks for macros to render;
- we arranged the right arguments for __init__() and set defaults where needed;
- we didn't forget to initialize the ancestor either;
- we opened the corresponding hook, TelegramBotHook, and got a client object from it;
- we overrode BaseOperator.execute(), which Airflow calls when the time comes to run the operator. That's where we implement the main action, not forgetting to log. (We log, by the way, straight to stdout and stderr: Airflow intercepts everything, wraps it nicely and puts it where it belongs.)
Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:
from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession


class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client
I don't even know what there is to explain here, so I'll just note the important points:
- we inherit and think about the arguments; in most cases there will be just one, conn_id;
- we override the standard methods. I limited myself to get_conn(), where I get the connection parameters by name and simply take the extra section (a JSON field), into which I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"};
- I create an instance of our TelegramBot, giving it that token.
That's all. You can get a client from the hook with TelegramBotHook().client or TelegramBotHook().get_conn().
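The only contract between TelegramBotHook and Admin/Connections is that the connection's Extra field holds JSON with a bot_token key. Below is a hypothetical stdlib-only helper that performs the same check with clearer errors than a bare KeyError. Inside the hook itself, Airflow already hands over the parsed dict as extra_dejson.

```python
import json


def bot_token_from_extra(extra):
    """Return the bot token from a connection's raw Extra JSON text (hypothetical helper)."""
    try:
        parsed = json.loads(extra or "{}")
    except ValueError as exc:  # json.JSONDecodeError is a ValueError subclass
        raise ValueError(f"Extra is not valid JSON: {exc}") from exc
    token = parsed.get("bot_token") if isinstance(parsed, dict) else None
    if not token:
        raise KeyError("Extra has no 'bot_token' key")
    return token


if __name__ == "__main__":
    print(bot_token_from_extra('{"bot_token": "YOuRAwEsomeBOtToKen"}'))
```

An empty Extra, malformed JSON and JSON without the key all fail loudly. That is usually preferable to a half-configured hook.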
And the second part of the file, in which I write a micro-wrapper for the Telegram REST API so as not to drag in the whole python-telegram-bot for a single method, sendMessage.
class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)


class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))
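Two details of this wrapper are easy to trip over, and both can be checked with the standard library alone.

First, the trailing slash in API_ENDPOINT matters. Relative paths such as 'sendMessage' are resolved against the base URL with urljoin rules (as far as I know, that is what BaseUrlSession does). Without the slash, the bot{token} segment would be replaced.

Second, with parse_mode='MarkdownV2' the Bot API requires characters such as _ ( ) . ! to be escaped. So a message like '{{ dag.dag_id }} failed :(' for update_reports may be rejected.

The helpers below are hypothetical and not part of the article's code.

```python
import re
from urllib.parse import urljoin

API_ENDPOINT = "https://api.telegram.org/bot{}/"

# Characters the Bot API docs list as reserved in MarkdownV2 text, plus the backslash
_MARKDOWN_V2_SPECIAL = re.compile(r"([_*\[\]()~`>#+\-=|{}.!\\])")


def method_url(token, method, endpoint=API_ENDPOINT):
    """URL that a base-URL session would POST to for a Bot API method."""
    return urljoin(endpoint.format(token), method)


def escape_markdown_v2(text):
    """Prefix every MarkdownV2 special character with a backslash."""
    return _MARKDOWN_V2_SPECIAL.sub(r"\\\1", text)


if __name__ == "__main__":
    print(method_url("123:ABC", "sendMessage"))
    # https://api.telegram.org/bot123:ABC/sendMessage
    print(method_url("123:ABC", "sendMessage", "https://api.telegram.org/bot{}"))
    # https://api.telegram.org/sendMessage  (token segment lost!)
    print(escape_markdown_v2("update_reports failed :("))
    # update\_reports failed :\(
```

Escaping would have to happen after Jinja has rendered the message, for example inside execute(), since dag_id is only known at that point.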
The proper thing to do is to bundle it all, TelegramBotSendMessage, TelegramBotHook and TelegramBot, into a plugin, put it in a public repository and give it to Open Source.
While we were studying all this, our report updates managed to fail successfully and send me an error message in the channel. Off to check what's wrong...
Something broke in our doge! Isn't that what we were waiting for? Exactly!
Shall we pour?
Do you feel I've missed something? Wasn't he promising to move data from SQL Server to Vertica, and then he went and wandered off topic, the scoundrel!
This atrocity was intentional: I simply had to decipher some terminology for you. Now we can go further.
Our plan was this:
- Make a DAG
- Generate tasks
- See how beautiful everything is
- Assign session numbers to loads
- Get data from SQL Server
- Put data into Vertica
- Collect statistics
So, to get all this moving, I made a small addition to our docker-compose.yml:
docker-compose.db.yml
version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py
There we raise:
- Vertica as host dwh with the most default settings,
- three instances of SQL Server,
- and we fill the databases in the latter with some data (under no circumstances look into mssql_init.py!)
We launch all this goodness with a slightly more complicated command than last time:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
You can see what our miracle randomizer generated via the Data Profiling/Ad Hoc Query item:
The main thing is not to show it to the analysts
I won't elaborate on ETL sessions, everything is trivial there: we make a database, put a table in it, wrap everything in a context manager, and now we do this:
with Session(connection, task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15
session.py
from sys import stderr


class Session:
    """ETL workflow session

    Example:
        with Session(connection, task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # any() takes a single iterable; an exception is signalled by exc_type
        if exc_type is not None:
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} '
                f'id={self.id} '
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            # DDL such as CREATE TABLE returns no result set to fetch
            if cursor.description is None:
                return None
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,
                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,
                loaded_rows INT,
                comment     VARCHAR(500)
            );
        """
        self._execute(query)

    def open(self):
        # Make sure the bookkeeping table exists before the first INSERT
        self._create()
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
        """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
        """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)


class SessionError(Exception):
    pass


class SessionClosedError(SessionError):
    pass
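To try the session idea without a PostgreSQL server, here is a minimal sketch of the same pattern built on contextlib.contextmanager and an in-memory sqlite3 database. It is a simplified stand-in, not the production code. The etl_session function, the EtlSession holder class and the SQLite schema are assumptions made for illustration.

```python
import sqlite3
from contextlib import contextmanager

# Hypothetical SQLite version of the sessions table used by Session.
SCHEMA = """
    CREATE TABLE IF NOT EXISTS sessions (
        id          INTEGER PRIMARY KEY AUTOINCREMENT,
        task_name   TEXT NOT NULL,
        started     TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
        finished    TEXT,
        successful  INTEGER,
        loaded_rows INTEGER,
        comment     TEXT
    )
"""


class EtlSession:
    """Mutable holder that the load code fills in (hypothetical name)."""

    def __init__(self, session_id, task_name):
        self.id = session_id
        self.task_name = task_name
        self.successful = None
        self.loaded_rows = None
        self.comment = None


@contextmanager
def etl_session(conn, task_name):
    """Open a session row on entry and always close it on exit."""
    conn.execute(SCHEMA)
    cur = conn.execute('INSERT INTO sessions (task_name) VALUES (?)',
                       (task_name,))
    session = EtlSession(cur.lastrowid, task_name)
    try:
        yield session
    except Exception as exc:
        # Record the failure, then let the exception propagate.
        session.successful = False
        session.comment = f'{type(exc).__name__}: {exc}'
        raise
    finally:
        conn.execute(
            'UPDATE sessions SET finished = CURRENT_TIMESTAMP, '
            'successful = ?, loaded_rows = ?, comment = ? WHERE id = ?',
            (session.successful, session.loaded_rows, session.comment,
             session.id))
        conn.commit()


if __name__ == '__main__':
    conn = sqlite3.connect(':memory:')
    with etl_session(conn, 'orders') as session:
        session.loaded_rows = 15
        session.successful = True
    print(conn.execute(
        'SELECT task_name, successful, loaded_rows FROM sessions').fetchall())
    # [('orders', 1, 15)]
```

As with the class-based version, the session row is closed whether the with block finishes normally or raises. The generator form simply trades the explicit __enter__/__exit__ pair for a try/except/finally.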
The time has come to collect the data from our hundred and fifty tables. We'll do it with a few very unpretentious lines:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
- Using a hook, we get a pymssql connection from Airflow.
- We put a date restriction into the query; the template engine will pass the date into the function.
- We feed our query to pandas, which gives us a DataFrame that will come in handy later.

I use the {dt} substitution instead of a %s query parameter not because I enjoy shooting myself in the foot, but because pandas can't cope with pymssql: it hands the latter params: List, while pymssql really wants a tuple.
Also note that the pymssql developer decided to stop supporting it, so it's high time to move to pyodbc.
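For comparison, here is what a bound parameter looks like when the driver cooperates. This is a minimal sketch that uses sqlite3 and an in-memory orders table as stand-ins for SQL Server (both are assumptions). sqlite3 uses the ? placeholder, while other drivers use other styles such as %s. The point is that the date travels separately from the SQL text, so it cannot change the query.

```python
import sqlite3


def orders_for_day(conn, ds):
    """Return one day's orders; ds is a 'YYYY-MM-DD' string like Airflow's {{ ds }}."""
    query = """
        SELECT id, start_time, end_time, type, data
        FROM orders
        WHERE date(start_time) = ?
    """
    # The value is bound by the driver, never pasted into the SQL text.
    return conn.execute(query, (ds,)).fetchall()


if __name__ == '__main__':
    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE orders (id INTEGER, start_time TEXT, '
                 'end_time TEXT, type INTEGER, data TEXT)')
    conn.executemany(
        'INSERT INTO orders VALUES (?, ?, ?, ?, ?)',
        [(1, '2020-01-01 10:00:00', '2020-01-01 10:30:00', 1, 'a'),
         (2, '2020-01-02 09:00:00', '2020-01-02 09:15:00', 2, 'b')])
    print(orders_for_day(conn, '2020-01-01'))
    print(orders_for_day(conn, "2020-01-01' OR '1'='1"))  # []
```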
Let's look at what Airflow has stuffed our functions' arguments with.
If there is no data, there is no point in continuing. But treating the load as successful would be odd too. And yet it isn't an error either. Argh, what to do?! Here's what:
if df.empty:
    raise AirflowSkipException('No rows to load')
AirflowSkipException tells Airflow that there is, strictly speaking, no error, and that we are skipping the task. In the interface the task's square will be neither green nor red, but pink.
Let's add a few columns to our data:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])
Namely:
- the database we took the orders from,
- the ID of our load session (it will differ for every task),
- a hash of the source and the order ID, so that in the final database (where everything is poured into a single table) we have a unique order ID.
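One detail is worth checking here. hash_pandas_object returns unsigned 64-bit integers (uint64), while a Vertica INT column such as hash_id is a signed 64-bit integer, so roughly half of the hashes may fall outside its range. A possible alternative, sketched under that assumption with only the standard library: hash the pair with BLAKE2b and keep 63 bits, which always fit. This is a swapped-in technique, not what the DAG does. The order_hash name and the '|' separator are illustrative.

```python
import hashlib


def order_hash(etl_source: str, order_id: int) -> int:
    """Deterministic hash of (source database, order id) that fits a signed BIGINT."""
    key = f'{etl_source}|{order_id}'.encode('utf-8')
    digest = hashlib.blake2b(key, digest_size=8).digest()
    # Clear the top bit: the result always lies in [0, 2**63 - 1].
    return int.from_bytes(digest, 'big') & (2**63 - 1)


if __name__ == '__main__':
    print(order_hash('mssql_0', 42), order_hash('mssql_1', 42))
```

Applying it row by row, for example df.apply(lambda r: order_hash(r['etl_source'], r['id']), axis=1), is slower than the vectorised hash_pandas_object. Treat it as a trade-off rather than a drop-in improvement.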
One step before last remains: pour everything into Vertica. And, oddly enough, one of the most spectacular and efficient ways to do it is via CSV!
import csv
from io import StringIO

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
# The column list must be plain SQL names, not a Python list repr
copy_stmt = f"""
    COPY {target_table}({', '.join(df.columns)})
    FROM STDIN
    DELIMITER '|'
    ENCLOSED '"'
    ABORT ON ERROR
    NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
- We create a special receiver, a StringIO.
- pandas kindly writes our DataFrame into it as CSV lines.
- We open a connection to our beloved Vertica with a hook.
- And now, with copy(), we send our data straight into Vertica!
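The CSV-plus-COPY idea can be reduced to the standard library, so it runs without pandas or Vertica. In this sketch the to_copy_payload name and the sample rows are assumptions. It writes '|'-delimited rows with the literal NUL for missing values and builds a COPY statement whose column list is rendered as comma-separated names.

```python
import csv
from io import StringIO


def to_copy_payload(table, columns, rows):
    """Return (COPY statement, CSV buffer) for a '|'-delimited load.

    None is written as the literal NUL, matching NULL 'NUL' in the statement.
    """
    buffer = StringIO()
    writer = csv.writer(buffer, delimiter='|', quotechar='"',
                        quoting=csv.QUOTE_MINIMAL, lineterminator='\n')
    for row in rows:
        writer.writerow(['NUL' if value is None else value for value in row])
    buffer.seek(0)  # rewind so a consumer reads from the start

    copy_stmt = (
        f"COPY {table}({', '.join(columns)}) FROM STDIN "
        "DELIMITER '|' ENCLOSED '\"' ABORT ON ERROR NULL 'NUL'"
    )
    return copy_stmt, buffer


if __name__ == '__main__':
    stmt, buf = to_copy_payload('orders', ['id', 'data', 'amount'],
                                [(1, 'a|b', None), (2, 'plain', 3.5)])
    print(stmt)
    print(buf.getvalue(), end='')
```

With this format, a genuine string value 'NUL' would be read back as NULL. Choose a null marker that cannot occur in the data.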
We take the number of loaded rows from the driver and tell the session manager that everything is OK:
session.loaded_rows = cursor.rowcount
session.successful = True
That's it.
In production we create the target table manually. Here I allowed myself a little automation:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
I use VerticaOperator() to create the database schema and the table (if they don't exist yet, of course). The main thing is to arrange the dependencies correctly:
for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load
Summing up
"Well," said the little mouse, "isn't it true that now
you believe I'm the most terrible beast in the forest?"
Julia Donaldson, The Gruffalo
Suppose my colleagues and I held a competition over who could build and launch an ETL process from scratch faster: they with their SSIS and a mouse, me with Airflow. Then suppose we also compared how easy each is to maintain. Wow, I think you'll agree that I'd beat them on every front!
On a slightly more serious note, Apache Airflow has made my work much more convenient and enjoyable, because it describes processes as program code.
Its unlimited extensibility, both through plugins and through its built-in readiness to scale, lets you use Airflow in almost any area. That covers everything from the full cycle of collecting, preparing and processing data to launching rockets (to Mars, of course).
Final part: reference and information
Pitfalls we've already hit for you
- start_date. Yes, it's already a local meme. Everyone trips over the DAG's main argument, start_date. In short, if you set start_date to the current date and schedule_interval to one day, the DAG will start no earlier than tomorrow. Pin it to a fixed moment instead:
start_date = datetime(2020, 7, 7, 0, 1, 2)
And no more problems.
start_date is also behind another runtime error, Task is missing the start_date parameter, which most often means you forgot to pass the dag to the operator.
- Everything on one machine. Yes, all of it: the databases (Airflow's own and our wrapping), the web server, the scheduler and the workers. And it even worked. But over time the number of tasks for the services grew, and when PostgreSQL started answering an index lookup in 20 s instead of 5 ms, we took it and moved it elsewhere.
- LocalExecutor. Yes, we're still on it, and we've already reached the edge of the abyss. LocalExecutor has been enough for us so far, but now it's time to add at least one worker, and we'll have to put in real effort to move to CeleryExecutor. Since you can also run Celery on a single machine, nothing stops you from using it even on a server that "of course will never go to production, honestly!"
- Not using the built-in tools:
  - Connections for storing service credentials,
  - SLA Misses for reacting to tasks that didn't finish in time,
  - XCom for exchanging metadata (I said metadata!) between a dag's tasks.
- Mail abuse. What can I say? We had alerts set up for every retry of failed tasks. Now my work Gmail holds >90k emails from Airflow, and the webmail interface refuses to select and delete more than 100 at a time.
More pitfalls:
Apache Airflow Pitfails
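Several of the pitfalls listed here boil down to a few DAG arguments. The sketch below is illustrative, not our production config. The values, the e-mail address and the first_run_at helper are assumptions. In Airflow 1.10, start_date, email, email_on_failure, email_on_retry, retries, retry_delay and sla are all BaseOperator arguments, so a dict like this is typically passed as DAG(..., default_args=default_args). first_run_at is a deliberately simplified model of scheduling that ignores catchup and cron alignment: the run for the interval starting at start_date is triggered only once that interval has ended.

```python
from datetime import datetime, timedelta

# Illustrative values (assumptions); pass as DAG(..., default_args=default_args).
default_args = {
    'owner': 'airflow',
    # A fixed moment in the past, not datetime.now(): a start_date that moves
    # on every parse keeps pushing the first run into the future.
    'start_date': datetime(2020, 7, 7, 0, 1, 2),
    'email': ['etl-alerts@example.com'],  # hypothetical address
    'email_on_failure': True,   # one mail once a task has finally failed
    'email_on_retry': False,    # no mail for every single retry
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'sla': timedelta(hours=1),  # late tasks get recorded as SLA misses
}


def first_run_at(start_date, schedule_interval):
    """Simplified: the run covering [start_date, start_date + interval)
    is triggered once that interval has ended."""
    return start_date + schedule_interval


if __name__ == '__main__':
    print(first_run_at(default_args['start_date'], timedelta(days=1)))
    # prints 2020-07-08 00:01:02
```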
More automation tools
So that we work even more with our heads and less with our hands, Airflow has prepared the following for us:
- REST API: it still has Experimental status, which doesn't stop it from working. With it you can not only get information about dags and tasks, but also stop/start a dag, create a DAG Run or a pool.
- CLI: the command line gives access to many tools that are not just inconvenient to use through the WebUI but are missing there altogether. For example:
  - backfill is needed to re-run task instances.
    For example, the analysts come and say: "Comrade, your data from January 1 to 13 is garbage! Fix it, fix it, fix it, fix it!" And you go, presto:
    airflow backfill -s '2020-01-01' -e '2020-01-13' orders
  - Database maintenance: initdb, resetdb, upgradedb, checkdb.
  - run lets you run a single task instance and even ignore all of its dependencies. Moreover, you can run it through LocalExecutor even if you have a Celery cluster.
  - test does roughly the same thing, except that it writes nothing to the databases.
  - connections lets you create connections in bulk from the shell.
- python api: a rather hardcore way of interacting, meant for plugins rather than for poking around by hand. But who's going to stop us from going to /home/airflow/dags, launching ipython and running riot? You can, for example, export all connections with code like this:
from airflow import settings
from airflow.models import Connection

fields = 'conn_id conn_type host port schema login password extra'.split()

session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
    d = {field: getattr(conn, field) for field in fields}
    print(conn.conn_id, '=', d)
- Connecting to the Airflow metadata database. I don't recommend writing to it, but pulling task states out of it for various specific metrics can be much faster and simpler than going through any of the APIs.
Say, far from all of our tasks are idempotent, and they may fail now and then, which is normal. But several failures in a row are already suspicious and worth checking.
Careful, SQL!
WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        row_number()
        OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
),
failed AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        CASE WHEN rn = row_number() OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC)
            THEN TRUE END AS last_fail_seq
    FROM last_executions
    WHERE
        state IN ('failed', 'up_for_retry')
)
SELECT
    task_id,
    dag_id,
    count(last_fail_seq)                       AS unsuccessful,
    count(CASE WHEN last_fail_seq
        AND state = 'failed' THEN 1 END)       AS failed,
    count(CASE WHEN last_fail_seq
        AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
    task_id,
    dag_id
HAVING
    count(last_fail_seq) > 0
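The query's trick is comparing two row_number() values. That keeps only the failures forming an unbroken run at the most recent end of each task's history. Here is the same logic in plain Python, as a sketch for checking the idea on sample data. The tuple layout and the trailing_failures name are assumptions, not the task_instance schema, and the two-day window is left out.

```python
from collections import defaultdict

FAILED_STATES = {'failed', 'up_for_retry'}


def trailing_failures(task_instances):
    """Count each task's most recent unbroken run of failures.

    task_instances: iterable of (dag_id, task_id, execution_date, state).
    Returns {(dag_id, task_id): (unsuccessful, failed, up_for_retry)}
    only for tasks whose latest execution failed.
    """
    history = defaultdict(list)
    for dag_id, task_id, execution_date, state in task_instances:
        history[(dag_id, task_id)].append((execution_date, state))

    result = {}
    for key, runs in history.items():
        streak = []
        # Walk from the newest execution back until the first non-failure.
        for _, state in sorted(runs, reverse=True):
            if state not in FAILED_STATES:
                break
            streak.append(state)
        if streak:
            result[key] = (len(streak),
                           streak.count('failed'),
                           streak.count('up_for_retry'))
    return result


if __name__ == '__main__':
    rows = [('orders', 'mssql_0', 1, 'success'),
            ('orders', 'mssql_0', 2, 'failed'),
            ('orders', 'mssql_0', 3, 'up_for_retry'),
            ('orders', 'mssql_1', 1, 'failed'),
            ('orders', 'mssql_1', 2, 'success')]
    print(trailing_failures(rows))
```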
Source: www.habr.com