ΠΡΠΈΠ²Π΅Ρ, Ρ ΠΠΌΠΈΡΡΠΈΠΉ ΠΠΎΠ³Π²ΠΈΠ½Π΅Π½ΠΊΠΎ β Data Engineer ΠΎΡΠ΄Π΅Π»Π° Π°Π½Π°Π»ΠΈΡΠΈΠΊΠΈ Π³ΡΡΠΏΠΏΡ ΠΊΠΎΠΌΠΏΠ°Π½ΠΈΠΉ Β«ΠΠ΅Π·ΡΡΒ».
Π― ΡΠ°ΡΡΠΊΠ°ΠΆΡ Π²Π°ΠΌ ΠΎ Π·Π°ΠΌΠ΅ΡΠ°ΡΠ΅Π»ΡΠ½ΠΎΠΌ ΠΈΠ½ΡΡΡΡΠΌΠ΅Π½ΡΠ΅ Π΄Π»Ρ ΡΠ°Π·ΡΠ°Π±ΠΎΡΠΊΠΈ ETL-ΠΏΡΠΎΡΠ΅ΡΡΠΎΠ² β Apache Airflow. ΠΠΎ Airflow Π½Π°ΡΡΠΎΠ»ΡΠΊΠΎ ΡΠ½ΠΈΠ²Π΅ΡΡΠ°Π»Π΅Π½ ΠΈ ΠΌΠ½ΠΎΠ³ΠΎΠ³ΡΠ°Π½Π΅Π½, ΡΡΠΎ Π²Π°ΠΌ ΡΡΠΎΠΈΡ ΠΏΡΠΈΡΠΌΠΎΡΡΠ΅ΡΡΡΡ ΠΊ Π½Π΅ΠΌΡ Π΄Π°ΠΆΠ΅ Π΅ΡΠ»ΠΈ Π²Ρ Π½Π΅ Π·Π°Π½ΠΈΠΌΠ°Π΅ΡΠ΅ΡΡ ΠΏΠΎΡΠΎΠΊΠ°ΠΌΠΈ Π΄Π°Π½Π½ΡΡ , Π° ΠΈΠΌΠ΅Π΅ΡΠ΅ ΠΏΠΎΡΡΠ΅Π±Π½ΠΎΡΡΡ ΠΏΠ΅ΡΠΈΠΎΠ΄ΠΈΡΠ΅ΡΠΊΠΈ Π·Π°ΠΏΡΡΠΊΠ°ΡΡ ΠΊΠ°ΠΊΠΈΠ΅-Π»ΠΈΠ±ΠΎ ΠΏΡΠΎΡΠ΅ΡΡΡ ΠΈ ΡΠ»Π΅Π΄ΠΈΡΡ Π·Π° ΠΈΡ Π²ΡΠΏΠΎΠ»Π½Π΅Π½ΠΈΠ΅ΠΌ.
Π Π΄Π°, Ρ Π±ΡΠ΄Ρ Π½Π΅ ΡΠΎΠ»ΡΠΊΠΎ ΡΠ°ΡΡΠΊΠ°Π·ΡΠ²Π°ΡΡ, Π½ΠΎ ΠΈ ΠΏΠΎΠΊΠ°Π·ΡΠ²Π°ΡΡ: Π² ΠΏΡΠΎΠ³ΡΠ°ΠΌΠΌΠ΅ ΠΌΠ½ΠΎΠ³ΠΎ ΠΊΠΎΠ΄Π°, ΡΠΊΡΠΈΠ½ΡΠΎΡΠΎΠ² ΠΈ ΡΠ΅ΠΊΠΎΠΌΠ΅Π½Π΄Π°ΡΠΈΠΉ.
Π§ΡΠΎ ΠΎΠ±ΡΡΠ½ΠΎ Π²ΠΈΠ΄ΠΈΡΡ, ΠΊΠΎΠ³Π΄Π° Π³ΡΠ³Π»ΠΈΡΡ ΡΠ»ΠΎΠ²ΠΎ Airflow / Wikimedia Commons
ΠΠ³Π»Π°Π²Π»Π΅Π½ΠΈΠ΅
ΠΠ²Π΅Π΄Π΅Π½ΠΈΠ΅ Π§Π°ΡΡΡ ΠΎΡΠ½ΠΎΠ²Π½Π°Ρ, ΠΏΡΠ°ΠΊΡΠΈΡΠ΅ΡΠΊΠ°Ρ (ΠΈ Π½Π΅ΠΌΠ½ΠΎΠ³ΠΎ ΡΠ΅ΠΎΡΠ΅ΡΠΈΡΠ΅ΡΠΊΠ°Ρ) ΠΠ°ΡΠ΅ΠΌ ΠΎΠ½ΠΎ Π½Π°ΠΌ (ΠΈ Π²Π°ΠΌ) Π‘ΠΎΠ±ΠΈΡΠ°Π΅ΠΌ ΠΊΠ»Π°ΡΡΠ΅Ρ ΠΡΠ½ΠΎΠ²Π½ΡΠ΅ ΠΏΠΎΠ½ΡΡΠΈΡ ΠΠ΅Π½Π΅ΡΠΈΡΡΠ΅ΠΌ ΡΠ°ΡΠΊΠΈ ΠΠ΅ΠΌΠ½ΠΎΠ³ΠΎ ΠΎ Flower ΠΠΎΠ³ΡΡΠΆΠ°Π΅ΠΌ Π½Π΅Π΄ΠΎΠ³ΡΡΠΆΠ΅Π½Π½ΠΎΠ΅ Π‘ΠΎΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΡ, Ρ ΡΠΊΠΈ ΠΈ ΠΏΡΠΎΡΠΈΠ΅ ΠΏΠ΅ΡΠ΅ΠΌΠ΅Π½Π½ΡΠ΅ Π Π°Π·Π±ΠΈΡΠ°Π΅ΠΌ ΠΊΠ°ΡΡΠΎΠΌΠ½ΡΠΉ ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡ ΠΠ°Π»ΠΈΠ²Π°ΡΡ-ΡΠΎ Π±ΡΠ΄Π΅ΡΡ? ΠΠΎΠ΄Π²ΠΎΠ΄ΠΈΠΌ ΠΈΡΠΎΠ³ΠΈ
Π§Π°ΡΡΡ Π·Π°ΠΊΠ»ΡΡΠΈΡΠ΅Π»ΡΠ½Π°Ρ, ΡΠΏΡΠ°Π²ΠΎΡΠ½ΠΎ-ΠΈΠ½ΡΠΎΡΠΌΠ°ΡΠΈΠΎΠ½Π½Π°Ρ Π‘ΡΡΠ»ΠΊΠΈ
ΠΠ²Π΅Π΄Π΅Π½ΠΈΠ΅
Apache Airflow β ΠΎΠ½ ΠΏΡΡΠΌΠΎ ΠΊΠ°ΠΊ Django:
- Π½Π°ΠΏΠΈΡΠ°Π½ Π½Π° Python,
- Π΅ΡΡΡ ΠΎΡΠ»ΠΈΡΠ½Π°Ρ Π°Π΄ΠΌΠΈΠ½ΠΊΠ°,
- Π½Π΅ΠΎΠ³ΡΠ°Π½ΠΈΡΠ΅Π½Π½ΠΎ ΡΠ°ΡΡΠΈΡΡΠ΅ΠΌ,
β ΡΠΎΠ»ΡΠΊΠΎ Π»ΡΡΡΠ΅, Π΄Π° ΠΈ ΡΠ΄Π΅Π»Π°Π½ ΡΠΎΠ²ΡΠ΅ΠΌ Π΄Π»Ρ Π΄ΡΡΠ³ΠΈΡ ΡΠ΅Π»Π΅ΠΉ, Π° ΠΈΠΌΠ΅Π½Π½ΠΎ (ΠΊΠ°ΠΊ Π½Π°ΠΏΠΈΡΠ°Π½ΠΎ Π΄ΠΎ ΠΊΠ°ΡΠ°):
- Π·Π°ΠΏΡΡΠΊ ΠΈ ΠΌΠΎΠ½ΠΈΡΠΎΡΠΈΠ½Π³ Π·Π°Π΄Π°Ρ Π½Π° Π½Π΅ΠΎΠ³ΡΠ°Π½ΠΈΡΠ΅Π½Π½ΠΎΠΌ ΠΊΠΎΠ»ΠΈΡΠ΅ΡΡΠ²Π΅ ΠΌΠ°ΡΠΈΠ½ (ΡΠΊΠΎΠ»ΡΠΊΠΎ Π²Π°ΠΌ ΠΏΠΎΠ·Π²ΠΎΠ»ΠΈΡ Celery/Kubernetes ΠΈ Π²Π°ΡΠ° ΡΠΎΠ²Π΅ΡΡΡ)
- Ρ Π΄ΠΈΠ½Π°ΠΌΠΈΡΠ΅ΡΠΊΠΎΠΉ Π³Π΅Π½Π΅ΡΠ°ΡΠΈΠ΅ΠΉ workflow ΠΈΠ· ΠΎΡΠ΅Π½Ρ Π»Π΅Π³ΠΊΠΎΠ³ΠΎ Π΄Π»Ρ Π½Π°ΠΏΠΈΡΠ°Π½ΠΈΡ ΠΈ Π²ΠΎΡΠΏΡΠΈΡΡΠΈΡ Python-ΠΊΠΎΠ΄Π°
- ΠΈ Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡΡΡ ΡΠ²ΡΠ·ΡΠ²Π°ΡΡ Π΄ΡΡΠ³ Ρ Π΄ΡΡΠ³ Π»ΡΠ±ΡΠ΅ Π±Π°Π·Ρ Π΄Π°Π½Π½ΡΡ ΠΈ API Ρ ΠΏΠΎΠΌΠΎΡΡΡ ΠΊΠ°ΠΊ Π³ΠΎΡΠΎΠ²ΡΡ ΠΊΠΎΠΌΠΏΠΎΠ½Π΅Π½ΡΠΎΠ², ΡΠ°ΠΊ ΠΈ ΡΠ°ΠΌΠΎΠ΄Π΅Π»ΡΠ½ΡΡ ΠΏΠ»Π°Π³ΠΈΠ½ΠΎΠ² (ΡΡΠΎ Π΄Π΅Π»Π°Π΅ΡΡΡ ΡΡΠ΅Π·Π²ΡΡΠ°ΠΉΠ½ΠΎ ΠΏΡΠΎΡΡΠΎ).
ΠΡ ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΠ΅ΠΌ Apache Airflow ΡΠ°ΠΊ:
- ΡΠΎΠ±ΠΈΡΠ°Π΅ΠΌ Π΄Π°Π½Π½ΡΠ΅ ΠΈΠ· ΡΠ°Π·Π»ΠΈΡΠ½ΡΡ ΠΈΡΡΠΎΡΠ½ΠΈΠΊΠΎΠ² (ΠΌΠ½ΠΎΠΆΠ΅ΡΡΠ²ΠΎ ΠΈΠ½ΡΡΠ°Π½ΡΠΎΠ² SQL Server ΠΈ PostgreSQL, ΡΠ°Π·Π»ΠΈΡΠ½ΡΠ΅ API Ρ ΠΌΠ΅ΡΡΠΈΠΊΠ°ΠΌΠΈ ΠΏΡΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠΉ, Π΄Π°ΠΆΠ΅ 1Π‘) Π² DWH ΠΈ ODS (Ρ Π½Π°Ρ ΡΡΠΎ Vertica ΠΈ Clickhouse).
- ΠΊΠ°ΠΊ ΠΏΡΠΎΠ΄Π²ΠΈΠ½ΡΡΡΠΉ
cron
, ΠΊΠΎΡΠΎΡΡΠΉ Π·Π°ΠΏΡΡΠΊΠ°Π΅Ρ ΠΏΡΠΎΡΠ΅ΡΡΡ ΠΊΠΎΠ½ΡΠΎΠ»ΠΈΠ΄Π°ΡΠΈΠΈ Π΄Π°Π½Π½ΡΡ Π½Π° ODS, Π° ΡΠ°ΠΊΠΆΠ΅ ΡΠ»Π΅Π΄ΠΈΡ Π·Π° ΠΈΡ ΠΎΠ±ΡΠ»ΡΠΆΠΈΠ²Π°Π½ΠΈΠ΅ΠΌ.
ΠΠΎ Π½Π΅Π΄Π°Π²Π½Π΅Π³ΠΎ Π²ΡΠ΅ΠΌΠ΅Π½ΠΈ Π½Π°ΡΠΈ ΠΏΠΎΡΡΠ΅Π±Π½ΠΎΡΡΠΈ ΠΏΠΎΠΊΡΡΠ²Π°Π» ΠΎΠ΄ΠΈΠ½ Π½Π΅Π±ΠΎΠ»ΡΡΠΎΠΉ ΡΠ΅ΡΠ²Π΅Ρ Π½Π° 32 ΡΠ΄ΡΠ°Ρ ΠΈ 50 GB ΠΎΠΏΠ΅ΡΠ°ΡΠΈΠ²ΠΊΠΈ. Π Airflow ΠΏΡΠΈ ΡΡΠΎΠΌ ΡΠ°Π±ΠΎΡΠ°Π΅Ρ:
- Π±ΠΎΠ»Π΅Π΅ 200 Π΄Π°Π³ΠΎΠ² (ΡΠΎΠ±ΡΡΠ²Π΅Π½Π½ΠΎ workflows, Π² ΠΊΠΎΡΠΎΡΡΠ΅ ΠΌΡ Π½Π°Π±ΠΈΠ»ΠΈ Π·Π°Π΄Π°ΡΠΊΠΈ),
- Π² ΠΊΠ°ΠΆΠ΄ΠΎΠΌ Π² ΡΡΠ΅Π΄Π½Π΅ΠΌ ΠΏΠΎ 70 ΡΠ°ΡΠΊΠΎΠ²,
- Π·Π°ΠΏΡΡΠΊΠ°Π΅ΡΡΡ ΡΡΠΎ Π΄ΠΎΠ±ΡΠΎ (ΡΠΎΠΆΠ΅ Π² ΡΡΠ΅Π΄Π½Π΅ΠΌ) ΡΠ°Π· Π² ΡΠ°Ρ.
Π ΠΎ ΡΠΎΠΌ, ΠΊΠ°ΠΊ ΠΌΡ ΡΠ°ΡΡΠΈΡΡΠ»ΠΈΡΡ, Ρ Π½Π°ΠΏΠΈΡΡ Π½ΠΈΠΆΠ΅, Π° ΡΠ΅ΠΉΡΠ°Ρ Π΄Π°Π²Π°ΠΉΡΠ΅ ΠΎΠΏΡΠ΅Π΄Π΅Π»ΠΈΠΌ ΓΌber-Π·Π°Π΄Π°ΡΡ, ΠΊΠΎΡΠΎΡΡΡ ΠΌΡ Π±ΡΠ΄Π΅ΠΌ ΡΠ΅ΡΠ°ΡΡ:
ΠΡΡΡ ΡΡΠΈ ΠΈΡΡ ΠΎΠ΄Π½ΡΡ SQL ServerβΠ°, Π½Π° ΠΊΠ°ΠΆΠ΄ΠΎΠΌ ΠΏΠΎ 50 Π±Π°Π· Π΄Π°Π½Π½ΡΡ β ΠΈΠ½ΡΡΠ°Π½ΡΠΎΠ² ΠΎΠ΄Π½ΠΎΠ³ΠΎ ΠΏΡΠΎΠ΅ΠΊΡΠ°, ΡΠΎΠΎΡΠ²Π΅ΡΡΡΠ²Π΅Π½Π½ΠΎ, ΡΡΡΡΠΊΡΡΡΠ° Ρ Π½ΠΈΡ ΠΎΠ΄ΠΈΠ½Π°ΠΊΠΎΠ²Π°Ρ (ΠΏΠΎΡΡΠΈ Π²Π΅Π·Π΄Π΅, ΠΌΡΠ°-Ρ Π°-Ρ Π°), Π° Π·Π½Π°ΡΠΈΡ Π² ΠΊΠ°ΠΆΠ΄ΠΎΠΉ Π΅ΡΡΡ ΡΠ°Π±Π»ΠΈΡΠ° Orders (Π±Π»Π°Π³ΠΎ ΡΠ°Π±Π»ΠΈΡΡ Ρ ΡΠ°ΠΊΠΈΠΌ Π½Π°Π·Π²Π°Π½ΠΈΠ΅ΠΌ ΠΌΠΎΠΆΠ½ΠΎ Π·Π°ΡΠΎΠ»ΠΊΠ°ΡΡ Π² Π»ΡΠ±ΠΎΠΉ Π±ΠΈΠ·Π½Π΅Ρ). ΠΡ Π·Π°Π±ΠΈΡΠ°Π΅ΠΌ Π΄Π°Π½Π½ΡΠ΅, Π΄ΠΎΠ±Π°Π²Π»ΡΡ ΡΠ»ΡΠΆΠ΅Π±Π½ΡΠ΅ ΠΏΠΎΠ»Ρ (ΡΠ΅ΡΠ²Π΅Ρ-ΠΈΡΡΠΎΡΠ½ΠΈΠΊ, Π±Π°Π·Π°-ΠΈΡΡΠΎΡΠ½ΠΈΠΊ, ΠΈΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ ETL-Π·Π°Π΄Π°ΡΠΈ) ΠΈ Π½Π°ΠΈΠ²Π½ΡΠΌ ΠΎΠ±ΡΠ°Π·ΠΎΠΌ Π±ΡΠΎΡΠΈΠΌ ΠΈΡ Π², ΡΠΊΠ°ΠΆΠ΅ΠΌ, Vertica.
ΠΠΎΠ΅Ρ Π°Π»ΠΈ!
Π§Π°ΡΡΡ ΠΎΡΠ½ΠΎΠ²Π½Π°Ρ, ΠΏΡΠ°ΠΊΡΠΈΡΠ΅ΡΠΊΠ°Ρ (ΠΈ Π½Π΅ΠΌΠ½ΠΎΠ³ΠΎ ΡΠ΅ΠΎΡΠ΅ΡΠΈΡΠ΅ΡΠΊΠ°Ρ)
ΠΠ°ΡΠ΅ΠΌ ΠΎΠ½ΠΎ Π½Π°ΠΌ (ΠΈ Π²Π°ΠΌ)
ΠΠΎΠ³Π΄Π° Π΄Π΅ΡΠ΅Π²ΡΡ Π±ΡΠ»ΠΈ Π±ΠΎΠ»ΡΡΠΈΠΌΠΈ, Π° Ρ Π±ΡΠ» ΠΏΡΠΎΡΡΡΠΌ SQL
-ΡΠΈΠΊΠΎΠΌ Π² ΠΎΠ΄Π½ΠΎΠΌ ΡΠΎΡΡΠΈΠΉΡΠΊΠΎΠΌ ΡΠΈΡΠ΅ΠΉΠ»Π΅, ΠΌΡ ΡΠΏΠ°ΡΠΈΠ»ΠΈ ETL-ΠΏΡΠΎΡΠ΅ΡΡΡ aka ΠΏΠΎΡΠΎΠΊΠΈ Π΄Π°Π½Π½ΡΡ
Ρ ΠΏΠΎΠΌΠΎΡΡΡ Π΄Π²ΡΡ
Π΄ΠΎΡΡΡΠΏΠ½ΡΡ
Π½Π°ΠΌ ΡΡΠ΅Π΄ΡΡΠ²:
- Informatica Power Center β ΠΊΡΠ°ΠΉΠ½Π΅ ΡΠ°Π·Π²Π΅ΡΠΈΡΡΠ°Ρ ΡΠΈΡΡΠ΅ΠΌΠ°, ΡΡΠ΅Π·Π²ΡΡΠ°ΠΉΠ½ΠΎ ΠΏΡΠΎΠΈΠ·Π²ΠΎΠ΄ΠΈΡΠ΅Π»ΡΠ½Π°Ρ, ΡΠΎ ΡΠ²ΠΎΠΈΠΌΠΈ ΠΆΠ΅Π»Π΅Π·ΠΊΠ°ΠΌΠΈ, ΡΠΎΠ±ΡΡΠ²Π΅Π½Π½ΡΠΌ Π²Π΅ΡΡΠΈΠΎΠ½ΠΈΡΠΎΠ²Π°Π½ΠΈΠ΅ΠΌ. ΠΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π» Ρ Π΄Π°ΠΉ Π±ΠΎΠ³ 1% Π΅Ρ Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡΠ΅ΠΉ. ΠΠΎΡΠ΅ΠΌΡ? ΠΡ, Π²ΠΎ-ΠΏΠ΅ΡΠ²ΡΡ
, ΡΡΠΎΡ ΠΈΠ½ΡΠ΅ΡΡΠ΅ΠΉΡ Π³Π΄Π΅-ΡΠΎ ΠΈΠ· Π½ΡΠ»Π΅Π²ΡΡ
ΠΏΡΠΈΡ
ΠΈΡΠ΅ΡΠΊΠΈ Π΄Π°Π²ΠΈΠ» Π½Π° Π½Π°Ρ. ΠΠΎ-Π²ΡΠΎΡΡΡ
, ΡΡΠ° ΡΡΡΠΊΠΎΠ²ΠΈΠ½Π° Π·Π°ΡΠΎΡΠ΅Π½Π° ΠΏΠΎΠ΄ ΡΡΠ΅Π·Π²ΡΡΠ°ΠΉΠ½ΠΎ Π½Π°Π²ΠΎΡΠΎΡΠ΅Π½Π½ΡΠ΅ ΠΏΡΠΎΡΠ΅ΡΡΡ, ΡΡΠΎΡΡΠ½ΠΎΠ΅ ΠΏΠ΅ΡΠ΅ΠΈΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π½ΠΈΠ΅ ΠΊΠΎΠΌΠΏΠΎΠ½Π΅Π½ΡΠΎΠ² ΠΈ Π΄ΡΡΠ³ΠΈΠ΅ ΠΎΡΠ΅Π½Ρ-Π²Π°ΠΆΠ½ΡΠ΅-ΡΠ½ΡΠ΅ΡΠΏΡΠ°ΠΉΠ·-ΡΠΈΡΠ΅ΡΠΊΠΈ. ΠΡΠΎ ΡΠΎ ΡΡΠΎ ΡΡΠΎΠΈΡ ΠΎΠ½Π°, ΠΊΠ°ΠΊ ΠΊΡΡΠ»ΠΎ Airbus A380/Π³ΠΎΠ΄, ΠΌΡ ΠΏΡΠΎΠΌΠΎΠ»ΡΠΈΠΌ.
ΠΡΡΠΎΡΠΎΠΆΠ½ΠΎ, ΡΠΊΡΠΈΠ½ΡΠΎΡ ΠΌΠΎΠΆΠ΅Ρ ΡΠ΄Π΅Π»Π°ΡΡ Π»ΡΠ΄ΡΠΌ ΠΌΠ»Π°Π΄ΡΠ΅ 30 Π½Π΅ΠΌΠ½ΠΎΠ³ΠΎ Π±ΠΎΠ»ΡΠ½ΠΎ
- SQL Server Integration Server β ΡΡΠΈΠΌ ΡΠΎΠ²Π°ΡΠΈΡΠ΅ΠΌ ΠΌΡ ΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π»ΠΈΡΡ Π² ΡΠ²ΠΎΠΈΡ
Π²Π½ΡΡΡΠΈΠΏΡΠΎΠ΅ΠΊΡΠ½ΡΡ
ΠΏΠΎΡΠΎΠΊΠ°Ρ
. ΠΡ Π° Π² ΡΠ°ΠΌΠΎΠΌ Π΄Π΅Π»Π΅: SQL Server ΠΌΡ ΡΠΆΠ΅ ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΠ΅ΠΌ, ΠΈ Π½Π΅ ΡΠ·Π°ΡΡ Π΅Π³ΠΎ ETL-ΡΡΠ»Π·Ρ Π±ΡΠ»ΠΎ Π±Ρ ΠΊΠ°ΠΊ-ΡΠΎ Π½Π΅ΡΠ°Π·ΡΠΌΠ½ΠΎ. ΠΡΡ Π² Π½ΡΠΌ Π² Ρ
ΠΎΡΠΎΡΠΎ: ΠΈ ΠΈΠ½ΡΠ΅ΡΡΠ΅ΠΉΡ ΠΊΡΠ°ΡΠΈΠ²ΡΠΉ, ΠΈ ΠΎΡΡΡΡΠΈΠΊΠΈ Π²ΡΠΏΠΎΠ»Π½Π΅Π½ΠΈΡβ¦ ΠΠΎ Π½Π΅ Π·Π° ΡΡΠΎ ΠΌΡ Π»ΡΠ±ΠΈΠΌ ΠΏΡΠΎΠ³ΡΠ°ΠΌΠΌΠ½ΡΠ΅ ΠΏΡΠΎΠ΄ΡΠΊΡΡ, ΠΎΡ
Π½Π΅ Π·Π° ΡΡΠΎ. ΠΠ΅ΡΡΠΈΠΎΠ½ΠΈΡΠΎΠ²Π°ΡΡ Π΅Π³ΠΎ
dtsx
(ΠΊΠΎΡΠΎΡΡΠΉ ΠΏΡΠ΅Π΄ΡΡΠ°Π²Π»ΡΠ΅Ρ ΡΠΎΠ±ΠΎΠΉ XML Ρ ΠΏΠ΅ΡΠ΅ΠΌΠ΅ΡΠΈΠ²Π°ΡΡΠΈΠΌΠΈΡΡ ΠΏΡΠΈ ΡΠΎΡ ΡΠ°Π½Π΅Π½ΠΈΠΈ Π½ΠΎΠ΄Π°ΠΌΠΈ) ΠΌΡ ΠΌΠΎΠΆΠ΅ΠΌ, Π° ΡΠΎΠ»ΠΊΡ? Π ΡΠ΄Π΅Π»Π°ΡΡ ΠΏΠ°ΠΊΠ΅Ρ ΡΠ°ΡΠΊΠΎΠ², ΠΊΠΎΡΠΎΡΡΠΉ ΠΏΠ΅ΡΠ΅ΡΠ°ΡΠΈΡ ΡΠΎΡΠ½Ρ ΡΠ°Π±Π»ΠΈΡ Ρ ΠΎΠ΄Π½ΠΎΠ³ΠΎ ΡΠ΅ΡΠ²Π΅ΡΠ° Π½Π° Π΄ΡΡΠ³ΠΎΠΉ? ΠΠ° ΡΡΠΎ ΡΠΎΡΠ½Ρ, Ρ Π²Π°Ρ ΠΎΡ Π΄Π²Π°Π΄ΡΠ°ΡΠΈ ΡΡΡΠΊ ΠΎΡΠ²Π°Π»ΠΈΡΡΡ ΡΠΊΠ°Π·Π°ΡΠ΅Π»ΡΠ½ΡΠΉ ΠΏΠ°Π»Π΅Ρ, ΡΡΠ»ΠΊΠ°ΡΡΠΈΠΉ ΠΏΠΎ ΠΌΡΡΠΈΠ½ΠΎΠΉ ΠΊΠ½ΠΎΠΏΠΊΠ΅. ΠΠΎ Π²ΡΠ³Π»ΡΠ΄ΠΈΡ ΠΎΠ½, ΠΎΠΏΡΠ΅Π΄Π΅Π»Π΅Π½Π½ΠΎ, Π±ΠΎΠ»Π΅Π΅ ΠΌΠΎΠ΄Π½ΠΎ:
ΠΡ Π±Π΅Π·ΡΡΠ»ΠΎΠ²Π½ΠΎ ΠΈΡΠΊΠ°Π»ΠΈ Π²ΡΡ ΠΎΠ΄Ρ. ΠΠ΅Π»ΠΎ Π΄Π°ΠΆΠ΅ ΠΏΠΎΡΡΠΈ Π΄ΠΎΡΠ»ΠΎ Π΄ΠΎ ΡΠ°ΠΌΠΎΠΏΠΈΡΠ½ΠΎΠ³ΠΎ Π³Π΅Π½Π΅ΡΠ°ΡΠΎΡΠ° SSIS-ΠΏΠ°ΠΊΠ΅ΡΠΎΠ²…
β¦ Π° ΠΏΠΎΡΠΎΠΌ ΠΌΠ΅Π½Ρ Π½Π°ΡΠ»Π° Π½ΠΎΠ²Π°Ρ ΡΠ°Π±ΠΎΡΠ°. Π Π½Π° Π½Π΅ΠΉ ΠΌΠ΅Π½Ρ Π½Π°ΡΡΠΈΠ³ Apache Airflow.
ΠΠΎΠ³Π΄Π° Ρ ΡΠ·Π½Π°Π», ΡΡΠΎ ΠΎΠΏΠΈΡΠ°Π½ΠΈΡ ETL-ΠΏΡΠΎΡΠ΅ΡΡΠΎΠ² β ΡΡΠΎ ΠΏΡΠΎΡΡΠΎΠΉ Python-ΠΊΠΎΠ΄, Ρ ΡΠΎΠ»ΡΠΊΠΎ ΡΡΠΎ Π½Π΅ ΠΏΠ»ΡΡΠ°Π» ΠΎΡ ΡΠ°Π΄ΠΎΡΡΠΈ. ΠΠΎΡ ΡΠ°ΠΊ ΠΏΠΎΡΠΎΠΊΠΈ Π΄Π°Π½Π½ΡΡ ΠΏΠΎΠ΄Π²Π΅ΡΠ³Π»ΠΈΡΡ Π²Π΅ΡΡΠΈΠΎΠ½ΠΈΡΠΎΠ²Π°Π½ΠΈΡ ΠΈ Π΄ΠΈΡΡΡ, Π° ΡΡΡΠΏΠ°ΡΡ ΡΠ°Π±Π»ΠΈΡΡ Ρ Π΅Π΄ΠΈΠ½ΠΎΠΉ ΡΡΡΡΠΊΡΡΡΠΎΠΉ ΠΈΠ· ΡΠΎΡΠ½ΠΈ Π±Π°Π· Π΄Π°Π½Π½ΡΡ Π² ΠΎΠ΄ΠΈΠ½ ΡΠ°ΡΠ³Π΅Ρ ΡΡΠ°Π»ΠΎ Π΄Π΅Π»ΠΎΠΌ Python-ΠΊΠΎΠ΄Π° Π² ΠΏΠΎΠ»ΡΠΎΡΠ°-Π΄Π²Π° 13β ΡΠΊΡΠ°Π½Π°.
Π‘ΠΎΠ±ΠΈΡΠ°Π΅ΠΌ ΠΊΠ»Π°ΡΡΠ΅Ρ
ΠΠ°Π²Π°ΠΉΡΠ΅ Π½Π΅ ΡΡΡΡΠ°ΠΈΠ²Π°ΡΡ ΡΠΎΠ²ΡΠ΅ΠΌ ΡΠΆ Π΄Π΅ΡΡΠΊΠΈΠΉ ΡΠ°Π΄, ΠΈ Π½Π΅ Π³ΠΎΠ²ΠΎΡΠΈΡΡ ΡΡΡ ΠΎ ΡΠΎΠ²Π΅ΡΡΠ΅Π½Π½ΠΎ ΠΎΡΠ΅Π²ΠΈΠ΄Π½ΡΡ Π²Π΅ΡΠ°Ρ , Π²ΡΠΎΠ΄Π΅ ΡΡΡΠ°Π½ΠΎΠ²ΠΊΠΈ Airflow, Π²ΡΠ±ΡΠ°Π½Π½ΠΎΠΉ Π²Π°ΠΌΠΈ ΠΠ, Celery ΠΈ Π΄ΡΡΠ³ΠΈΡ Π΄Π΅Π», ΠΎΠΏΠΈΡΠ°Π½Π½ΡΡ Π² Π΄ΠΎΠΊΠ°Ρ .
Π§ΡΠΎΠ±Ρ ΠΌΡ ΠΌΠΎΠ³Π»ΠΈ ΡΡΠ°Π·Ρ ΠΏΡΠΈΡΡΡΠΏΠΈΡΡ ΠΊ ΡΠΊΡΠΏΠ΅ΡΠΈΠΌΠ΅Π½ΡΠ°ΠΌ, Ρ Π½Π°Π±ΡΠΎΡΠ°Π» docker-compose.yml
Π² ΠΊΠΎΡΠΎΡΠΎΠΌ:
- ΠΠΎΠ΄Π½ΠΈΠΌΠ΅ΠΌ ΡΠΎΠ±ΡΡΠ²Π΅Π½Π½ΠΎ Airflow: Scheduler, Webserver. Π’Π°ΠΌ ΠΆΠ΅ Π±ΡΠ΄Π΅Ρ ΠΊΡΡΡΠΈΡΡΡ Flower Π΄Π»Ρ ΠΌΠΎΠ½ΠΈΡΠΎΡΠΈΠ½Π³Π° Celery-Π·Π°Π΄Π°Ρ (ΠΏΠΎΡΠΎΠΌΡ ΡΡΠΎ Π΅Π³ΠΎ ΡΠΆΠ΅ Π·Π°ΡΠΎΠ»ΠΊΠ°Π»ΠΈ Π²
apache/airflow:1.10.10-python3.7
, Π° ΠΌΡ ΠΈ Π½Π΅ ΠΏΡΠΎΡΠΈΠ²); - PostgreSQL, Π² ΠΊΠΎΡΠΎΡΡΠΉ Airflow Π±ΡΠ΄Π΅Ρ ΠΏΠΈΡΠ°ΡΡ ΡΠ²ΠΎΡ ΡΠ»ΡΠΆΠ΅Π±Π½ΡΡ ΠΈΠ½ΡΠΎΡΠΌΠ°ΡΠΈΡ (Π΄Π°Π½Π½ΡΠ΅ ΠΏΠ»Π°Π½ΠΈΡΠΎΠ²ΡΠΈΠΊΠ°, ΡΡΠ°ΡΠΈΡΡΠΈΠΊΠ° Π²ΡΠΏΠΎΠ»Π½Π΅Π½ΠΈΡ ΠΈ Ρ. Π΄.), Π° Celery β ΠΎΡΠΌΠ΅ΡΠ°ΡΡ Π·Π°Π²Π΅ΡΡΠ΅Π½Π½ΡΠ΅ ΡΠ°ΡΠΊΠΈ;
- Redis, ΠΊΠΎΡΠΎΡΡΠΉ Π±ΡΠ΄Π΅Ρ Π²ΡΡΡΡΠΏΠ°ΡΡ Π±ΡΠΎΠΊΠ΅ΡΠΎΠΌ Π·Π°Π΄Π°Ρ Π΄Π»Ρ Celery;
- Celery worker, ΠΊΠΎΡΠΎΡΡΠΉ ΠΈ Π·Π°ΠΉΠΌΠ΅ΡΡΡ Π½Π΅ΠΏΠΎΡΡΠ΅Π΄ΡΡΠ²Π΅Π½Π½ΡΠΌ Π²ΡΠΏΠΎΠ»Π½Π΅Π½ΠΈΠ΅ΠΌ Π·Π°Π΄Π°ΡΠ΅ΠΊ.
- Π ΠΏΠ°ΠΏΠΊΡ
./dags
ΠΌΡ Π±ΡΠ΄Π΅Ρ ΡΠΊΠ»Π°Π΄ΡΠ²Π°ΡΡ Π½Π°ΡΠΈ ΡΠ°ΠΉΠ»Ρ Ρ ΠΎΠΏΠΈΡΠ°Π½ΠΈΠ΅ΠΌ Π΄Π°Π³ΠΎΠ². ΠΠ½ΠΈ Π±ΡΠ΄ΡΡ ΠΏΠΎΠ΄Ρ Π²Π°ΡΡΠ²Π°ΡΡΡΡ Π½Π° Π»Π΅ΡΡ, ΠΏΠΎΡΡΠΎΠΌΡ ΠΏΠ΅ΡΠ΅Π΄ΡΡΠ³ΠΈΠ²Π°ΡΡ Π²Π΅ΡΡ ΡΡΠ΅ΠΊ ΠΏΠΎΡΠ»Π΅ ΠΊΠ°ΠΆΠ΄ΠΎΠ³ΠΎ ΡΠΈΡ Π° Π½Π΅ Π½ΡΠΆΠ½ΠΎ.
ΠΠΎΠ΅-Π³Π΄Π΅ ΠΊΠΎΠ΄ Π² ΠΏΡΠΈΠΌΠ΅ΡΠ°Ρ ΠΏΡΠΈΠ²Π΅Π΄Π΅Π½ Π½Π΅ ΠΏΠΎΠ»Π½ΠΎΡΡΡΡ (ΡΡΠΎΠ±Ρ Π½Π΅ Π·Π°Π³ΡΠΎΠΌΠΎΠΆΠ΄Π°ΡΡ ΡΠ΅ΠΊΡΡ), Π° Π³Π΄Π΅-ΡΠΎ ΠΎΠ½ ΠΌΠΎΠ΄ΠΈΡΠΈΡΠΈΡΡΠ΅ΡΡΡ Π² ΠΏΡΠΎΡΠ΅ΡΡΠ΅. Π¦Π΅Π»ΡΠ½ΡΠ΅ ΡΠ°Π±ΠΎΡΠ°ΡΡΠΈΠ΅ ΠΏΡΠΈΠΌΠ΅ΡΡ ΠΊΠΎΠ΄Π° ΠΌΠΎΠΆΠ½ΠΎ ΠΏΠΎΡΠΌΠΎΡΡΠ΅ΡΡ Π² ΡΠ΅ΠΏΠΎΠ·ΠΈΡΠΎΡΠΈΠΈ
https://github.com/dm-logv/airflow-tutorial .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- broker
ΠΡΠΈΠΌΠ΅ΡΠ°Π½ΠΈΡ:
- Π ΡΠ±ΠΎΡΠΊΠ΅ ΠΊΠΎΠΌΠΏΠΎΠ·Π° Ρ Π²ΠΎ ΠΌΠ½ΠΎΠ³ΠΎΠΌ ΠΎΠΏΠΈΡΠ°Π»ΡΡ Π½Π° ΠΈΠ·Π²Π΅ΡΡΠ½ΡΠΉ ΠΎΠ±ΡΠ°Π·
puckel/docker-airflow β ΠΎΠ±ΡΠ·Π°ΡΠ΅Π»ΡΠ½ΠΎ ΠΏΠΎΡΠΌΠΎΡΡΠΈΡΠ΅. ΠΠΎΠΆΠ΅Ρ, Π²Π°ΠΌ Π² ΠΆΠΈΠ·Π½ΠΈ Π±ΠΎΠ»ΡΡΠ΅ Π½ΠΈΡΠ΅Π³ΠΎ ΠΈ Π½Π΅ ΠΏΠΎΠ½Π°Π΄ΠΎΠ±ΠΈΡΡΡ. - ΠΡΠ΅ Π½Π°ΡΡΡΠΎΠΉΠΊΠΈ Airflow Π΄ΠΎΡΡΡΠΏΠ½Ρ Π½Π΅ ΡΠΎΠ»ΡΠΊΠΎ ΡΠ΅ΡΠ΅Π·
airflow.cfg
, Π½ΠΎ ΠΈ ΡΠ΅ΡΠ΅Π· ΠΏΠ΅ΡΠ΅ΠΌΠ΅Π½Π½ΡΠ΅ ΡΡΠ΅Π΄Ρ (ΡΠ»Π°Π²Π° ΡΠ°Π·ΡΠ°Π±ΠΎΡΡΠΈΠΊΠ°ΠΌ), ΡΠ΅ΠΌ Ρ Π·Π»ΠΎΡΡΠ½ΠΎ Π²ΠΎΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π»ΡΡ. - ΠΡΡΠ΅ΡΡΠ²Π΅Π½Π½ΠΎ, ΠΎΠ½ Π½Π΅ production-ready: Ρ Π½Π°ΠΌΠ΅ΡΠ΅Π½Π½ΠΎ Π½Π΅ ΡΡΠ°Π²ΠΈΠ» heartbeats Π½Π° ΠΊΠΎΠ½ΡΠ΅ΠΉΠ½Π΅ΡΡ, Π½Π΅ Π·Π°ΠΌΠΎΡΠ°ΡΠΈΠ²Π°Π»ΡΡ Ρ Π±Π΅Π·ΠΎΠΏΠ°ΡΠ½ΠΎΡΡΡΡ. ΠΠΎ ΠΌΠΈΠ½ΠΈΠΌΡΠΌ, ΠΏΠΎΠ΄Ρ ΠΎΠ΄ΡΡΠΈΠΉ Π΄Π»Ρ Π½Π°ΡΠΈΡ ΡΠΊΡΠΏΠ΅ΡΠΈΠΌΠ΅Π½ΡΠΈΠΊΠΎΠ² Ρ ΡΠ΄Π΅Π»Π°Π».
- ΠΠ±ΡΠ°ΡΠΈΡΠ΅ Π²Π½ΠΈΠΌΠ°Π½ΠΈΠ΅, ΡΡΠΎ:
- ΠΠ°ΠΏΠΊΠ° Ρ Π΄Π°Π³Π°ΠΌΠΈ Π΄ΠΎΠ»ΠΆΠ½Π° Π±ΡΡΡ Π΄ΠΎΡΡΡΠΏΠ½Π° ΠΊΠ°ΠΊ ΠΏΠ»Π°Π½ΠΈΡΠΎΠ²ΡΠΈΠΊΡ, ΡΠ°ΠΊ ΠΈ Π²ΠΎΡΠΊΠ΅ΡΠ°ΠΌ.
- Π’ΠΎ ΠΆΠ΅ ΡΠ°ΠΌΠΎΠ΅ ΠΊΠ°ΡΠ°Π΅ΡΡΡ ΠΈ Π²ΡΠ΅Ρ ΡΡΠΎΡΠΎΠ½Π½ΠΈΡ Π±ΠΈΠ±Π»ΠΈΠΎΡΠ΅ΠΊ β ΠΎΠ½ΠΈ Π²ΡΠ΅ Π΄ΠΎΠ»ΠΆΠ½Ρ Π±ΡΡΡ ΡΡΡΠ°Π½ΠΎΠ²Π»Π΅Π½Ρ Π½Π° ΠΌΠ°ΡΠΈΠ½Ρ Ρ ΡΠ΅Π΄ΡΠ»Π΅ΡΠΎΠΌ ΠΈ Π²ΠΎΡΠΊΠ΅ΡΠ°ΠΌΠΈ.
ΠΡ Π° ΡΠ΅ΠΏΠ΅ΡΡ ΠΏΡΠΎΡΡΠΎ:
$ docker-compose up --scale worker=3
ΠΠΎΡΠ»Π΅ ΡΠΎΠ³ΠΎ, ΠΊΠ°ΠΊ Π²ΡΡ ΠΏΠΎΠ΄Π½ΠΈΠΌΠ΅ΡΡΡ, ΠΌΠΎΠΆΠ½ΠΎ ΡΠΌΠΎΡΡΠ΅ΡΡ Π½Π° Π²Π΅Π±-ΠΈΠ½ΡΠ΅ΡΡΠ΅ΠΉΡΡ:
- Airflow:
http://127.0.0.1:8080/admin/ - Flower:
http://127.0.0.1:5555/dashboard
ΠΡΠ½ΠΎΠ²Π½ΡΠ΅ ΠΏΠΎΠ½ΡΡΠΈΡ
ΠΡΠ»ΠΈ Π²Ρ Π½ΠΈΡΠ΅Π³ΠΎ Π½Π΅ ΠΏΠΎΠ½ΡΠ»ΠΈ Π²ΠΎ Π²ΡΠ΅Ρ ΡΡΠΈΡ Β«Π΄Π°Π³Π°Ρ Β», ΡΠΎ Π²ΠΎΡ ΠΊΡΠ°ΡΠΊΠΈΠΉ ΡΠ»ΠΎΠ²Π°ΡΠΈΠΊ:
- Scheduler β ΡΠ°ΠΌΡΠΉ Π³Π»Π°Π²Π½ΡΠΉ Π΄ΡΠ΄ΡΠΊΠ° Π² Airflow, ΠΊΠΎΠ½ΡΡΠΎΠ»ΠΈΡΡΡΡΠΈΠΉ, ΡΡΠΎΠ±Ρ Π²ΠΊΠ°Π»ΡΠ²Π°Π»ΠΈ ΡΠΎΠ±ΠΎΡΡ, Π° Π½Π΅ ΡΠ΅Π»ΠΎΠ²Π΅ΠΊ: ΡΠ»Π΅Π΄ΠΈΡ Π·Π° ΡΠ°ΡΠΏΠΈΡΠ°Π½ΠΈΠ΅ΠΌ, ΠΎΠ±Π½ΠΎΠ²Π»ΡΠ΅Ρ Π΄Π°Π³ΠΈ, Π·Π°ΠΏΡΡΠΊΠ°Π΅Ρ ΡΠ°ΡΠΊΠΈ.
ΠΠΎΠΎΠ±ΡΠ΅, Π² ΡΡΠ°ΡΡΡ Π²Π΅ΡΡΠΈΡΡ , Ρ Π½Π΅Π³ΠΎ Π±ΡΠ»ΠΈ ΠΏΡΠΎΠ±Π»Π΅ΠΌΡ Ρ ΠΏΠ°ΠΌΡΡΡΡ (Π½Π΅Ρ, Π½Π΅ Π°ΠΌΠ½Π΅Π·ΠΈΡ, Π° ΡΡΠ΅ΡΠΊΠΈ) ΠΈ Π² ΠΊΠΎΠ½ΡΠΈΠ³Π°Ρ Π΄Π°ΠΆΠ΅ ΠΎΡΡΠ°Π»ΡΡ Π»Π΅Π³Π°ΡΠΈ-ΠΏΠ°ΡΠ°ΠΌΠ΅ΡΡ
run_duration
β ΠΈΠ½ΡΠ΅ΡΠ²Π°Π» Π΅Π³ΠΎ ΠΏΠ΅ΡΠ΅Π·Π°ΠΏΡΡΠΊΠ°. ΠΠΎ ΡΠ΅ΠΉΡΠ°Ρ Π²ΡΡ Ρ ΠΎΡΠΎΡΠΎ. - DAG (ΠΎΠ½ ΠΆΠ΅ Β«Π΄Π°Π³Β») β Β«Π½Π°ΠΏΡΠ°Π²Π»Π΅Π½Π½ΡΠΉ Π°ΡΠΈΠΊΠ»ΠΈΡΠ½ΡΠΉ Π³ΡΠ°ΡΒ», Π½ΠΎ ΡΠ°ΠΊΠΎΠ΅ ΠΎΠΏΡΠ΅Π΄Π΅Π»Π΅Π½ΠΈΠ΅ ΠΌΠ°Π»ΠΎ ΠΊΠΎΠΌΡ ΡΡΠΎ ΡΠΊΠ°ΠΆΠ΅Ρ, Π° ΠΏΠΎ ΡΡΡΠΈ ΡΡΠΎ ΠΊΠΎΠ½ΡΠ΅ΠΉΠ½Π΅Ρ Π΄Π»Ρ Π²Π·Π°ΠΈΠΌΠΎΠ΄Π΅ΠΉΡΡΠ²ΡΡΡΠΈΡ
Π΄ΡΡΠ³ Ρ Π΄ΡΡΠ³ΠΎΠΌ ΡΠ°ΡΠΊΠΎΠ² (ΡΠΌ. Π½ΠΈΠΆΠ΅) ΠΈΠ»ΠΈ Π°Π½Π°Π»ΠΎΠ³ Package Π² SSIS ΠΈ Workflow Π² Informatica.
ΠΠΎΠΌΠΈΠΌΠΎ Π΄Π°Π³ΠΎΠ² Π΅ΡΠ΅ ΠΌΠΎΠ³ΡΡ Π±ΡΡΡ ΡΠ°Π±Π΄Π°Π³ΠΈ, Π½ΠΎ ΠΌΡ Π΄ΠΎ Π½ΠΈΡ ΡΠΊΠΎΡΠ΅Π΅ Π²ΡΠ΅Π³ΠΎ Π½Π΅ Π΄ΠΎΠ±Π΅ΡΡΠΌΡΡ.
- DAG Run β ΠΈΠ½ΠΈΡΠΈΠ°Π»ΠΈΠ·ΠΈΡΠΎΠ²Π°Π½Π½ΡΠΉ Π΄Π°Π³, ΠΊΠΎΡΠΎΡΠΎΠΌΡ ΠΏΡΠΈΡΠ²ΠΎΠ΅Π½ ΡΠ²ΠΎΠΉ
execution_date
. ΠΠ°Π³ΡΠ°Π½Ρ ΠΎΠ΄Π½ΠΎΠ³ΠΎ Π΄Π°Π³Π° ΠΌΠΎΠ³ΡΡ Π²ΠΏΠΎΠ»Π½Π΅ ΡΠ°Π±ΠΎΡΠ°ΡΡ ΠΏΠ°ΡΠ°Π»Π»Π΅Π»ΡΠ½ΠΎ (Π΅ΡΠ»ΠΈ Π²Ρ, ΠΊΠΎΠ½Π΅ΡΠ½ΠΎ, ΡΠ΄Π΅Π»Π°Π»ΠΈ ΡΠ²ΠΎΠΈ ΡΠ°ΡΠΊΠΈ ΠΈΠ΄Π΅ΠΌΠΏΠΎΡΠ΅Π½ΡΠ½ΡΠΌΠΈ). - Operator β ΡΡΠΎ ΠΊΡΡΠΎΡΠΊΠΈ ΠΊΠΎΠ΄Π°, ΠΎΡΠ²Π΅ΡΡΡΠ²Π΅Π½Π½ΡΠ΅ Π·Π° Π²ΡΠΏΠΎΠ»Π½Π΅Π½ΠΈΠ΅ ΠΊΠ°ΠΊΠΎΠ³ΠΎ-Π»ΠΈΠ±ΠΎ ΠΊΠΎΠ½ΠΊΡΠ΅ΡΠ½ΠΎΠ³ΠΎ Π΄Π΅ΠΉΡΡΠ²ΠΈΡ. ΠΡΡΡ ΡΡΠΈ ΡΠΈΠΏΠ° ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡΠΎΠ²:
- action, ΠΊΠ°ΠΊ Π½Π°ΠΏΡΠΈΠΌΠ΅Ρ Π½Π°Ρ Π»ΡΠ±ΠΈΠΌΡΠΉ
PythonOperator
, ΠΊΠΎΡΠΎΡΡΠΉ Π² ΡΠΈΠ»Π°Ρ Π²ΡΠΏΠΎΠ»Π½ΠΈΡΡ Π»ΡΠ±ΠΎΠΉ (Π²Π°Π»ΠΈΠ΄Π½ΡΠΉ) Python-ΠΊΠΎΠ΄; - transfer, ΠΊΠΎΡΠΎΡΡΠ΅ ΠΏΠ΅ΡΠ΅Π²ΠΎΠ·ΡΡ Π΄Π°Π½Π½ΡΠ΅ Ρ ΠΌΠ΅ΡΡΠ° Π½Π° ΠΌΠ΅ΡΡΠΎ, ΡΠΊΠ°ΠΆΠ΅ΠΌ,
MsSqlToHiveTransfer
; - sensor ΠΆΠ΅ ΠΏΠΎΠ·Π²ΠΎΠ»ΠΈΡ ΡΠ΅Π°Π³ΠΈΡΠΎΠ²Π°ΡΡ ΠΈΠ»ΠΈ ΠΏΡΠΈΡΠΎΡΠΌΠΎΠ·ΠΈΡΡ Π΄Π°Π»ΡΠ½Π΅ΠΉΡΠ΅Π΅ Π²ΡΠΏΠΎΠ»Π½Π΅Π½ΠΈΠ΅ Π΄Π°Π³Π° Π΄ΠΎ Π½Π°ΡΡΡΠΏΠ»Π΅Π½ΠΈΡ ΠΊΠ°ΠΊΠΎΠ³ΠΎ-Π»ΠΈΠ±ΠΎ ΡΠΎΠ±ΡΡΠΈΡ.
HttpSensor
ΠΌΠΎΠΆΠ΅Ρ Π΄Π΅ΡΠ³Π°ΡΡ ΡΠΊΠ°Π·Π°Π½Π½ΡΠΉ ΡΠ½Π΄ΠΏΠΎΠΉΠ½Ρ, ΠΈ ΠΊΠΎΠ³Π΄Π° Π΄ΠΎΠΆΠ΄Π΅ΡΡΡ Π½ΡΠΆΠ½ΡΠΉ ΠΎΡΠ²Π΅Ρ, Π·Π°ΠΏΡΡΡΠΈΡΡ ΡΡΠ°Π½ΡΡΠ΅ΡGoogleCloudStorageToS3Operator
. ΠΡΡΠ»ΠΈΠ²ΡΠΉ ΡΠΌ ΡΠΏΡΠΎΡΠΈΡ: Β«Π·Π°ΡΠ΅ΠΌ? ΠΠ΅Π΄Ρ ΠΌΠΎΠΆΠ½ΠΎ Π΄Π΅Π»Π°ΡΡ ΠΏΠΎΠ²ΡΠΎΡΡ ΠΏΡΡΠΌΠΎ Π² ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡΠ΅!Β» Π Π·Π°ΡΠ΅ΠΌ, ΡΡΠΎΠ±Ρ Π½Π΅ Π·Π°Π±ΠΈΠ²Π°ΡΡ ΠΏΡΠ» ΡΠ°ΡΠΊΠΎΠ² ΠΏΠΎΠ΄Π²ΠΈΡΡΠΈΠΌΠΈ ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡΠ°ΠΌΠΈ. Π‘Π΅Π½ΡΠΎΡ Π·Π°ΠΏΡΡΠΊΠ°Π΅ΡΡΡ, ΠΏΡΠΎΠ²Π΅ΡΡΠ΅Ρ ΠΈ ΡΠΌΠΈΡΠ°Π΅Ρ Π΄ΠΎ ΡΠ»Π΅Π΄ΡΡΡΠ΅ΠΉ ΠΏΠΎΠΏΡΡΠΊΠΈ.
- action, ΠΊΠ°ΠΊ Π½Π°ΠΏΡΠΈΠΌΠ΅Ρ Π½Π°Ρ Π»ΡΠ±ΠΈΠΌΡΠΉ
- Task β ΠΎΠ±ΡΡΠ²Π»Π΅Π½Π½ΡΠ΅ ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡΡ Π²Π½Π΅ Π·Π°Π²ΠΈΡΠΈΠΌΠΎΡΡΠΈ ΠΎΡ ΡΠΈΠΏΠ° ΠΈ ΠΏΡΠΈΠΊΡΠ΅ΠΏΠ»Π΅Π½Π½ΡΠ΅ ΠΊ Π΄Π°Π³Ρ ΠΏΠΎΠ²ΡΡΠ°ΡΡΡΡ Π΄ΠΎ ΡΠΈΠ½Π° ΡΠ°ΡΠΊΠ°.
- Task instance β ΠΊΠΎΠ³Π΄Π° Π³Π΅Π½Π΅ΡΠ°Π»-ΠΏΠ»Π°Π½ΠΈΡΠΎΠ²ΡΠΈΠΊ ΡΠ΅ΡΠΈΠ», ΡΡΠΎ ΡΠ°ΡΠΊΠΈ ΠΏΠΎΡΠ° ΠΎΡΠΏΡΠ°Π²Π»ΡΡΡ Π² Π±ΠΎΠΉ Π½Π° ΠΈΡΠΏΠΎΠ»Π½ΠΈΡΠ΅Π»ΠΈ-Π²ΠΎΡΠΊΠ΅ΡΡ (ΠΏΡΡΠΌΠΎ Π½Π° ΠΌΠ΅ΡΡΠ΅, Π΅ΡΠ»ΠΈ ΠΌΡ ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΠ΅ΠΌ
LocalExecutor
ΠΈΠ»ΠΈ Π½Π° ΡΠ΄Π°Π»ΡΠ½Π½ΡΡ Π½ΠΎΠ΄Ρ Π² ΡΠ»ΡΡΠ°Π΅ ΡCeleryExecutor
), ΠΎΠ½ Π½Π°Π·Π½Π°ΡΠ°Π΅Ρ ΠΈΠΌ ΠΊΠΎΠ½ΡΠ΅ΠΊΡΡ (Ρ. Π΅. ΠΊΠΎΠΌΠΏΠ»Π΅ΠΊΡ ΠΏΠ΅ΡΠ΅ΠΌΠ΅Π½Π½ΡΡ β ΠΏΠ°ΡΠ°ΠΌΠ΅ΡΡΠΎΠ² Π²ΡΠΏΠΎΠ»Π½Π΅Π½ΠΈΡ), ΡΠ°Π·Π²ΠΎΡΠ°ΡΠΈΠ²Π°Π΅Ρ ΡΠ°Π±Π»ΠΎΠ½Ρ ΠΊΠΎΠΌΠ°Π½Π΄ ΠΈΠ»ΠΈ Π·Π°ΠΏΡΠΎΡΠΎΠ² ΠΈ ΡΠΊΠ»Π°Π΄ΡΠ²Π°Π΅Ρ ΠΈΡ Π² ΠΏΡΠ».
ΠΠ΅Π½Π΅ΡΠΈΡΡΠ΅ΠΌ ΡΠ°ΡΠΊΠΈ
Π‘ΠΏΠ΅ΡΠ²Π° ΠΎΠ±ΠΎΠ·Π½Π°ΡΠΈΠΌ ΠΎΠ±ΡΡΡ ΡΡ Π΅ΠΌΡ Π½Π°ΡΠ΅Π³ΠΎ Π΄Π°Π³Π°, Π° Π·Π°ΡΠ΅ΠΌ Π±ΡΠ΄Π΅ΠΌ Π²ΡΡ Π±ΠΎΠ»ΡΡΠ΅ ΠΈ Π±ΠΎΠ»ΡΡΠ΅ ΠΏΠΎΠ³ΡΡΠΆΠ°ΡΡΡΡ Π² Π΄Π΅ΡΠ°Π»ΠΈ, ΠΏΠΎΡΠΎΠΌΡ ΡΡΠΎ ΠΌΡ ΠΏΡΠΈΠΌΠ΅Π½ΡΠ΅ΠΌ Π½Π΅ΠΊΠΎΡΠΎΡΡΠ΅ Π½Π΅ΡΡΠΈΠ²ΠΈΠ°Π»ΡΠ½ΡΠ΅ ΡΠ΅ΡΠ΅Π½ΠΈΡ.
ΠΡΠ°ΠΊ, Π² ΠΏΡΠΎΡΡΠ΅ΠΉΡΠ΅ΠΌ Π²ΠΈΠ΄Π΅ ΠΏΠΎΠ΄ΠΎΠ±Π½ΡΠΉ Π΄Π°Π³ Π±ΡΠ΄Π΅Ρ Π²ΡΠ³Π»ΡΠ΄Π΅ΡΡ ΡΠ°ΠΊ:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
ΠΠ°Π²Π°ΠΉΡΠ΅ ΡΠ°Π·Π±ΠΈΡΠ°ΡΡΡΡ:
- Π‘ΠΏΠ΅ΡΠ²Π° ΠΈΠΌΠΏΠΎΡΡΠΈΡΡΠ΅ΠΌ Π½ΡΠΆΠ½ΡΠ΅ Π»ΠΈΠ±Ρ ΠΈ ΠΊΠΎΠ΅ ΡΡΠΎ Π΅ΡΡ;
sql_server_ds
β ΡΡΠΎList[namedtuple[str, str]]
Ρ ΠΈΠΌΠ΅Π½Π°ΠΌΠΈ ΠΊΠΎΠ½Π½Π΅ΠΊΡΠΎΠ² ΠΈΠ· Airflow Connections ΠΈ Π±Π°Π·Π°ΠΌΠΈ Π΄Π°Π½Π½ΡΡ ΠΈΠ· ΠΊΠΎΡΠΎΡΡΡ ΠΌΡ Π±ΡΠ΄Π΅ΠΌ Π·Π°Π±ΠΈΡΠ°ΡΡ Π½Π°ΡΡ ΡΠ°Π±Π»ΠΈΡΠΊΡ;dag
β ΠΎΠ±ΡΡΠ²Π»Π΅Π½ΠΈΠ΅ Π½Π°ΡΠ΅Π³ΠΎ Π΄Π°Π³Π°, ΠΊΠΎΡΠΎΡΠΎΠ΅ ΠΎΠ±ΡΠ·Π°ΡΠ΅Π»ΡΠ½ΠΎ Π΄ΠΎΠ»ΠΆΠ½ΠΎ Π»Π΅ΠΆΠ°ΡΡ Π²globals()
, ΠΈΠ½Π°ΡΠ΅ Airflow Π΅Π³ΠΎ Π½Π΅ Π½Π°ΠΉΠ΄Π΅Ρ. ΠΠ°Π³Ρ ΡΠ°ΠΊΠΆΠ΅ Π½ΡΠΆΠ½ΠΎ ΡΠΊΠ°Π·Π°ΡΡ:- ΡΡΠΎ Π΅Π³ΠΎ Π·ΠΎΠ²ΡΡ
orders
β ΡΡΠΎ ΠΈΠΌΡ ΠΏΠΎΡΠΎΠΌ Π±ΡΠ΄Π΅Ρ ΠΌΠ°ΡΡΠΈΡΡ Π² Π²Π΅Π±-ΠΈΠ½ΡΠ΅ΡΡΠ΅ΠΉΡΠ΅, - ΡΡΠΎ ΡΠ°Π±ΠΎΡΠ°ΡΡ ΠΎΠ½ Π±ΡΠ΄Π΅Ρ, Π½Π°ΡΠΈΠ½Π°Ρ Ρ ΠΏΠΎΠ»ΡΠ½ΠΎΡΠΈ Π²ΠΎΡΡΠΌΠΎΠ³ΠΎ ΠΈΡΠ»Ρ,
- Π° Π·Π°ΠΏΡΡΠΊΠ°ΡΡ ΠΎΠ½ Π΄ΠΎΠ»ΠΆΠ΅Π½, ΠΏΡΠΈΠΌΠ΅ΡΠ½ΠΎ ΠΊΠ°ΠΆΠ΄ΡΠ΅ 6 ΡΠ°ΡΠΎΠ² (Π΄Π»Ρ ΠΊΡΡΡΡΡ
ΠΏΠ°ΡΠ½Π΅ΠΉ Π·Π΄Π΅ΡΡ Π²ΠΌΠ΅ΡΡΠΎ
timedelta()
Π΄ΠΎΠΏΡΡΡΠΈΠΌΠ°cron
-ΡΡΡΠΎΠΊΠ°0 0 0/6 ? * * *
, Π΄Π»Ρ ΠΌΠ΅Π½Π΅Π΅ ΠΊΡΡΡΡΡ β Π²ΡΡΠ°ΠΆΠ΅Π½ΠΈΠ΅ Π²ΡΠΎΠ΄Π΅@daily
);
- ΡΡΠΎ Π΅Π³ΠΎ Π·ΠΎΠ²ΡΡ
workflow()
Π±ΡΠ΄Π΅Ρ Π΄Π΅Π»Π°ΡΡ ΠΎΡΠ½ΠΎΠ²Π½ΡΡ ΡΠ°Π±ΠΎΡΡ, Π½ΠΎ Π½Π΅ ΡΠ΅ΠΉΡΠ°Ρ. Π‘Π΅ΠΉΡΠ°Ρ ΠΌΡ ΠΏΡΠΎΡΡΠΎ Π²ΡΡΡΠΏΠ΅ΠΌ Π½Π°Ρ ΠΊΠΎΠ½ΡΠ΅ΠΊΡΡ Π² Π»ΠΎΠ³.- Π ΡΠ΅ΠΏΠ΅ΡΡ ΠΏΡΠΎΡΡΠ°Ρ ΠΌΠ°Π³ΠΈΡ ΡΠΎΠ·Π΄Π°Π½ΠΈΡ ΡΠ°ΡΠΊΠΎΠ²:
- ΠΏΡΠΎΠ±Π΅Π³Π°Π΅ΠΌ ΠΏΠΎ Π½Π°ΡΠΈΠΌ ΠΈΡΡΠΎΡΠ½ΠΈΠΊΠ°ΠΌ;
- ΠΈΠ½ΠΈΡΠΈΠ°Π»ΠΈΠ·ΠΈΡΡΠ΅ΠΌ
PythonOperator
, ΠΊΠΎΡΠΎΡΡΠΉ Π±ΡΠ΄Π΅Ρ Π²ΡΠΏΠΎΠ»Π½ΡΡΡ Π½Π°ΡΡ ΠΏΡΡΡΡΡΠΊΡworkflow()
. ΠΠ΅ Π·Π°Π±ΡΠ²Π°ΠΉΡΠ΅ ΡΠΊΠ°Π·ΡΠ²Π°ΡΡ ΡΠ½ΠΈΠΊΠ°Π»ΡΠ½ΠΎΠ΅ (Π² ΡΠ°ΠΌΠΊΠ°Ρ Π΄Π°Π³Π°) ΠΈΠΌΡ ΡΠ°ΡΠΊΠ° ΠΈ ΠΏΠΎΠ΄Π²ΡΠ·ΡΠ²Π°ΡΡ ΡΠ°ΠΌ Π΄Π°Π³. Π€Π»Π°Π³provide_context
Π² ΡΠ²ΠΎΡ ΠΎΡΠ΅ΡΠ΅Π΄Ρ Π½Π°ΡΡΠΏΠ΅Ρ Π² ΡΡΠ½ΠΊΡΠΈΡ Π΄ΠΎΠΏΠΎΠ»Π½ΠΈΡΠ΅Π»ΡΠ½ΡΡ Π°ΡΠ³ΡΠΌΠ΅Π½ΡΠΎΠ², ΠΊΠΎΡΠΎΡΡΠ΅ ΠΌΡ Π±Π΅ΡΠ΅ΠΆΠ½ΠΎ ΡΠΎΠ±Π΅ΡΡΠΌ Ρ ΠΏΠΎΠΌΠΎΡΡΡ**context
.
ΠΠΎΠΊΠ° Π½Π° ΡΡΠΎΠΌ Π²ΡΡ. Π§ΡΠΎ ΠΌΡ ΠΏΠΎΠ»ΡΡΠΈΠ»ΠΈ:
- Π½ΠΎΠ²ΡΠΉ Π΄Π°Π³ Π² Π²Π΅Π±-ΠΈΠ½ΡΠ΅ΡΡΠ΅ΠΉΡΠ΅,
- ΠΏΠΎΠ»ΡΠΎΡΡ ΡΠΎΡΠ½ΠΈ ΡΠ°ΡΠΊΠΎΠ², ΠΊΠΎΡΠΎΡΡΠ΅ Π±ΡΠ΄ΡΡ Π²ΡΠΏΠΎΠ»Π½ΡΡΡΡΡ ΠΏΠ°ΡΠ°Π»Π»Π΅Π»ΡΠ½ΠΎ (Π΅ΡΠ»ΠΈ ΡΠΎ ΠΏΠΎΠ·Π²ΠΎΠ»ΡΡ Π½Π°ΡΡΡΠΎΠΉΠΊΠΈ Airflow, Celery ΠΈ ΠΌΠΎΡΠ½ΠΎΡΡΠΈ ΡΠ΅ΡΠ²Π΅ΡΠΎΠ²).
ΠΡ, ΠΏΠΎΡΡΠΈ ΠΏΠΎΠ»ΡΡΠΈΠ»ΠΈ.
ΠΠ°Π²ΠΈΡΠΈΠΌΠΎΡΡΠΈ ΠΊΡΠΎ Π±ΡΠ΄Π΅Ρ ΡΡΠ°Π²ΠΈΡΡ?
Π§ΡΠΎΠ±Ρ Π²ΡΡ ΡΡΠΎ Π΄Π΅Π»ΠΎ ΡΠΏΡΠΎΡΡΠΈΡΡ Ρ Π²ΠΊΠΎΡΡΡΠΈΠ» Π² docker-compose.yml
ΠΎΠ±ΡΠ°Π±ΠΎΡΠΊΡ requirements.txt
Π½Π° Π²ΡΠ΅Ρ
Π½ΠΎΠ΄Π°Ρ
.
ΠΠΎΡ ΡΠ΅ΠΏΠ΅ΡΡ ΠΏΠΎΠ½Π΅ΡΠ»Π°ΡΡ:
Π‘Π΅ΡΡΠ΅ ΠΊΠ²Π°Π΄ΡΠ°ΡΠΈΠΊΠΈ β task instances, ΠΎΠ±ΡΠ°Π±ΠΎΡΠ°Π½Π½ΡΠ΅ ΠΏΠ»Π°Π½ΠΈΡΠΎΠ²ΡΠΈΠΊΠΎΠΌ.
ΠΠ΅ΠΌΠ½ΠΎΠ³ΠΎ ΠΆΠ΄Π΅ΠΌ, Π·Π°Π΄Π°ΡΠΈ ΡΠ°ΡΡ Π²Π°ΡΡΠ²Π°ΡΡ Π²ΠΎΡΠΊΠ΅ΡΡ:
ΠΠ΅Π»Π΅Π½ΡΠ΅, ΠΏΠΎΠ½ΡΡΠ½ΠΎΠ΅ Π΄Π΅Π»ΠΎ, β ΡΡΠΏΠ΅ΡΠ½ΠΎ ΠΎΡΡΠ°Π±ΠΎΡΠ°Π²ΡΠΈΠ΅. ΠΡΠ°ΡΠ½ΡΠ΅ β Π½Π΅ ΠΎΡΠ΅Π½Ρ ΡΡΠΏΠ΅ΡΠ½ΠΎ.
ΠΡΡΠ°ΡΠΈ, Π½Π° Π½Π°ΡΠ΅ΠΌ ΠΏΡΠΎΠ΄Π΅ Π½ΠΈΠΊΠ°ΠΊΠΎΠΉ ΠΏΠ°ΠΏΠΊΠΈ
./dags
, ΡΠΈΠ½Ρ ΡΠΎΠ½ΠΈΠ·ΠΈΡΡΡΡΠ΅ΠΉΡΡ ΠΌΠ΅ΠΆΠ΄Ρ ΠΌΠ°ΡΠΈΠ½Π°ΠΌΠΈ Π½Π΅Ρ β Π²ΡΡ Π΄Π°Π³ΠΈ Π»Π΅ΠΆΠ°Ρ Π²git
Π½Π° Π½Π°ΡΠ΅ΠΌ Gitlab, Π° Gitlab CI ΡΠ°ΡΠΊΠ»Π°Π΄ΡΠ²Π°Π΅Ρ ΠΎΠ±Π½ΠΎΠ²Π»Π΅Π½ΠΈΡ Π½Π° ΠΌΠ°ΡΠΈΠ½Ρ ΠΏΡΠΈ ΠΌΡΡΠ΄ΠΆΠ΅ Π²master
.
ΠΠ΅ΠΌΠ½ΠΎΠ³ΠΎ ΠΎ Flower
ΠΠΎΠΊΠ° Π²ΠΎΡΠΊΠ΅ΡΡ ΠΌΠΎΠ»ΠΎΡΡΡ Π½Π°ΡΠΈ ΡΠ°ΡΠΎΡΠΊΠΈ-ΠΏΡΡΡΡΡΠΊΠΈ, Π²ΡΠΏΠΎΠΌΠ½ΠΈΠΌ ΠΏΡΠΎ Π΅ΡΠ΅ ΠΎΠ΄ΠΈΠ½ ΠΈΠ½ΡΡΡΡΠΌΠ΅Π½Ρ, ΠΊΠΎΡΠΎΡΡΠΉ ΠΌΠΎΠΆΠ΅Ρ Π½Π°ΠΌ ΠΊΠΎΠ΅-ΡΡΠΎ ΠΏΠΎΠΊΠ°Π·Π°ΡΡ β Flower.
Π‘Π°ΠΌΠ°Ρ ΠΏΠ΅ΡΠ²Π°Ρ ΡΡΡΠ°Π½ΠΈΡΠΊΠ° Ρ ΡΡΠΌΠΌΠ°ΡΠ½ΠΎΠΉ ΠΈΠ½ΡΠΎΡΠΌΠ°ΡΠΈΠ΅ΠΉ ΠΏΠΎ Π½ΠΎΠ΄Π°ΠΌ-Π²ΠΎΡΠΊΠ΅ΡΠ°ΠΌ:
Π‘Π°ΠΌΠ°Ρ Π½Π°ΡΡΡΠ΅Π½Π½Π°Ρ ΡΡΡΠ°Π½ΠΈΡΠΊΠ° Ρ Π·Π°Π΄Π°ΡΠ°ΠΌΠΈ, ΠΎΡΠΏΡΠ°Π²ΠΈΠ²ΡΠΈΠΌΠΈΡΡ Π² ΡΠ°Π±ΠΎΡΡ:
Π‘Π°ΠΌΠ°Ρ ΡΠΊΡΡΠ½Π°Ρ ΡΡΡΠ°Π½ΠΈΡΠΊΠ° Ρ ΡΠΎΡΡΠΎΡΠ½ΠΈΠ΅ΠΌ Π½Π°ΡΠ΅Π³ΠΎ Π±ΡΠΎΠΊΠ΅ΡΠ°:
Π‘Π°ΠΌΠ°Ρ ΡΡΠΊΠ°Ρ ΡΡΡΠ°Π½ΠΈΡΠΊΠ° β Ρ Π³ΡΠ°ΡΠΈΠΊΠ°ΠΌΠΈ ΡΠΎΡΡΠΎΡΠ½ΠΈΡ ΡΠ°ΡΠΊΠΎΠ² ΠΈ ΠΈΡ Π²ΡΠ΅ΠΌΠ΅Π½Π΅ΠΌ Π²ΡΠΏΠΎΠ»Π½Π΅Π½ΠΈΡ:
ΠΠΎΠ³ΡΡΠΆΠ°Π΅ΠΌ Π½Π΅Π΄ΠΎΠ³ΡΡΠΆΠ΅Π½Π½ΠΎΠ΅
ΠΡΠ°ΠΊ, Π²ΡΠ΅ ΡΠ°ΡΠΊΠΈ ΠΎΡΡΠ°Π±ΠΎΡΠ°Π»ΠΈ, ΠΌΠΎΠΆΠ½ΠΎ ΡΠ½ΠΎΡΠΈΡΡ ΡΠ°Π½Π΅Π½ΡΡ .
Π ΡΠ°Π½Π΅Π½ΡΡ ΠΎΠΊΠ°Π·Π°Π»ΠΎΡΡ Π½Π΅ΠΌΠ°Π»ΠΎ β ΠΏΠΎ ΡΠ΅ΠΌ ΠΈΠ»ΠΈ ΠΈΠ½ΡΠΌ ΠΏΡΠΈΡΠΈΠ½Π°ΠΌΠΈ. Π ΡΠ»ΡΡΠ°Π΅ ΠΏΡΠ°Π²ΠΈΠ»ΡΠ½ΠΎΠ³ΠΎ ΠΈΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π½ΠΈΡ Airflow Π²ΠΎΡ ΡΡΠΈ ΡΠ°ΠΌΡΠ΅ ΠΊΠ²Π°Π΄ΡΠ°ΡΡ Π³ΠΎΠ²ΠΎΡΡΡ ΠΎ ΡΠΎΠΌ, ΡΡΠΎ Π΄Π°Π½Π½ΡΠ΅ ΠΎΠΏΡΠ΅Π΄Π΅Π»Π΅Π½Π½ΠΎ Π½Π΅ Π΄ΠΎΠ΅Ρ Π°Π»ΠΈ.
ΠΡΠΆΠ½ΠΎ ΡΠΌΠΎΡΡΠ΅ΡΡ Π»ΠΎΠ³ ΠΈ ΠΏΠ΅ΡΠ΅Π·Π°ΠΏΡΡΠΊΠ°ΡΡ ΡΠΏΠ°Π²ΡΠΈΠ΅ task instances.
ΠΠΌΡΠΊΠ½ΡΠ² Π½Π° Π»ΡΠ±ΠΎΠΉ ΠΊΠ²Π°Π΄ΡΠ°Ρ, ΡΠ²ΠΈΠ΄ΠΈΠΌ Π΄ΠΎΡΡΡΠΏΠ½ΡΠ΅ Π½Π°ΠΌ Π΄Π΅ΠΉΡΡΠ²ΠΈΡ:
ΠΠΎΠΆΠ½ΠΎ Π²Π·ΡΡΡ, ΠΈ ΡΠ΄Π΅Π»Π°ΡΡ Clear ΡΠΏΠ°Π²ΡΠ΅ΠΌΡ. Π’ΠΎ Π΅ΡΡΡ, ΠΌΡ Π·Π°Π±ΡΠ²Π°Π΅ΠΌ ΠΎ ΡΠΎΠΌ, ΡΡΠΎ ΡΠ°ΠΌ ΡΡΠΎ-ΡΠΎ Π·Π°Π²Π°Π»ΠΈΠ»ΠΎΡΡ, ΠΈ ΡΠΎΡ ΠΆΠ΅ ΡΠ°ΠΌΡΠΉ ΠΈΠ½ΡΡΠ°Π½Ρ ΡΠ°ΡΠΊΠ° ΡΠΉΠ΄Π΅Ρ ΠΏΠ»Π°Π½ΠΈΡΠΎΠ²ΡΠΈΠΊΡ.
ΠΠΎΠ½ΡΡΠ½ΠΎ, ΡΡΠΎ Π΄Π΅Π»Π°ΡΡ ΡΠ°ΠΊ ΠΌΡΡΠΊΠΎΠΉ ΡΠΎ Π²ΡΠ΅ΠΌΠΈ ΠΊΡΠ°ΡΠ½ΡΠΌΠΈ ΠΊΠ²Π°Π΄ΡΠ°ΡΠ°ΠΌΠΈ Π½Π΅ ΠΎΡΠ΅Π½Ρ Π³ΡΠΌΠ°Π½Π½ΠΎ β Π½Π΅ ΡΡΠΎΠ³ΠΎ ΠΌΡ ΠΆΠ΄Π΅ΠΌ ΠΎΡ Airflow. ΠΡΡΠ΅ΡΡΠ²Π΅Π½Π½ΠΎ, Ρ Π½Π°Ρ Π΅ΡΡΡ ΠΎΡΡΠΆΠΈΠ΅ ΠΌΠ°ΡΡΠΎΠ²ΠΎΠ³ΠΎ ΠΏΠΎΡΠ°ΠΆΠ΅Π½ΠΈΡ: Browse/Task Instances
ΠΡΠ±Π΅ΡΠ΅ΠΌ Π²ΡΡ ΡΠ°Π·ΠΎΠΌ ΠΈ ΠΎΠ±Π½ΡΠ»ΠΈΠΌ Π½Π°ΠΆΠΌΠ΅ΠΌ ΠΏΡΠ°Π²ΠΈΠ»ΡΠ½ΡΠΉ ΠΏΡΠ½ΠΊΡ:
ΠΠΎΡΠ»Π΅ ΠΎΡΠΈΡΡΠΊΠΈ Π½Π°ΡΠΈ ΡΠ°ΠΊΡΠΈ Π²ΡΠ³Π»ΡΠ΄ΡΡ ΡΠ°ΠΊ (ΠΎΠ½ΠΈ ΡΠΆΠ΅ ΠΆΠ΄ΡΡ Π½Π΅ Π΄ΠΎΠΆΠ΄ΡΡΡΡ, ΠΊΠΎΠ³Π΄Π° ΡΠ΅Π΄ΡΠ»Π΅Ρ ΠΈΡ Π·Π°ΠΏΠ»Π°Π½ΠΈΡΡΠ΅Ρ):
Π‘ΠΎΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΡ, Ρ ΡΠΊΠΈ ΠΈ ΠΏΡΠΎΡΠΈΠ΅ ΠΏΠ΅ΡΠ΅ΠΌΠ΅Π½Π½ΡΠ΅
Π‘Π°ΠΌΠΎΠ΅ Π²ΡΠ΅ΠΌΡ ΠΏΠΎΡΠΌΠΎΡΡΠ΅ΡΡ Π½Π° ΡΠ»Π΅Π΄ΡΡΡΠΈΠΉ DAG, update_reports.py
:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""ΠΠΎΡΠΏΠΎΠ΄Π° Ρ
ΠΎΡΠΎΡΠΈΠ΅, ΠΎΡΡΠ΅ΡΡ ΠΎΠ±Π½ΠΎΠ²Π»Π΅Π½Ρ"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
ΠΠ°ΡΠ°Ρ, ΠΏΡΠΎΡΡΠΏΠ°ΠΉΡΡ, ΠΌΡ {{ dag.dag_id }} ΡΡΠΎΠ½ΠΈΠ»ΠΈ
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
ΠΡΠ΅ Π²Π΅Π΄Ρ ΠΊΠΎΠ³Π΄Π°-Π½ΠΈΠ±ΡΠ΄Ρ Π΄Π΅Π»Π°Π»ΠΈ ΠΎΠ±Π½ΠΎΠ²Π»ΡΠ»ΠΊΡ ΠΎΡΡΠ΅ΡΠΎΠ²? ΠΡΠΎ ΡΠ½ΠΎΠ²Π° ΠΎΠ½Π°: Π΅ΡΡΡ ΡΠΏΠΈΡΠΎΠΊ ΠΈΡΡΠΎΡΠ½ΠΈΠΊΠΎΠ², ΠΎΡΠΊΡΠ΄Π° Π·Π°Π±ΡΠ°ΡΡ Π΄Π°Π½Π½ΡΠ΅; Π΅ΡΡΡ ΡΠΏΠΈΡΠΎΠΊ, ΠΊΡΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ; Π½Π΅ Π·Π°Π±ΡΠ²Π°Π΅ΠΌ ΠΏΠΎΡΠΈΠ³Π½Π°Π»ΠΈΡΡ, ΠΊΠΎΠ³Π΄Π° Π²ΡΡ ΡΠ»ΡΡΠΈΠ»ΠΎΡΡ ΠΈΠ»ΠΈ ΡΠ»ΠΎΠΌΠ°Π»ΠΎΡΡ (Π½Ρ ΡΡΠΎ Π½Π΅ ΠΏΡΠΎ Π½Π°Ρ, Π½Π΅Ρ).
ΠΠ°Π²Π°ΠΉΡΠ΅ ΡΠ½ΠΎΠ²Π° ΠΏΡΠΎΠΉΠ΄Π΅ΠΌΡΡ ΠΏΠΎ ΡΠ°ΠΉΠ»Ρ ΠΈ ΠΏΠΎΡΠΌΠΎΡΡΠΈΠΌ Π½Π° Π½ΠΎΠ²ΡΠ΅ Π½Π΅ΠΏΠΎΠ½ΡΡΠ½ΡΠ΅ ΡΡΡΠΊΠΈ:
from commons.operators import TelegramBotSendMessage
β Π½Π°ΠΌ Π½ΠΈΡΡΠΎ Π½Π΅ ΠΌΠ΅ΡΠ°Π΅Ρ Π΄Π΅Π»Π°ΡΡ ΡΠ²ΠΎΠΈ ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡΡ, ΡΠ΅ΠΌ ΠΌΡ ΠΈ Π²ΠΎΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π»ΠΈΡΡ, ΡΠ΄Π΅Π»Π°Π² Π½Π΅Π±ΠΎΠ»ΡΡΡΡ ΠΎΠ±ΡΡΡΠΎΡΠΊΡ Π΄Π»Ρ ΠΎΡΠΏΡΠ°Π²ΠΊΠΈ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠΉ Π² Π Π°Π·Π±Π»ΠΎΠΊΠΈΡΠΎΠ²Π°Π½Π½ΡΠΉ. (ΠΠ± ΡΡΠΎΠΌ ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡΠ΅ ΠΌΡ Π΅ΡΠ΅ ΠΏΠΎΠ³ΠΎΠ²ΠΎΡΠΈΠΌ Π½ΠΈΠΆΠ΅);default_args={}
β Π΄Π°Π³ ΠΌΠΎΠΆΠ΅Ρ ΡΠ°Π·Π΄Π°Π²Π°ΡΡ ΠΎΠ΄Π½ΠΈ ΠΈ ΡΠ΅ ΠΆΠ΅ Π°ΡΠ³ΡΠΌΠ΅Π½ΡΡ Π²ΡΠ΅ΠΌ ΡΠ²ΠΎΠΈΠΌ ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡΠ°ΠΌ;to='{{ var.value.all_the_kings_men }}'
β ΠΏΠΎΠ»Π΅to
Ρ Π½Π°Ρ Π±ΡΠ΄Π΅Ρ Π½Π΅ Π·Π°Ρ Π°ΡΠ΄ΠΊΠΎΠΆΠ΅Π½Π½ΡΠΌ, Π° ΡΠΎΡΠΌΠΈΡΡΠ΅ΠΌΡΠΌ Π΄ΠΈΠ½Π°ΠΌΠΈΡΠ΅ΡΠΊΠΈ Ρ ΠΏΠΎΠΌΠΎΡΡΡ Jinja ΠΈ ΠΏΠ΅ΡΠ΅ΠΌΠ΅Π½Π½ΠΎΠΉ ΡΠΎ ΡΠΏΠΈΡΠΊΠΎΠΌ email-ΠΎΠ², ΠΊΠΎΡΠΎΡΡΡ Ρ Π·Π°Π±ΠΎΡΠ»ΠΈΠ²ΠΎ ΠΏΠΎΠ»ΠΎΠΆΠΈΠ» Π²Admin/Variables
;trigger_rule=TriggerRule.ALL_SUCCESS
β ΡΡΠ»ΠΎΠ²ΠΈΠ΅ Π·Π°ΠΏΡΡΠΊΠ° ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡΠ°. Π Π½Π°ΡΠ΅ΠΌ ΡΠ»ΡΡΠ°Π΅, ΠΏΠΈΡΡΠΌΠΎ ΠΏΠΎΠ»Π΅ΡΠΈΡ Π±ΠΎΡΡΠ°ΠΌ ΡΠΎΠ»ΡΠΊΠΎ Π΅ΡΠ»ΠΈ Π²ΡΠ΅ Π·Π°Π²ΠΈΡΠΈΠΌΠΎΡΡΠΈ ΠΎΡΡΠ°Π±ΠΎΡΠ°Π»ΠΈ ΡΡΠΏΠ΅ΡΠ½ΠΎ;tg_bot_conn_id='tg_main'
β Π°ΡΠ³ΡΠΌΠ΅Π½ΡΡconn_id
ΠΏΡΠΈΠ½ΠΈΠΌΠ°ΡΡ Π² ΡΠ΅Π±Ρ ΠΈΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡΡ ΡΠΎΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΠΉ, ΠΊΠΎΡΠΎΡΡΠ΅ ΠΌΡ ΡΠΎΠ·Π΄Π°Π΅ΠΌ Π²Admin/Connections
;trigger_rule=TriggerRule.ONE_FAILED
β ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΡ Π² Telegram ΡΠ»Π΅ΡΡΡ ΡΠΎΠ»ΡΠΊΠΎ ΠΏΡΠΈ Π½Π°Π»ΠΈΡΠΈΠΈ ΡΠΏΠ°Π²ΡΠΈΡ ΡΠ°ΡΠΊΠΎΠ²;task_concurrency=1
β Π·Π°ΠΏΡΠ΅ΡΠ°Π΅ΠΌ ΠΎΠ΄Π½ΠΎΠ²ΡΠ΅ΠΌΠ΅Π½Π½ΡΠΉ Π·Π°ΠΏΡΡΠΊ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΈΡ task instances ΠΎΠ΄Π½ΠΎΠ³ΠΎ ΡΠ°ΡΠΊΠ°. Π ΠΏΡΠΎΡΠΈΠ²Π½ΠΎΠΌ ΡΠ»ΡΡΠ°Π΅, ΠΌΡ ΠΏΠΎΠ»ΡΡΠΈΠΌ ΠΎΠ΄Π½ΠΎΠ²ΡΠ΅ΠΌΠ΅Π½Π½ΡΠΉ Π·Π°ΠΏΡΡΠΊ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΈΡVerticaOperator
(ΡΠΌΠΎΡΡΡΡΠΈΡ Π½Π° ΠΎΠ΄Π½Ρ ΡΠ°Π±Π»ΠΈΡΡ);report_update >> [email, tg]
β Π²ΡΠ΅VerticaOperator
ΡΠΎΠΉΠ΄ΡΡΡΡ Π² ΠΎΡΠΏΡΠ°Π²ΠΊΠ΅ ΠΏΠΈΡΡΠΌΠ° ΠΈ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΡ, Π²ΠΎΡ ΡΠ°ΠΊ:
ΠΠΎ ΡΠ°ΠΊ ΠΊΠ°ΠΊ Ρ ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡΠΎΠ²-Π½ΠΎΡΠΈΡΠΈΠΊΠ°ΡΠΎΡΠΎΠ² ΡΡΠΎΡΡ ΡΠ°Π·Π½ΡΠ΅ ΡΡΠ»ΠΎΠ²ΠΈΡ Π·Π°ΠΏΡΡΠΊΠ°, ΡΠ°Π±ΠΎΡΠ°ΡΡ Π±ΡΠ΄Π΅Ρ ΡΠΎΠ»ΡΠΊΠΎ ΠΎΠ΄ΠΈΠ½. Π Tree View Π²ΡΡ Π²ΡΠ³Π»ΡΠ΄ΠΈΡ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ ΠΌΠ΅Π½Π΅Π΅ Π½Π°Π³Π»ΡΠ΄Π½ΠΎ:
Π‘ΠΊΠ°ΠΆΡ ΠΏΠ°ΡΡ ΡΠ»ΠΎΠ² ΠΎ ΠΌΠ°ΠΊΡΠΎΡΠ°Ρ ΠΈ ΠΈΡ Π΄ΡΡΠ·ΡΡΡ β ΠΏΠ΅ΡΠ΅ΠΌΠ΅Π½Π½ΡΡ .
ΠΠ°ΠΊΡΠΎΡΡ β ΡΡΠΎ Jinja-ΠΏΠ»Π΅ΠΉΡΡ ΠΎΠ»Π΄Π΅ΡΡ, ΠΊΠΎΡΠΎΡΡΠ΅ ΠΌΠΎΠ³ΡΡ ΠΏΠΎΠ΄ΡΡΠ°Π²Π»ΡΡΡ ΡΠ°Π·Π½ΡΡ ΠΏΠΎΠ»Π΅Π·Π½ΡΡ ΠΈΠ½ΡΠΎΡΠΌΠ°ΡΠΈΡ Π² Π°ΡΠ³ΡΠΌΠ΅Π½ΡΡ ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡΠΎΠ². ΠΠ°ΠΏΡΠΈΠΌΠ΅Ρ, ΡΠ°ΠΊ:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }}
ΡΠ°Π·Π²Π΅ΡΠ½Π΅ΡΡΡ Π² ΡΠΎΠ΄Π΅ΡΠΆΠΈΠΌΠΎΠ΅ ΠΏΠ΅ΡΠ΅ΠΌΠ΅Π½Π½ΠΎΠΉ ΠΊΠΎΠ½ΡΠ΅ΠΊΡΡΠ° execution_date
Π² ΡΠΎΡΠΌΠ°ΡΠ΅ YYYY-MM-DD
: 2020-07-14
. Π‘Π°ΠΌΠΎΠ΅ ΠΏΡΠΈΡΡΠ½ΠΎΠ΅, ΡΡΠΎ ΠΏΠ΅ΡΠ΅ΠΌΠ΅Π½Π½ΡΠ΅ ΠΊΠΎΠ½ΡΠ΅ΠΊΡΡΠ° ΠΏΡΠΈΠ±ΠΈΠ²Π°ΡΡΡΡ Π³Π²ΠΎΠ·Π΄ΡΠΌΠΈ ΠΊ ΠΎΠΏΡΠ΅Π΄Π΅Π»Π΅Π½Π½ΠΎΠΌΡ ΠΈΠ½ΡΡΠ°Π½ΡΡ ΡΠ°ΡΠΊΠ° (ΠΊΠ²Π°Π΄ΡΠ°ΡΠΈΠΊΡ Π² Tree View), ΠΈ ΠΏΡΠΈ ΠΏΠ΅ΡΠ΅Π·Π°ΠΏΡΡΠΊΠ΅ ΠΏΠ»Π΅ΠΉΡΡ
ΠΎΠ»Π΄Π΅ΡΡ ΡΠ°ΡΠΊΡΠΎΡΡΡΡ Π² ΡΠ΅ ΠΆΠ΅ ΡΠ°ΠΌΡΠ΅ Π·Π½Π°ΡΠ΅Π½ΠΈΡ.
ΠΡΠΈΡΠ²ΠΎΠ΅Π½Π½ΡΠ΅ Π·Π½Π°ΡΠ΅Π½ΠΈΡ ΠΌΠΎΠΆΠ½ΠΎ ΡΠΌΠΎΡΡΠ΅ΡΡ Ρ ΠΏΠΎΠΌΠΎΡΡΡ ΠΊΠ½ΠΎΠΏΠΊΠΈ Rendered Π½Π° ΠΊΠ°ΠΆΠ΄ΠΎΠΌ ΡΠ°ΡΠΊ-ΠΈΠ½ΡΡΠ°Π½ΡΠ΅. ΠΠΎΡ ΡΠ°ΠΊ Ρ ΡΠ°ΡΠΊΠ° Ρ ΠΎΡΠΏΡΠ°Π²ΠΊΠΎΠΉ ΠΏΠΈΡΡΠΌΠ°:
Π ΡΠ°ΠΊ Ρ ΡΠ°ΡΠΊΠΈ Ρ ΠΎΡΠΏΡΠ°Π²ΠΊΠΎΠΉ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΡ:
ΠΠΎΠ»Π½ΡΠΉ ΡΠΏΠΈΡΠΎΠΊ Π²ΡΡΡΠΎΠ΅Π½Π½ΡΡ
ΠΌΠ°ΠΊΡΠΎΡΠΎΠ² Π΄Π»Ρ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅ΠΉ Π΄ΠΎΡΡΡΠΏΠ½ΠΎΠΉ Π²Π΅ΡΡΠΈΠΈ Π΄ΠΎΡΡΡΠΏΠ΅Π½ Π·Π΄Π΅ΡΡ:
ΠΠΎΠ»Π΅Π΅ ΡΠΎΠ³ΠΎ, Ρ ΠΏΠΎΠΌΠΎΡΡΡ ΠΏΠ»Π°Π³ΠΈΠ½ΠΎΠ², ΠΌΡ ΠΌΠΎΠΆΠ΅ΠΌ ΠΎΠ±ΡΡΠ²Π»ΡΡΡ ΡΠΎΠ±ΡΡΠ²Π΅Π½Π½ΡΠ΅ ΠΌΠ°ΠΊΡΠΎΡΡ, Π½ΠΎ ΡΡΠΎ ΡΠΆΠ΅ ΡΠΎΠ²ΡΠ΅ΠΌ Π΄ΡΡΠ³Π°Ρ ΠΈΡΡΠΎΡΠΈΡ.
ΠΠΎΠΌΠΈΠΌΠΎ ΠΏΡΠ΅Π΄ΠΎΠΏΡΠ΅Π΄Π΅Π»Π΅Π½Π½ΡΡ
ΡΡΡΠΊ, ΠΌΡ ΠΌΠΎΠΆΠ΅ΠΌ ΠΏΠΎΠ΄ΡΡΠ°Π²Π»ΡΡΡ Π·Π½Π°ΡΠ΅Π½ΠΈΡ ΡΠ²ΠΎΠΈΡ
ΠΏΠ΅ΡΠ΅ΠΌΠ΅Π½Π½ΡΡ
(Π²ΡΡΠ΅ Π² ΠΊΠΎΠ΄Π΅ Ρ ΡΠΆΠ΅ ΡΡΠΈΠΌ Π²ΠΎΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π»ΡΡ). Π‘ΠΎΠ·Π΄Π°Π΄ΠΈΠΌ Π² Admin/Variables
ΠΏΠ°ΡΡ ΡΡΡΠΊ:
ΠΡΡ, ΠΌΠΎΠΆΠ½ΠΎ ΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°ΡΡΡΡ:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')
Π Π·Π½Π°ΡΠ΅Π½ΠΈΠΈ ΠΌΠΎΠΆΠ΅Ρ Π±ΡΡΡ ΡΠΊΠ°Π»ΡΡ, Π° ΠΌΠΎΠΆΠ΅Ρ Π»Π΅ΠΆΠ°ΡΡ ΠΈ JSON. Π ΡΠ»ΡΡΠ°Π΅ JSON-Π°:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}
ΠΏΡΠΎΡΡΠΎ ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΠ΅ΠΌ ΠΏΡΡΡ ΠΊ Π½ΡΠΆΠ½ΠΎΠΌΡ ΠΊΠ»ΡΡΡ: {{ var.json.bot_config.bot.token }}
.
Π‘ΠΊΠ°ΠΆΡ Π±ΡΠΊΠ²Π°Π»ΡΠ½ΠΎ ΠΎΠ΄Π½ΠΎ ΡΠ»ΠΎΠ²ΠΎ ΠΈ ΠΏΠΎΠΊΠ°ΠΆΡ ΠΎΠ΄ΠΈΠ½ ΡΠΊΡΠΈΠ½ΡΠΎΡ ΠΏΡΠΎ ΡΠΎΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΡ. Π’ΡΡ Π²ΡΡ ΡΠ»Π΅ΠΌΠ΅Π½ΡΠ°ΡΠ½ΠΎ: Π½Π° ΡΡΡΠ°Π½ΠΈΡΠ΅ Admin/Connections
ΡΠΎΠ·Π΄Π°Π΅ΠΌ ΡΠΎΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΠ΅, ΡΠΊΠ»Π°Π΄ΡΠ²Π°Π΅ΠΌ ΡΡΠ΄Π° Π½Π°ΡΠΈ Π»ΠΎΠ³ΠΈΠ½Ρ/ΠΏΠ°ΡΠΎΠ»ΠΈ ΠΈ Π±ΠΎΠ»Π΅Π΅ ΡΠΏΠ΅ΡΠΈΡΠΈΡΠ½ΡΠ΅ ΠΏΠ°ΡΠ°ΠΌΠ΅ΡΡΡ. ΠΠΎΡ ΡΠ°ΠΊ:
ΠΠ°ΡΠΎΠ»ΠΈ ΠΌΠΎΠΆΠ½ΠΎ ΡΠΈΡΡΠΎΠ²Π°ΡΡ (Π±ΠΎΠ»Π΅Π΅ ΡΡΠ°ΡΠ΅Π»ΡΠ½ΠΎ, ΡΠ΅ΠΌ Π² Π²Π°ΡΠΈΠ°Π½ΡΠ΅ ΠΏΠΎ ΡΠΌΠΎΠ»ΡΠ°Π½ΠΈΡ), Π° ΠΌΠΎΠΆΠ½ΠΎ Π½Π΅ ΡΠΊΠ°Π·ΡΠ²Π°ΡΡ ΡΠΈΠΏ ΡΠΎΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΡ (ΠΊΠ°ΠΊ Ρ ΡΠ΄Π΅Π»Π°Π» Π΄Π»Ρ tg_main
) β Π΄Π΅Π»ΠΎ Π² ΡΠΎΠΌ, ΡΡΠΎ ΡΠΏΠΈΡΠΎΠΊ ΡΠΈΠΏΠΎΠ² Π·Π°ΡΠΈΡ Π² ΠΌΠΎΠ΄Π΅Π»ΡΡ
Airflow ΠΈ ΡΠ°ΡΡΠΈΡΠ΅Π½ΠΈΡ Π±Π΅Π· Π²Π»Π΅Π·Π°Π½ΠΈΡ Π² ΠΈΡΡ
ΠΎΠ΄Π½ΠΈΠΊΠΈ Π½Π΅ ΠΏΠΎΠ΄Π΄Π°Π΅ΡΡΡ (Π΅ΡΠ»ΠΈ Π²Π΄ΡΡΠ³ Ρ ΡΠ΅Π³ΠΎ-ΡΠΎ Π½Π΅ Π΄ΠΎΠ³ΡΠ³Π»ΠΈΠ» β ΠΏΡΠΎΡΡ ΠΌΠ΅Π½Ρ ΠΏΠΎΠΏΡΠ°Π²ΠΈΡΡ), Π½ΠΎ ΠΏΠΎΠ»ΡΡΠΈΡΡ ΠΊΡΠ΅Π΄Ρ ΠΏΡΠΎΡΡΠΎ ΠΏΠΎ ΠΈΠΌΠ΅Π½ΠΈ Π½Π°ΠΌ Π½ΠΈΡΡΠΎ Π½Π΅ ΠΏΠΎΠΌΠ΅ΡΠ°Π΅Ρ.
Π Π΅ΡΠ΅ ΠΌΠΎΠΆΠ½ΠΎ ΡΠ΄Π΅Π»Π°ΡΡ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ ΡΠΎΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΠΉ Ρ ΠΎΠ΄Π½ΠΈΠΌ ΠΈΠΌΠ΅Π½Π΅ΠΌ: Π² ΡΠ°ΠΊΠΎΠΌ ΡΠ»ΡΡΠ°Π΅ ΠΌΠ΅ΡΠΎΠ΄ BaseHook.get_connection()
, ΠΊΠΎΡΠΎΡΡΠΉ Π΄ΠΎΡΡΠ°Π΅Ρ Π½Π°ΠΌ ΡΠΎΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΡ ΠΏΠΎ ΠΈΠΌΠ΅Π½ΠΈ, Π±ΡΠ΄Π΅Ρ ΠΎΡΠ΄Π°Π²Π°ΡΡ ΡΠ»ΡΡΠ°ΠΉΠ½ΠΎΠ³ΠΎ ΠΈΠ· Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΈΡ
ΡΡΠ·ΠΎΠΊ (Π±ΡΠ»ΠΎ Π±Ρ Π»ΠΎΠ³ΠΈΡΠ½Π΅Π΅ ΡΠ΄Π΅Π»Π°ΡΡ Round Robin, Π½ΠΎ ΠΎΡΡΠ°Π²ΠΈΠΌ ΡΡΠΎ Π½Π° ΡΠΎΠ²Π΅ΡΡΠΈ ΡΠ°Π·ΡΠ°Π±ΠΎΡΡΠΈΠΊΠΎΠ² Airflow).
Variables ΠΈ Connections, Π±Π΅Π·ΡΡΠ»ΠΎΠ²Π½ΠΎ, ΠΊΠ»Π°ΡΡΠ½ΡΠ΅ ΡΡΠ΅Π΄ΡΡΠ²Π°, Π½ΠΎ Π²Π°ΠΆΠ½ΠΎ Π½Π΅ ΠΏΠΎΡΠ΅ΡΡΡΡ Π±Π°Π»Π°Π½Ρ: ΠΊΠ°ΠΊΠΈΠ΅ ΡΠ°ΡΡΠΈ Π²Π°ΡΠΈΡ ΠΏΠΎΡΠΎΠΊΠΎΠ² Π²Ρ Ρ ΡΠ°Π½ΠΈΡΠ΅ ΡΠΎΠ±ΡΡΠ²Π΅Π½Π½ΠΎ Π² ΠΊΠΎΠ΄Π΅, Π° ΠΊΠ°ΠΊΠΈΠ΅ β ΠΎΡΠ΄Π°Π΅ΡΠ΅ Π½Π° Ρ ΡΠ°Π½Π΅Π½ΠΈΠ΅ Airflow. C ΠΎΠ΄Π½ΠΎΠΉ ΡΡΠΎΡΠΎΠ½Ρ Π±ΡΡΡΡΠΎ ΠΏΠΎΠΌΠ΅Π½ΡΡΡ Π·Π½Π°ΡΠ΅Π½ΠΈΠ΅, Π½Π°ΠΏΡΠΈΠΌΠ΅Ρ, ΡΡΠΈΠΊ ΡΠ°ΡΡΡΠ»ΠΊΠΈ, ΠΌΠΎΠΆΠ΅Ρ Π±ΡΡΡ ΡΠ΄ΠΎΠ±Π½ΠΎ ΡΠ΅ΡΠ΅Π· UI. Π Ρ Π΄ΡΡΠ³ΠΎΠΉ β ΡΡΠΎ Π²ΡΡ-ΡΠ°ΠΊΠΈ Π²ΠΎΠ·Π²ΡΠ°Ρ ΠΊ ΠΌΡΡΠ΅ΠΊΠ»ΠΈΠΊΡ, ΠΎΡ ΠΊΠΎΡΠΎΡΠΎΠ³ΠΎ ΠΌΡ (Ρ) Ρ ΠΎΡΠ΅Π»ΠΈ ΠΈΠ·Π±Π°Π²ΠΈΡΡΡΡ.
Π Π°Π±ΠΎΡΠ° Ρ ΡΠΎΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΡΠΌΠΈ β ΡΡΠΎ ΠΎΠ΄Π½Π° ΠΈΠ· Π·Π°Π΄Π°Ρ Ρ
ΡΠΊΠΎΠ². ΠΠΎΠΎΠ±ΡΠ΅ Ρ
ΡΠΊΠΈ Airflow β ΡΡΠΎ ΡΠΎΡΠΊΠΈ ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠ΅Π½ΠΈΡ Π΅Π³ΠΎ ΠΊ ΡΡΠΎΡΠΎΠ½Π½ΠΈΠΌ ΡΠ΅ΡΠ²ΠΈΡΠ°ΠΌ ΠΈ Π±ΠΈΠ±Π»ΠΈΠΎΡΠ΅ΠΊΠ°ΠΌ. Π ΠΏΡΠΈΠΌΠ΅ΡΡ, JiraHook
ΠΎΡΠΊΡΠΎΠ΅Ρ Π΄Π»Ρ Π½Π°Ρ ΠΊΠ»ΠΈΠ΅Π½Ρ Π΄Π»Ρ Π²Π·Π°ΠΈΠΌΠΎΠ΄Π΅ΠΉΡΡΠ²ΠΈΡ Ρ Jira (ΠΌΠΎΠΆΠ½ΠΎ Π·Π°Π΄Π°ΡΠΊΠΈ ΠΏΠΎΠ΄Π²ΠΈΠ³Π°ΡΡ ΡΡΠ΄Π°-ΡΡΠ΄Π°), Π° Ρ ΠΏΠΎΠΌΠΎΡΡΡ SambaHook
ΠΌΠΎΠΆΠ½ΠΎ Π·Π°ΠΏΡΡΠΈΡΡ Π»ΠΎΠΊΠ°Π»ΡΠ½ΡΠΉ ΡΠ°ΠΉΠ» Π½Π° smb
-ΡΠΎΡΠΊΡ.
Π Π°Π·Π±ΠΈΡΠ°Π΅ΠΌ ΠΊΠ°ΡΡΠΎΠΌΠ½ΡΠΉ ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡ
Π ΠΌΡ Π²ΠΏΠ»ΠΎΡΠ½ΡΡ ΠΏΠΎΠ΄ΠΎΠ±ΡΠ°Π»ΠΈΡΡ ΠΊ ΡΠΎΠΌΡ, ΡΡΠΎΠ±Ρ ΠΏΠΎΡΠΌΠΎΡΡΠ΅ΡΡ Π½Π° ΡΠΎ, ΠΊΠ°ΠΊ ΡΠ΄Π΅Π»Π°Π½ TelegramBotSendMessage
ΠΠΎΠ΄ commons/operators.py
Ρ ΡΠΎΠ±ΡΡΠ²Π΅Π½Π½ΠΎ ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡΠΎΠΌ:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
ΠΠ΄Π΅ΡΡ, ΠΊΠ°ΠΊ ΠΈ ΠΎΡΡΠ°Π»ΡΠ½ΠΎΠ΅ Π² Airflow, Π²ΡΡ ΠΎΡΠ΅Π½Ρ ΠΏΡΠΎΡΡΠΎ:
- ΠΡΠ½Π°ΡΠ»Π΅Π΄ΠΎΠ²Π°Π»ΠΈΡΡ ΠΎΡ
BaseOperator
, ΠΊΠΎΡΠΎΡΡΠΉ ΡΠ΅Π°Π»ΠΈΠ·ΡΠ΅Ρ Π΄ΠΎΠ²ΠΎΠ»ΡΠ½ΠΎ ΠΌΠ½ΠΎΠ³ΠΎ Airflow-ΡΠΏΠ΅ΡΠΈΡΠΈΡΠ½ΡΡ ΡΡΡΠΊ (ΠΏΠΎΡΠΌΠΎΡΡΠΈΡΠ΅ Π½Π° Π΄ΠΎΡΡΠ³Π΅) - ΠΠ±ΡΡΠ²ΠΈΠ»ΠΈ ΠΏΠΎΠ»Ρ
template_fields
, Π² ΠΊΠΎΡΠΎΡΡΡ Jinja Π±ΡΠ΄Π΅Ρ ΠΈΡΠΊΠ°ΡΡ ΠΌΠ°ΠΊΡΠΎΡΡ Π΄Π»Ρ ΠΎΠ±ΡΠ°Π±ΠΎΡΠΊΠΈ. - ΠΡΠ³Π°Π½ΠΈΠ·ΠΎΠ²Π°Π»ΠΈ ΠΏΡΠ°Π²ΠΈΠ»ΡΠ½ΡΠ΅ Π°ΡΠ³ΡΠΌΠ΅Π½ΡΡ Π΄Π»Ρ
__init__()
, ΡΠ°ΡΡΡΠ°Π²ΠΈΠ»ΠΈ ΡΠΌΠΎΠ»ΡΠ°Π½ΠΈΡ, Π³Π΄Π΅ Π½Π°Π΄ΠΎ. - ΠΠ± ΠΈΠ½ΠΈΡΠΈΠ°Π»ΠΈΠ·Π°ΡΠΈΠΈ ΠΏΡΠ΅Π΄ΠΊΠ° ΡΠΎΠΆΠ΅ Π½Π΅ Π·Π°Π±ΡΠ»ΠΈ.
- ΠΡΠΊΡΡΠ»ΠΈ ΡΠΎΠΎΡΠ²Π΅ΡΡΡΠ²ΡΡΡΠΈΠΉ Ρ
ΡΠΊ
TelegramBotHook
, ΠΏΠΎΠ»ΡΡΠΈΠ»ΠΈ ΠΎΡ Π½Π΅Π³ΠΎ ΠΎΠ±ΡΠ΅ΠΊΡ-ΠΊΠ»ΠΈΠ΅Π½Ρ. - ΠΠ²Π΅ΡΡΠ°ΠΉΠ΄Π½ΡΠ»ΠΈ (ΠΏΠ΅ΡΠ΅ΠΎΠΏΡΠ΅Π΄Π΅Π»ΠΈΠ»ΠΈ) ΠΌΠ΅ΡΠΎΠ΄
BaseOperator.execute()
, ΠΊΠΎΡΠΎΡΡΠΉ Airfow Π±ΡΠ΄Π΅Ρ ΠΏΠΎΠ΄Π΅ΡΠ³ΠΈΠ²Π°ΡΡ, ΠΊΠΎΠ³Π΄Π° Π½Π°ΡΡΡΠΏΠΈΡ Π²ΡΠ΅ΠΌΡ Π·Π°ΠΏΡΡΠΊΠ°ΡΡ ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡ β Π² Π½Π΅ΠΌ ΠΌΡ ΠΈ ΡΠ΅Π°Π»ΠΈΠ·ΡΠ΅ΠΌ ΠΎΡΠ½ΠΎΠ²Π½ΠΎΠ΅ Π΄Π΅ΠΉΡΡΠ²ΠΈΠ΅, Π½Π° Π·Π°Π±ΡΠ² Π·Π°Π»ΠΎΠ³ΠΈΡΠΎΠ²Π°ΡΡΡΡ. (ΠΠΎΠ³ΠΈΡΡΠ΅ΠΌΡΡ, ΠΊΡΡΠ°ΡΠΈ, ΠΏΡΡΠΌΠΎ Π²stdout
ΠΈstderr
β Airflow Π²ΡΡ ΠΏΠ΅ΡΠ΅Ρ Π²Π°ΡΠΈΡ, ΠΊΡΠ°ΡΠΈΠ²ΠΎ ΠΎΠ±Π΅ΡΠ½Π΅Ρ, ΡΠ°Π·Π»ΠΎΠΆΠΈΡ, ΠΊΡΠ΄Π° Π½Π°Π΄ΠΎ.)
ΠΠ°Π²Π°ΠΉΡΠ΅ ΡΠΌΠΎΡΡΠ΅ΡΡ, ΡΡΠΎ Ρ Π½Π°Ρ Π² commons/hooks.py
. ΠΠ΅ΡΠ²Π°Ρ ΡΠ°ΡΡΡ ΡΠ°ΠΉΠ»ΠΈΠΊΠ°, Ρ ΡΠ°ΠΌΠΈΠΌ Ρ
ΡΠΊΠΎΠΌ:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Π― Π΄Π°ΠΆΠ΅ Π½Π΅ Π·Π½Π°Ρ, ΡΡΠΎ ΡΡΡ ΠΌΠΎΠΆΠ½ΠΎ ΠΎΠ±ΡΡΡΠ½ΡΡΡ, ΠΏΡΠΎΡΡΠΎ ΠΎΡΠΌΠ΅ΡΡ Π²Π°ΠΆΠ½ΡΠ΅ ΠΌΠΎΠΌΠ΅Π½ΡΡ:
- ΠΠ°ΡΠ»Π΅Π΄ΡΠ΅ΠΌΡΡ, Π΄ΡΠΌΠ°Π΅ΠΌ Π½Π°Π΄ Π°ΡΠ³ΡΠΌΠ΅Π½ΡΠ°ΠΌΠΈ β Π² Π±ΠΎΠ»ΡΡΠΈΠ½ΡΡΠ²Π΅ ΡΠ»ΡΡΠ°Π΅Π² ΠΎΠ½ Π±ΡΠ΄Π΅Ρ ΠΎΠ΄ΠΈΠ½:
conn_id
; - ΠΠ΅ΡΠ΅ΠΎΠΏΡΠ΅Π΄Π΅Π»ΡΠ΅ΠΌ ΡΡΠ°Π½Π΄Π°ΡΡΠ½ΡΠ΅ ΠΌΠ΅ΡΠΎΠ΄Ρ: Ρ ΠΎΠ³ΡΠ°Π½ΠΈΡΠΈΠ»ΡΡ
get_conn()
, Π² ΠΊΠΎΡΠΎΡΠΎΠΌ Ρ ΠΏΠΎΠ»ΡΡΠ°Ρ ΠΏΠ°ΡΠ°ΠΌΠ΅ΡΡΡ ΡΠΎΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΡ ΠΏΠΎ ΠΈΠΌΠ΅Π½ΠΈ ΠΈ Π²ΡΠ΅Π³ΠΎ-Π½Π°Π²ΡΠ΅Π³ΠΎ Π΄ΠΎΡΡΠ°Ρ ΡΠ΅ΠΊΡΠΈΡextra
(ΡΡΠΎ ΠΏΠΎΠ»Π΅ Π΄Π»Ρ JSON), Π² ΠΊΠΎΡΠΎΡΡΡ Ρ (ΠΏΠΎ ΡΠ²ΠΎΠ΅ΠΉ ΠΆΠ΅ ΠΈΠ½ΡΡΡΡΠΊΡΠΈΠΈ!) ΠΏΠΎΠ»ΠΎΠΆΠΈΠ» ΡΠΎΠΊΠ΅Π½ Telegram-Π±ΠΎΡΠ°:{"bot_token": "YOuRAwEsomeBOtToKen"}
. - Π‘ΠΎΠ·Π΄Π°Ρ ΡΠΊΠ·Π΅ΠΌΠΏΠ»ΡΡ Π½Π°ΡΠ΅Π³ΠΎ
TelegramBot
, ΠΎΡΠ΄Π°Π²Π°Ρ Π΅ΠΌΡ ΡΠΆΠ΅ ΠΊΠΎΠ½ΠΊΡΠ΅ΡΠ½ΡΠΉ ΡΠΎΠΊΠ΅Π½.
ΠΠΎΡ ΠΈ Π²ΡΡ. ΠΠΎΠ»ΡΡΠΈΡΡ ΠΊΠ»ΠΈΠ΅Π½Ρ ΠΈΠ· Ρ
ΡΠΊΠ° ΠΌΠΎΠΆΠ½ΠΎ c ΠΏΠΎΠΌΠΎΡΡΡ TelegramBotHook().clent
ΠΈΠ»ΠΈ TelegramBotHook().get_conn()
.
Π Π²ΡΠΎΡΠ°Ρ ΡΠ°ΡΡΡ ΡΠ°ΠΉΠ»ΠΈΠΊΠ°, Π² ΠΊΠΎΡΠΎΡΠΎΠΌ Ρ ΡΠ΄Π΅Π»Π°ΡΡ ΠΌΠΈΠΊΡΠΎΠΎΠ±ΡΡΡΠΎΡΠΊΡ Π΄Π»Ρ Telegram REST API, ΡΡΠΎΠ±Ρ Π½Π΅ ΡΠ°ΡΠΈΡΡ ΡΠΎΡ ΠΆΠ΅ python-telegram-bot
sendMessage
.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))
ΠΡΠ°Π²ΠΈΠ»ΡΠ½ΡΠΉ ΠΏΡΡΡ β ΡΠ»ΠΎΠΆΠΈΡΡ Π²ΡΡ ΡΡΠΎ:
TelegramBotSendMessage
,TelegramBotHook
,TelegramBot
β Π² ΠΏΠ»Π°Π³ΠΈΠ½, ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ Π² ΠΎΠ±ΡΠ΅Π΄ΠΎΡΡΡΠΏΠ½ΡΠΉ ΡΠ΅ΠΏΠΎΠ·ΠΈΡΠΎΡΠΈΠΉ, ΠΈ ΠΎΡΠ΄Π°ΡΡ Π² Open Source.
ΠΠΎΠΊΠ° ΠΌΡ Π²ΡΡ ΡΡΠΎ ΠΈΠ·ΡΡΠ°Π»ΠΈ, Π½Π°ΡΠΈ ΠΎΠ±Π½ΠΎΠ²Π»Π΅Π½ΠΈΡ ΠΎΡΡΠ΅ΡΠΎΠ² ΡΡΠΏΠ΅Π»ΠΈ ΡΡΠΏΠ΅ΡΠ½ΠΎ Π·Π°Π²Π°Π»ΠΈΡΡΡΡ ΠΈ ΠΎΡΠΏΡΠ°Π²ΠΈΡΡ ΠΌΠ½Π΅ Π² ΠΊΠ°Π½Π°Π» ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠ΅ ΠΎΠ± ΠΎΡΠΈΠ±ΠΊΠ΅. ΠΠΎΠΉΠ΄Ρ ΠΏΡΠΎΠ²Π΅ΡΡΡΡ, ΡΡΠΎ ΠΎΠΏΡΡΡ Π½Π΅ ΡΠ°ΠΊ…
Π Π½Π°ΡΠ΅ΠΌ Π΄Π°Π³Π΅ ΡΡΠΎ-ΡΠΎ ΡΠ»ΠΎΠΌΠ°Π»ΠΎΡΡ! Π Π½ΠΈ ΡΡΠΎΠ³ΠΎ Π»ΠΈ ΠΌΡ ΠΆΠ΄Π°Π»ΠΈ? ΠΠΌΠ΅Π½Π½ΠΎ!
ΠΠ°Π»ΠΈΠ²Π°ΡΡ-ΡΠΎ Π±ΡΠ΄Π΅ΡΡ?
Π§ΡΠ²ΡΡΠ²ΡΠ΅ΡΠ΅, ΡΡΠΎ-ΡΠΎ Ρ ΠΏΡΠΎΠΏΡΡΡΠΈΠ»? ΠΡΠΎΠ΄Π΅ Π±Ρ ΠΎΠ±Π΅ΡΠ°Π» Π΄Π°Π½Π½ΡΠ΅ ΠΈΠ· SQL Server Π² Vertica ΠΏΠ΅ΡΠ΅Π»ΠΈΠ²Π°ΡΡ, ΠΈ ΡΡΡ Π²Π·ΡΠ» ΠΈ ΡΡΠ΅Ρ Π°Π» Ρ ΡΠ΅ΠΌΡ, Π½Π΅Π³ΠΎΠ΄ΡΠΉ!
ΠΠ»ΠΎΠ΄Π΅ΡΠ½ΠΈΠ΅ ΡΡΠΎ Π±ΡΠ»ΠΎ Π½Π°ΠΌΠ΅ΡΠ΅Π½Π½ΡΠΌ, Ρ ΠΏΡΠΎΡΡΠΎ ΠΎΠ±ΡΠ·Π°Π½ Π±ΡΠ» ΡΠ°ΡΡΠΈΡΡΠΎΠ²Π°ΡΡ Π²Π°ΠΌ ΠΊΠΎΠ΅-ΠΊΠ°ΠΊΡΡ ΡΠ΅ΡΠΌΠΈΠ½ΠΎΠ»ΠΎΠ³ΠΈΡ. Π’Π΅ΠΏΠ΅ΡΡ ΠΌΠΎΠΆΠ½ΠΎ Π΅Ρ Π°ΡΡ Π΄Π°Π»ΡΡΠ΅.
ΠΠ»Π°Π½ Ρ Π½Π°Ρ Π±ΡΠ» ΡΠ°ΠΊΠΎΠΉ:
- Π‘Π΄Π΅Π»Π°ΡΡ Π΄Π°Π³
- ΠΠ°Π³Π΅Π½Π΅ΡΠΈΡΡ ΡΠ°ΡΠΊΠΈ
- ΠΠΎΡΠΌΠΎΡΡΠ΅ΡΡ, ΠΊΠ°ΠΊ Π²ΡΡ ΠΊΡΠ°ΡΠΈΠ²ΠΎ
- ΠΡΠΈΡΠ²Π°ΠΈΠ²Π°ΡΡ Π·Π°Π»ΠΈΠ²ΠΊΠ°ΠΌ Π½ΠΎΠΌΠ΅ΡΠ° ΡΠ΅ΡΡΠΈΠΉ
- ΠΠ°Π±ΡΠ°ΡΡ Π΄Π°Π½Π½ΡΠ΅ ΠΈΠ· SQL Server
- ΠΠΎΠ»ΠΎΠΆΠΈΡΡ Π΄Π°Π½Π½ΡΠ΅ Π² Vertica
- Π‘ΠΎΠ±ΡΠ°ΡΡ ΡΡΠ°ΡΠΈΡΡΠΈΠΊΡ
ΠΡΠ°ΠΊ, ΡΡΠΎΠ±Ρ Π²ΡΡ ΡΡΠΎ Π·Π°ΠΏΡΡΡΠΈΡΡ, Ρ ΡΠ΄Π΅Π»Π°Π» ΠΌΠ°Π»Π΅Π½ΡΠΊΠΎΠ΅ Π΄ΠΎΠΏΠΎΠ»Π½Π΅Π½ΠΈΠ΅ ΠΊ Π½Π°ΡΠ΅ΠΌΡ docker-compose.yml
:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.py
Π’Π°ΠΌ ΠΌΡ ΠΏΠΎΠ΄Π½ΠΈΠΌΠ°Π΅ΠΌ:
- Vertica ΠΊΠ°ΠΊ Ρ
ΠΎΡΡ
dwh
Ρ ΡΠ°ΠΌΡΠΌΠΈ Π΄Π΅ΡΠΎΠ»ΡΠ½ΡΠΌΠΈ Π½Π°ΡΡΡΠΎΠΉΠΊΠ°ΠΌΠΈ, - ΡΡΠΈ ΡΠΊΠ·Π΅ΠΌΠΏΠ»ΡΡΠ° SQL Server,
- Π½Π°ΠΏΠΎΠ»Π½ΡΠ΅ΠΌ Π±Π°Π·Ρ Π² ΠΏΠΎΡΠ»Π΅Π΄Π½ΠΈΡ
ΠΊΠΎΠ΅-ΠΊΠ°ΠΊΠΈΠΌΠΈ Π΄Π°Π½Π½ΡΠΌΠΈ (Π½ΠΈ Π² ΠΊΠΎΠ΅ΠΌ ΡΠ»ΡΡΠ°Π΅ Π½Π΅ Π·Π°Π³Π»ΡΠ΄ΡΠ²Π°ΠΉΡΠ΅ Π²
mssql_init.py
!)
ΠΠ°ΠΏΡΡΠΊΠ°Π΅ΠΌ Π²ΡΡ Π΄ΠΎΠ±ΡΠΎ Ρ ΠΏΠΎΠΌΠΎΡΡΡ ΡΡΡΡ Π±ΠΎΠ»Π΅Π΅ ΡΠ»ΠΎΠΆΠ½ΠΎΠΉ, ΡΠ΅ΠΌ Π² ΠΏΡΠΎΡΠ»ΡΠΉ ΡΠ°Π·, ΠΊΠΎΠΌΠ°Π½Π΄Ρ:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Π§ΡΠΎ Π½Π°Π³Π΅Π½Π΅ΡΠΈΡΠΎΠ²Π°Π» Π½Π°Ρ ΡΡΠ΄ΠΎΡΠ°Π½Π΄ΠΎΠΌΠ°ΠΉΠ·Π΅Ρ, ΠΌΠΎΠΆΠ½ΠΎ, Π²ΠΎΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π²ΡΠΈΡΡ ΠΏΡΠ½ΠΊΡΠΎΠΌ Data Profiling/Ad Hoc Query
:
ΠΠ»Π°Π²Π½ΠΎΠ΅, Π½Π΅ ΠΏΠΎΠΊΠ°Π·ΡΠ²Π°ΡΡ ΡΡΠΎ Π°Π½Π°Π»ΠΈΡΠΈΠΊΠ°ΠΌ
ΠΠΎΠ΄ΡΠΎΠ±Π½ΠΎ ΠΎΡΡΠ°Π½Π°Π²Π»ΠΈΠ²Π°ΡΡΡΡ Π½Π° ETL-ΡΠ΅ΡΡΠΈΡΡ Ρ Π½Π΅ Π±ΡΠ΄Ρ, ΡΠ°ΠΌ Π²ΡΡ ΡΡΠΈΠ²ΠΈΠ°Π»ΡΠ½ΠΎ: Π΄Π΅Π»Π°Π΅ΠΌ Π±Π°Π·Ρ, Π² Π½Π΅ΠΉ ΡΠ°Π±Π»ΠΈΡΠΊΡ, ΠΎΠ±ΠΎΡΠ°ΡΠΈΠ²Π°Π΅ΠΌ Π²ΡΡ ΠΌΠ΅Π½Π΅Π΄ΠΆΠ΅ΡΠΎΠΌ ΠΊΠΎΠ½ΡΠ΅ΠΊΡΡΠ°, ΠΈ ΡΠ΅ΠΏΠ΅ΡΡ Π΄Π΅Π»Π°Π΅ΠΌ ΡΠ°ΠΊ:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
ΠΠ°ΡΡΠ°Π»Π° ΠΏΠΎΡΠ° Π·Π°Π±ΡΠ°ΡΡ Π½Π°ΡΠΈ Π΄Π°Π½Π½ΡΠ΅ ΠΈΠ· Π½Π°ΡΠΈΡ ΠΏΠΎΠ»ΡΡΠΎΡΠ° ΡΠΎΡΠ΅Π½ ΡΠ°Π±Π»ΠΈΡ. Π‘Π΄Π΅Π»Π°Π΅ΠΌ ΡΡΠΎ Ρ ΠΏΠΎΠΌΠΎΡΡΡ ΠΎΡΠ΅Π½Ρ Π½Π΅Π·Π°ΡΠ΅ΠΉΠ»ΠΈΠ²ΡΡ ΡΡΡΠΎΡΠ΅ΠΊ:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
- Π‘ ΠΏΠΎΠΌΠΎΡΡΡ Ρ
ΡΠΊΠ° ΠΏΠΎΠ»ΡΡΠΈΠΌ ΠΈΠ· Airflow
pymssql
-ΠΊΠΎΠ½Π½Π΅ΠΊΡ - Π Π·Π°ΠΏΡΠΎΡ ΠΏΠΎΠ΄ΡΡΠ°Π²ΠΈΠΌ ΠΎΠ³ΡΠ°Π½ΠΈΡΠ΅Π½ΠΈΠ΅ Π² Π²ΠΈΠ΄Π΅ Π΄Π°ΡΡ β Π² ΡΡΠ½ΠΊΡΠΈΡ Π΅Ρ ΠΏΠΎΠ΄Π±ΡΠΎΡΠΈΡ ΡΠ°Π±Π»ΠΎΠ½ΠΈΠ·Π°ΡΠΎΡ.
- Π‘ΠΊΠ°ΡΠΌΠ»ΠΈΠ²Π°Π΅ΠΌ Π½Π°Ρ Π·Π°ΠΏΡΠΎΡ
pandas
, ΠΊΠΎΡΠΎΡΡΠΉ Π΄ΠΎΡΡΠ°Π½Π΅Ρ Π΄Π»Ρ Π½Π°ΡDataFrame
β ΠΎΠ½ Π½Π°ΠΌ ΠΏΡΠΈΠ³ΠΎΠ΄ΠΈΡΡΡ Π² Π΄Π°Π»ΡΠ½Π΅ΠΉΡΠ΅ΠΌ.
Π― ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΡ ΠΏΠΎΠ΄ΡΡΠ°Π½ΠΎΠ²ΠΊΡ
{dt}
Π²ΠΌΠ΅ΡΡΠΎ ΠΏΠ°ΡΠ°ΠΌΠ΅ΡΡΠ° Π·Π°ΠΏΡΠΎΡΠ°%s
Π½Π΅ ΠΏΠΎΡΠΎΠΌΡ, ΡΡΠΎ Ρ Π·Π»ΠΎΠ±Π½ΡΠΉ ΠΡΡΠ°ΡΠΈΠ½ΠΎ, Π° ΠΏΠΎΡΠΎΠΌΡ ΡΡΠΎpandas
Π½Π΅ ΠΌΠΎΠΆΠ΅Ρ ΡΠΎΠ²Π»Π°Π΄Π°ΡΡ Ρpymssql
ΠΈ ΠΏΠΎΠ΄ΡΠΎΠ²ΡΠ²Π°Π΅Ρ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅ΠΌΡparams: List
, Ρ ΠΎΡΡ ΡΠΎΡ ΠΎΡΠ΅Π½Ρ Ρ ΠΎΡΠ΅Ρtuple
.
Π’Π°ΠΊΠΆΠ΅ ΠΎΠ±ΡΠ°ΡΠΈΡΠ΅ Π²Π½ΠΈΠΌΠ°Π½ΠΈΠ΅, ΡΡΠΎ ΡΠ°Π·ΡΠ°Π±ΠΎΡΡΠΈΠΊpymssql
ΡΠ΅ΡΠΈΠ» Π±ΠΎΠ»ΡΡΠ΅ Π΅Π³ΠΎ Π½Π΅ ΠΏΠΎΠ΄Π΄Π΅ΡΠΆΠΈΠ²Π°ΡΡ, ΠΈ ΡΠ°ΠΌΠΎΠ΅ Π²ΡΠ΅ΠΌΡ ΡΡΠ΅Ρ Π°ΡΡ Π½Π°pyodbc
.
ΠΠΎΡΠΌΠΎΡΡΠΈΠΌ, ΡΠ΅ΠΌ Airflow Π½Π°ΡΠΏΠΈΠ³ΠΎΠ²Π°Π» Π°ΡΠ³ΡΠΌΠ΅Π½ΡΡ Π½Π°ΡΠΈΡ ΡΡΠ½ΠΊΡΠΈΠΉ:
ΠΡΠ»ΠΈ Π΄Π°Π½Π½ΡΡ Π½Π΅ ΠΎΠΊΠ°Π·Π°Π»ΠΎΡΡ, ΡΠΎ ΠΏΡΠΎΠ΄ΠΎΠ»ΠΆΠ°ΡΡ ΡΠΌΡΡΠ»Π° Π½Π΅Ρ. ΠΠΎ ΡΡΠΈΡΠ°ΡΡ Π·Π°Π»ΠΈΠ²ΠΊΡ ΡΡΠΏΠ΅ΡΠ½ΠΎΠΉ ΡΠΎΠΆΠ΅ ΡΡΡΠ°Π½Π½ΠΎ. ΠΠΎ ΡΡΠΎ ΠΈ Π½Π΅ ΠΎΡΠΈΠ±ΠΊΠ°. Π-Π°-Π°, ΡΡΠΎ Π΄Π΅Π»Π°ΡΡ?! Π Π²ΠΎΡ ΡΡΠΎ:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException
ΡΠΊΠ°ΠΆΠ΅Ρ Airflow, ΡΡΠΎ ΠΎΡΠΈΠ±ΠΊΠΈ, ΡΠΎΠ±ΡΡΠ²Π΅Π½Π½ΠΎ Π½Π΅Ρ, Π° ΡΠ°ΡΠΊ ΠΌΡ ΠΏΡΠΎΠΏΡΡΠΊΠ°Π΅ΠΌ. Π ΠΈΠ½ΡΠ΅ΡΡΠ΅ΠΉΡΠ΅ Π±ΡΠ΄Π΅Ρ Π½Π΅ Π·Π΅Π»Π΅Π½ΡΠΉ ΠΈ Π½Π΅ ΠΊΡΠ°ΡΠ½ΡΠΉ ΠΊΠ²Π°Π΄ΡΠ°ΡΠΈΠΊ, Π° ΡΠ²Π΅ΡΠ° pink.
ΠΠΎΠ΄Π±ΡΠΎΡΠΈΠΌ Π½Π°ΡΠΈΠΌ Π΄Π°Π½Π½ΡΠΌ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])
Π ΠΈΠΌΠ΅Π½Π½ΠΎ:
- ΠΠ, ΠΈΠ· ΠΊΠΎΡΠΎΡΠΎΠΉ ΠΌΡ Π·Π°Π±ΡΠ°Π»ΠΈ Π·Π°ΠΊΠ°Π·Ρ,
- ΠΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ Π½Π°ΡΠ΅ΠΉ Π·Π°Π»ΠΈΠ²Π°ΡΡΠ΅ΠΉ ΡΠ΅ΡΡΠΈΠΈ (ΠΎΠ½Π° Π±ΡΠ΄Π΅Ρ ΡΠ°Π·Π½ΠΎΠΉ Π½Π° ΠΊΠ°ΠΆΠ΄ΡΠΉ ΡΠ°ΡΠΊ),
- Π₯ΡΡ ΠΎΡ ΠΈΡΡΠΎΡΠ½ΠΈΠΊΠ° ΠΈ ΠΈΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡΠ° Π·Π°ΠΊΠ°Π·Π° β ΡΡΠΎΠ±Ρ Π² ΠΊΠΎΠ½Π΅ΡΠ½ΠΎΠΉ Π±Π°Π·Π΅ (Π³Π΄Π΅ Π²ΡΡ ΡΡΡΠΏΠ΅ΡΡΡ Π² ΠΎΠ΄Π½Ρ ΡΠ°Π±Π»ΠΈΡΡ) Ρ Π½Π°Ρ Π±ΡΠ» ΡΠ½ΠΈΠΊΠ°Π»ΡΠ½ΡΠΉ ΠΈΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ Π·Π°ΠΊΠ°Π·Π°.
ΠΡΡΠ°Π»ΡΡ ΠΏΡΠ΅Π΄ΠΏΠΎΡΠ»Π΅Π΄Π½ΠΈΠΉ ΡΠ°Π³: Π·Π°Π»ΠΈΡΡ Π²ΡΡ Π² Vertica. Π, ΠΊΠ°ΠΊ Π½ΠΈ ΡΡΡΠ°Π½Π½ΠΎ, ΠΎΠ΄ΠΈΠ½ ΠΈΠ· ΡΠ°ΠΌΡΡ ΡΡΡΠ΅ΠΊΡΠ½ΡΡ ΡΡΡΠ΅ΠΊΡΠΈΠ²Π½ΡΡ ΡΠΏΠΎΡΠΎΠ±ΠΎΠ² ΡΠ΄Π΅Π»Π°ΡΡ ΡΡΠΎ β ΡΠ΅ΡΠ΅Π· CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
- ΠΡ Π΄Π΅Π»Π°Π΅ΠΌ ΡΠΏΠ΅ΡΠΏΡΠΈΡΠΌΠ½ΠΈΠΊ
StringIO
. pandas
Π»ΡΠ±Π΅Π·Π½ΠΎ ΡΠ»ΠΎΠΆΠΈΡ Π² Π½Π΅Π³ΠΎ Π½Π°ΡDataFrame
Π² Π²ΠΈΠ΄Π΅CSV
-ΡΡΡΠΎΠΊ.- ΠΡΠΊΡΠΎΠ΅ΠΌ ΡΠΎΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΠ΅ ΠΊ Π½Π°ΡΠ΅ΠΉ Π»ΡΠ±ΠΈΠΌΠΎΠΉ Vertica Ρ ΡΠΊΠΎΠΌ.
- Π ΡΠ΅ΠΏΠ΅ΡΡ Ρ ΠΏΠΎΠΌΠΎΡΡΡ
copy()
ΠΎΡΠΏΡΠ°Π²ΠΈΠΌ Π½Π°ΡΠΈ Π΄Π°Π½Π½ΡΠ΅ ΠΏΡΡΠΌΠΎ Π² ΠΠ΅ΡΡΠΈΠΊΡ!
ΠΠ· Π΄ΡΠ°ΠΉΠ²Π΅ΡΠ° Π·Π°Π±Π΅ΡΠ΅ΠΌ, ΡΠΊΠΎΠ»ΡΠΊΠΎ ΡΡΡΠΎΡΠ΅ΠΊ Π·Π°ΡΡΠΏΠ°Π»ΠΎΡΡ, ΠΈ ΡΠΊΠ°ΠΆΠ΅ΠΌ ΠΌΠ΅Π½Π΅Π΄ΠΆΠ΅ΡΡ ΡΠ΅ΡΡΠΈΠΈ, ΡΡΠΎ Π²ΡΡ ΠΠ:
session.loaded_rows = cursor.rowcount
session.successful = True
ΠΠΎΡ ΠΈ Π²ΡΡ.
ΠΠ° ΠΏΡΠΎΠ΄Π΅ ΠΌΡ ΡΠΎΠ·Π΄Π°Π΅ΠΌ ΡΠ΅Π»Π΅Π²ΡΡ ΡΠ°Π±Π»ΠΈΡΠΊΡ Π²ΡΡΡΠ½ΡΡ. ΠΠ΄Π΅ΡΡ ΠΆΠ΅ Ρ ΠΏΠΎΠ·Π²ΠΎΠ»ΠΈΠ» ΡΠ΅Π±Π΅ Π½Π΅Π±ΠΎΠ»ΡΡΠΎΠΉ Π°Π²ΡΠΎΠΌΠ°Ρ:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
Π― Ρ ΠΏΠΎΠΌΠΎΡΡΡ
VerticaOperator()
ΡΠΎΠ·Π΄Π°Ρ ΡΡ Π΅ΠΌΡ ΠΠ ΠΈ ΡΠ°Π±Π»ΠΈΡΡ (Π΅ΡΠ»ΠΈ ΠΈΡ Π΅ΡΠ΅ Π½Π΅Ρ, Π΅ΡΡΠ΅ΡΡΠ²Π΅Π½Π½ΠΎ). ΠΠ»Π°Π²Π½ΠΎΠ΅, ΠΏΡΠ°Π²ΠΈΠ»ΡΠ½ΠΎ ΡΠ°ΡΡΡΠ°Π²ΠΈΡΡ Π·Π°Π²ΠΈΡΠΈΠΌΠΎΡΡΠΈ:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> load
ΠΠΎΠ΄Π²ΠΎΠ΄ΠΈΠΌ ΠΈΡΠΎΠ³ΠΈ
β ΠΡ Π²ΠΎΡ, β ΡΠΊΠ°Π·Π°Π» ΠΌΡΡΠΎΠ½ΠΎΠΊ, β Π½Π΅ ΠΏΡΠ°Π²Π΄Π° Π»ΠΈ, ΡΠ΅ΠΏΠ΅ΡΡ
Π’Ρ ΡΠ±Π΅Π΄ΠΈΠ»ΡΡ, ΡΡΠΎ Π² Π»Π΅ΡΡ Ρ ΡΠ°ΠΌΡΠΉ ΡΡΡΠ°ΡΠ½ΡΠΉ Π·Π²Π΅ΡΡ?
ΠΠΆΡΠ»ΠΈΡ ΠΠΎΠ½Π°Π»ΡΠ΄ΡΠΎΠ½, Β«ΠΡΡΡΡΠ°Π»ΠΎΒ»
ΠΡΠΌΠ°Ρ, Π΅ΡΠ»ΠΈ Π±Ρ ΠΌΡ Ρ ΠΌΠΎΠΈΠΌΠΈ ΠΊΠΎΠ»Π»Π΅Π³Π°ΠΌΠΈ ΡΡΡΡΠΎΠΈΠ»ΠΈ ΡΠΎΡΠ΅Π²Π½ΠΎΠ²Π°Π½ΠΈΠ΅: ΠΊΡΠΎ Π±ΡΡΡΡΠ΅Π΅ ΡΠΎΡΡΠ°Π²ΠΈΡ ΠΈ Π·Π°ΠΏΡΡΡΠΈΡ Ρ Π½ΡΠ»Ρ ETL-ΠΏΡΠΎΡΠ΅ΡΡ: ΠΎΠ½ΠΈ ΡΠΎ ΡΠ²ΠΎΠΈΠΌΠΈ SSIS ΠΈ ΠΌΡΡΠΊΠΎΠΉ ΠΈ Ρ Ρ Airflowβ¦ Π ΠΏΠΎΡΠΎΠΌ Π±Ρ ΠΌΡ Π΅ΡΠ΅ ΡΡΠ°Π²Π½ΠΈΠ»ΠΈ ΡΠ΄ΠΎΠ±ΡΡΠ²ΠΎ ΡΠΎΠΏΡΠΎΠ²ΠΎΠΆΠ΄Π΅Π½ΠΈΡβ¦ Π£Ρ , Π΄ΡΠΌΠ°Ρ, Π²Ρ ΡΠΎΠ³Π»Π°ΡΠΈΡΠ΅ΡΡ, ΡΡΠΎ Ρ ΠΎΠ±ΠΎΠΉΠ΄Ρ ΠΈΡ ΠΏΠΎ Π²ΡΠ΅ΠΌ ΡΡΠΎΠ½ΡΠ°ΠΌ!
ΠΡΠ»ΠΈ ΠΆΠ΅ ΡΡΡΡ-ΡΡΡΡ ΠΏΠΎΡΠ΅ΡΡΠ΅Π·Π½Π΅Π΅, ΡΠΎ Apache Airflow β Π·Π° ΡΡΠ΅Ρ ΠΎΠΏΠΈΡΠ°Π½ΠΈΡ ΠΏΡΠΎΡΠ΅ΡΡΠΎΠ² Π² Π²ΠΈΠ΄Π΅ ΠΏΡΠΎΠ³ΡΠ°ΠΌΠΌΠ½ΠΎΠ³ΠΎ ΠΊΠΎΠ΄Π° β ΡΠ΄Π΅Π»Π°Π» ΠΌΠΎΡ ΡΠ°Π±ΠΎΡΡ Π³ΠΎΡΠ°Π·Π΄ΠΎ ΡΠ΄ΠΎΠ±Π½Π΅Π΅ ΠΈ ΠΏΡΠΈΡΡΠ½Π΅Π΅.
ΠΠ³ΠΎ ΠΆΠ΅ Π½Π΅ΠΎΠ³ΡΠ°Π½ΠΈΡΠ΅Π½Π½Π°Ρ ΡΠ°ΡΡΠΈΡΡΠ΅ΠΌΠΎΡΡΡ: ΠΊΠ°ΠΊ Π² ΠΏΠ»Π°Π½Π΅ ΠΏΠ»Π°Π³ΠΈΠ½ΠΎΠ², ΡΠ°ΠΊ ΠΈ ΠΏΡΠ΅Π΄ΡΠ°ΡΠΏΠΎΠ»ΠΎΠΆΠ΅Π½Π½ΠΎΡΡΡ ΠΊ ΠΌΠ°ΡΡΡΠ°Π±ΠΈΡΡΠ΅ΠΌΠΎΡΡΠΈ β Π΄Π°ΡΡ Π²Π°ΠΌ Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡΡ ΠΏΡΠΈΠΌΠ΅Π½ΡΡΡ Airflow ΠΏΡΠ°ΠΊΡΠΈΡΠ΅ΡΠΊΠΈ Π² Π»ΡΠ±ΠΎΠΉ ΠΎΠ±Π»Π°ΡΡΠΈ: Ρ ΠΎΡΡ Π² ΠΏΠΎΠ»Π½ΠΎΠΌ ΡΠΈΠΊΠ»Π΅ ΡΠ±ΠΎΡΠ°, ΠΏΠΎΠ΄Π³ΠΎΡΠΎΠ²ΠΊΠΈ ΠΈ ΠΎΠ±ΡΠ°Π±ΠΎΡΠΊΠΈ Π΄Π°Π½Π½ΡΡ , Ρ ΠΎΡΡ Π² Π·Π°ΠΏΡΡΠΊΠ΅ ΡΠ°ΠΊΠ΅Ρ (Π½Π° ΠΠ°ΡΡ, ΠΊΠΎΠ½Π΅ΡΠ½ΠΎ ΠΆΠ΅).
Π§Π°ΡΡΡ Π·Π°ΠΊΠ»ΡΡΠΈΡΠ΅Π»ΡΠ½Π°Ρ, ΡΠΏΡΠ°Π²ΠΎΡΠ½ΠΎ-ΠΈΠ½ΡΠΎΡΠΌΠ°ΡΠΈΠΎΠ½Π½Π°Ρ
ΠΡΠ°Π±Π»ΠΈ, ΠΊΠΎΡΠΎΡΡΠ΅ ΠΌΡ ΡΠΎΠ±ΡΠ°Π»ΠΈ Π·Π° Π²Π°Ρ
start_date
. ΠΠ°, ΡΡΠΎ ΡΠΆΠ΅ Π»ΠΎΠΊΠ°Π»ΡΠ½ΡΠΉ ΠΌΠ΅ΠΌΠ°ΡΠΈΠΊ. Π§Π΅ΡΠ΅Π· Π³Π»Π°Π²Π½ΡΠΉ Π°ΡΠ³ΡΠΌΠ΅Π½Ρ Π΄Π°Π³Π°start_date
ΠΏΡΠΎΡ ΠΎΠ΄ΡΡ Π²ΡΠ΅. ΠΡΠ°ΡΠΊΠΎ, Π΅ΡΠ»ΠΈ ΡΠΊΠ°Π·Π°ΡΡ Π²start_date
ΡΠ΅ΠΊΡΡΡΡ Π΄Π°ΡΡ, Π° Π²schedule_interval
β ΠΎΠ΄ΠΈΠ½ Π΄Π΅Π½Ρ, ΡΠΎ DAG Π·Π°ΠΏΡΡΡΠΈΡΡΡ Π·Π°Π²ΡΡΠ° Π½Π΅ ΡΠ°Π½ΡΡΠ΅.start_date = datetime(2020, 7, 7, 0, 1, 2)
Π Π±ΠΎΠ»ΡΡΠ΅ Π½ΠΈΠΊΠ°ΠΊΠΈΡ ΠΏΡΠΎΠ±Π»Π΅ΠΌ.
Π‘ Π½ΠΈΠΌ ΠΆΠ΅ ΡΠ²ΡΠ·Π°Π½Π° ΠΈ Π΅ΡΠ΅ ΠΎΠ΄Π½Π° ΠΎΡΠΈΠ±ΠΊΠ° Π²ΡΠΏΠΎΠ»Π½Π΅Π½ΠΈΡ:
Task is missing the start_date parameter
, ΠΊΠΎΡΠΎΡΠ°Ρ ΡΠ°ΡΠ΅ Π²ΡΠ΅Π³ΠΎ Π³ΠΎΠ²ΠΎΡΠΈΡ ΠΎ ΡΠΎΠΌ, ΡΡΠΎ Π²Ρ Π·Π°Π±ΡΠ»ΠΈ ΠΏΡΠΈΠ²ΡΠ·Π°ΡΡ ΠΊ ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡΡ Π΄Π°Π³.- ΠΡΡ Π½Π° ΠΎΠ΄Π½ΠΎΠΉ ΠΌΠ°ΡΠΈΠ½Π΅. ΠΠ°, ΠΈ Π±Π°Π·Ρ (ΡΠ°ΠΌΠΎΠ³ΠΎ Airflow ΠΈ Π½Π°ΡΠ΅ΠΉ ΠΎΠ±ΠΌΠ°Π·ΠΊΠΈ), ΠΈ Π²Π΅Π±-ΡΠ΅ΡΠ²Π΅Ρ, ΠΈ ΠΏΠ»Π°Π½ΠΈΡΠΎΠ²ΡΠΈΠΊ, ΠΈ Π²ΠΎΡΠΊΠ΅ΡΡ. Π ΠΎΠ½ΠΎ Π΄Π°ΠΆΠ΅ ΡΠ°Π±ΠΎΡΠ°Π»ΠΎ. ΠΠΎ ΡΠΎ Π²ΡΠ΅ΠΌΠ΅Π½Π΅ΠΌ ΠΊΠΎΠ»ΠΈΡΠ΅ΡΡΠ²ΠΎ Π·Π°Π΄Π°Ρ Ρ ΡΠ΅ΡΠ²ΠΈΡΠΎΠ² ΡΠΎΡΠ»ΠΎ, ΠΈ ΠΊΠΎΠ³Π΄Π° PostgreSQL ΡΡΠ°Π» ΠΎΡΠ΄Π°Π²Π°ΡΡ ΠΎΡΠ²Π΅Ρ ΠΏΠΎ ΠΈΠ½Π΄Π΅ΠΊΡΡ Π·Π° 20 Ρ Π²ΠΌΠ΅ΡΡΠΎ 5 ΠΌΡ, ΠΌΡ Π΅Π³ΠΎ Π²Π·ΡΠ»ΠΈ ΠΈ ΡΠ½Π΅ΡΠ»ΠΈ.
- LocalExecutor. ΠΠ°, ΠΌΡ ΡΠΈΠ΄ΠΈΠΌ Π½Π° Π½ΡΠΌ Π΄ΠΎ ΡΠΈΡ ΠΏΠΎΡ, ΠΈ ΠΌΡ ΡΠΆΠ΅ ΠΏΠΎΠ΄ΠΎΡΠ»ΠΈ ΠΊ ΠΊΡΠ°Ρ ΠΏΡΠΎΠΏΠ°ΡΡΠΈ. LocalExecutorβΠ° Π½Π°ΠΌ Π΄ΠΎ ΡΠΈΡ ΠΏΠΎΡ Ρ Π²Π°ΡΠ°Π»ΠΎ, Π½ΠΎ ΡΠ΅ΠΉΡΠ°Ρ ΠΏΡΠΈΡΠ»Π° ΠΏΠΎΡΠ° ΡΠ°ΡΡΠΈΡΠΈΡΡΡΡ ΠΌΠΈΠ½ΠΈΠΌΡΠΌ ΠΎΠ΄Π½ΠΈΠΌ Π²ΠΎΡΠΊΠ΅ΡΠΎΠΌ, ΠΈ ΠΏΡΠΈΠ΄Π΅ΡΡΡ ΠΏΠΎΠ΄Π½Π°ΠΏΡΡΡΡΡΡ, ΡΡΠΎΠ±Ρ ΠΏΠ΅ΡΠ΅Π΅Ρ Π°ΡΡ Π½Π° CeleryExecutor. Π Π²Π²ΠΈΠ΄Ρ ΡΠΎΠ³ΠΎ, ΡΡΠΎ Ρ Π½ΠΈΠΌ ΠΌΠΎΠΆΠ½ΠΎ ΡΠ°Π±ΠΎΡΠ°ΡΡ ΠΈ Π½Π° ΠΎΠ΄Π½ΠΎΠΉ ΠΌΠ°ΡΠΈΠ½ΠΎΠΉ, ΡΠΎ Π½ΠΈΡΠ΅Π³ΠΎ Π½Π΅ ΠΎΡΡΠ°Π½Π°Π²Π»ΠΈΠ²Π°Π΅Ρ ΠΎΡ ΠΈΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π½ΠΈΡ Celery Π΄Π°ΠΆΠ΅ Π½Π΅ ΡΠ΅ΡΠ²Π΅ΡΠ΅, ΠΊΠΎΡΠΎΡΡΠΉ Β«Π΅ΡΡΠ΅ΡΡΠ²Π΅Π½Π½ΠΎ, Π½ΠΈΠΊΠΎΠ³Π΄Π° Π½Π΅ ΠΏΠΎΠΉΠ΄Π΅Ρ Π² ΠΏΡΠΎΠ΄, ΡΠ΅ΡΡΠ»ΠΎΠ²ΠΎ!Β»
- ΠΠ΅ΠΈΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π½ΠΈΠ΅ Π²ΡΡΡΠΎΠ΅Π½Π½ΡΡ
ΡΡΠ΅Π΄ΡΡΠ²:
- Connections Π΄Π»Ρ Ρ ΡΠ°Π½Π΅Π½ΠΈΡ ΡΡΠ΅ΡΠ½ΡΡ Π΄Π°Π½Π½ΡΡ ΡΠ΅ΡΠ²ΠΈΡΠΎΠ²,
- SLA Misses Π΄Π»Ρ ΡΠ΅Π°Π³ΠΈΡΠΎΠ²Π°Π½ΠΈΡ Π½Π° ΡΠ°ΡΠΊΠΈ, ΠΊΠΎΡΠΎΡΡΠ΅ Π½Π΅ ΠΎΡΡΠ°Π±ΠΎΡΠ°Π»ΠΈ Π²ΠΎΠ²ΡΠ΅ΠΌΡ,
- XCom Π΄Π»Ρ ΠΎΠ±ΠΌΠ΅Π½Π° ΠΌΠ΅ΡΠ°Π΄Π°Π½Π½ΡΠΌΠΈ (Ρ ΡΠΊΠ°Π·Π°Π» ΠΌΠ΅ΡΠ°Π΄Π°Π½Π½ΡΠΌΠΈ!) ΠΌΠ΅ΠΆΠ΄Ρ ΡΠ°ΡΠΊΠ°ΠΌΠΈ Π΄Π°Π³Π°.
- ΠΠ»ΠΎΡΠΏΠΎΡΡΠ΅Π±Π»Π΅Π½ΠΈΠ΅ ΠΏΠΎΡΡΠΎΠΉ. ΠΡ ΡΡΠΎ ΡΡΡ ΡΠΊΠ°Π·Π°ΡΡ? ΠΡΠ»ΠΈ Π½Π°ΡΡΡΠΎΠ΅Π½Ρ ΠΎΠΏΠΎΠ²Π΅ΡΠ΅Π½ΠΈΡ Π½Π° Π²ΡΠ΅ ΠΏΠΎΠ²ΡΠΎΡΡ ΡΠΏΠ°Π²ΡΠΈΡ ΡΠ°ΡΠΊΠΎΠ². Π’Π΅ΠΏΠ΅ΡΡ Π² ΠΌΠΎΡΠΌ ΡΠ°Π±ΠΎΡΠ΅ΠΌ Gmail >90k ΠΏΠΈΡΠ΅ΠΌ ΠΎΡ Airflow, ΠΈ Π²Π΅Π±-ΠΌΠΎΡΠ΄Π° ΠΏΠΎΡΡΡ ΠΎΡΠΊΠ°Π·ΡΠ²Π°Π΅ΡΡΡ Π±ΡΠ°ΡΡ ΠΈ ΡΠ΄Π°Π»ΡΡΡ Π±ΠΎΠ»ΡΡΠ΅ ΡΠ΅ΠΌ ΠΏΠΎ 100 ΡΡΡΠΊ Π·Π° ΡΠ°Π·.
ΠΠΎΠ»ΡΡΠ΅ ΠΏΠΎΠ΄Π²ΠΎΠ΄Π½ΡΡ ΠΊΠ°ΠΌΠ½Π΅ΠΉ:
Apache Airflow Pitfails
Π‘ΡΠ΅Π΄ΡΡΠ²Π° Π΅ΡΡ Π±ΠΎΠ»ΡΡΠ΅ΠΉ Π°Π²ΡΠΎΠΌΠ°ΡΠΈΠ·Π°ΡΠΈΠΈ
ΠΠ»Ρ ΡΠΎΠ³ΠΎ ΡΡΠΎΠ±Ρ Π½Π°ΠΌ Π΅ΡΠ΅ Π±ΠΎΠ»ΡΡΠ΅ ΡΠ°Π±ΠΎΡΠ°ΡΡ Π³ΠΎΠ»ΠΎΠ²ΠΎΠΉ, Π° Π½Π΅ ΡΡΠΊΠ°ΠΌΠΈ, Airflow Π·Π°Π³ΠΎΡΠΎΠ²ΠΈΠ»Π° Π΄Π»Ρ Π½Π°Ρ Π²ΠΎΡ ΡΡΠΎ:
REST API β ΠΎΠ½ Π΄ΠΎ ΡΠΈΡ ΠΏΠΎΡ ΠΈΠΌΠ΅Π΅Ρ ΡΡΠ°ΡΡΡ Experimental, ΡΡΠΎ Π½Π΅ ΠΌΠ΅ΡΠ°Π΅Ρ Π΅ΠΌΡ ΡΠ°Π±ΠΎΡΠ°ΡΡ. Π‘ Π΅Π³ΠΎ ΠΏΠΎΠΌΠΎΡΡΡ ΠΌΠΎΠΆΠ½ΠΎ Π½Π΅ ΡΠΎΠ»ΡΠΊΠΎ ΠΏΠΎΠ»ΡΡΠ°ΡΡ ΠΈΠ½ΡΠΎΡΠΌΠ°ΡΠΈΡ ΠΎ Π΄Π°Π³Π°Ρ ΠΈ ΡΠ°ΡΠΊΠ°Ρ , Π½ΠΎ ΠΎΡΡΠ°Π½ΠΎΠ²ΠΈΡΡ/Π·Π°ΠΏΡΡΡΠΈΡΡ Π΄Π°Π³, ΡΠΎΠ·Π΄Π°ΡΡ DAG Run ΠΈΠ»ΠΈ ΠΏΡΠ».CLI β ΡΠ΅ΡΠ΅Π· ΠΊΠΎΠΌΠ°Π½Π΄Π½ΡΡ ΡΡΡΠΎΠΊΡ Π΄ΠΎΡΡΡΠΏΠ½Ρ ΠΌΠ½ΠΎΠ³ΠΈΠ΅ ΡΡΠ΅Π΄ΡΡΠ²Π°, ΠΊΠΎΡΠΎΡΡΠ΅ Π½Π΅ ΠΏΡΠΎΡΡΠΎ Π½Π΅ΡΠ΄ΠΎΠ±Π½Ρ Π² ΠΎΠ±ΡΠ°ΡΠ΅Π½ΠΈΠΈ ΡΠ΅ΡΠ΅Π· WebUI, Π° Π²ΠΎΠΎΠ±ΡΠ΅ ΠΎΡΡΡΡΡΡΠ²ΡΡΡ. ΠΠ°ΠΏΡΠΈΠΌΠ΅Ρ:backfill
Π½ΡΠΆΠ΅Π½ Π΄Π»Ρ ΠΏΠΎΠ²ΡΠΎΡΠ½ΠΎΠ³ΠΎ Π·Π°ΠΏΡΡΠΊΠ° ΠΈΠ½ΡΡΠ°Π½ΡΠΎΠ² ΡΠ°ΡΠΊΠΎΠ².
ΠΠ°ΠΏΡΠΈΠΌΠ΅Ρ, ΠΏΡΠΈΡΠ»ΠΈ Π°Π½Π°Π»ΠΈΡΠΈΠΊΠΈ, Π³ΠΎΠ²ΠΎΡΡΡ: Β«Π Ρ Π²Π°Ρ, ΡΠΎΠ²Π°ΡΠΈΡ, Π΅ΡΡΠ½Π΄Π° Π² Π΄Π°Π½Π½ΡΡ Ρ 1 ΠΏΠΎ 13 ΡΠ½Π²Π°ΡΡ! Π§ΠΈΠ½ΠΈ-ΡΠΈΠ½ΠΈ-ΡΠΈΠ½ΠΈ-ΡΠΈΠ½ΠΈ!Β». Π ΡΡ ΡΠ°ΠΊΠΎΠΉ Ρ ΠΎΠ±Π°:airflow backfill -s '2020-01-01' -e '2020-01-13' orders
- ΠΠ±ΡΠ»ΡΠΆΠΈΠ²Π°Π½ΠΈΠ΅ Π±Π°Π·Ρ:
initdb
,resetdb
,upgradedb
,checkdb
. run
, ΠΊΠΎΡΠΎΡΡΠΉ ΠΏΠΎΠ·Π²ΠΎΠ»ΡΠ΅Ρ Π·Π°ΠΏΡΡΡΠΈΡΡ ΠΎΠ΄ΠΈΠ½ ΠΈΠ½ΡΡΠ°Π½Ρ ΡΠ°ΡΠΊΠ°, Π΄Π° Π΅ΡΠ΅ ΠΈ Π·Π°Π±ΠΈΡΡ Π½Π° Π²ΡΡ Π·Π°Π²ΠΈΡΠΈΠΌΠΎΡΡΠΈ. ΠΠΎΠ»Π΅Π΅ ΡΠΎΠ³ΠΎ, ΠΌΠΎΠΆΠ½ΠΎ Π·Π°ΠΏΡΡΡΠΈΡΡ Π΅Π³ΠΎ ΡΠ΅ΡΠ΅Π·LocalExecutor
, Π΄Π°ΠΆΠ΅ Π΅ΡΠ»ΠΈ Ρ Π²Π°Ρ Celery-ΠΊΠ»Π°ΡΡΠ΅Ρ.- ΠΡΠΈΠΌΠ΅ΡΠ½ΠΎ ΡΠΎ ΠΆΠ΅ ΡΠ°ΠΌΠΎΠ΅ Π΄Π΅Π»Π°Π΅Ρ
test
, ΡΠΎΠ»ΡΠΊΠΎ Π΅ΡΠ΅ ΠΈ Π² Π±Π°Π· Π½ΠΈΡΠ΅Π³ΠΎ Π½Π΅ ΠΏΠΈΡΠ΅Ρ. connections
ΠΏΠΎΠ·Π²ΠΎΠ»ΡΠ΅Ρ ΠΌΠ°ΡΡΠΎΠ²ΠΎ ΡΠΎΠ·Π΄Π°Π²Π°ΡΡ ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠ΅Π½ΠΈΡ ΠΈΠ· ΡΠ΅Π»Π»Π°.
Python API β Π΄ΠΎΠ²ΠΎΠ»ΡΠ½ΠΎ Ρ Π°ΡΠ΄ΠΊΠΎΡΠ½ΡΠΉ ΡΠΏΠΎΡΠΎΠ± Π²Π·Π°ΠΈΠΌΠΎΠ΄Π΅ΠΉΡΡΠ²ΠΈΡ, ΠΊΠΎΡΠΎΡΡΠΉ ΠΏΡΠ΅Π΄Π½Π°Π·Π½Π°ΡΠ΅Π½ Π΄Π»Ρ ΠΏΠ»Π°Π³ΠΈΠ½ΠΎΠ², Π° Π½Π΅ ΠΊΠΎΠΏΠΎΡΠ΅Π½ΠΈΡ Π² Π½ΡΠΌ ΡΡΡΡΠ½ΠΊΠ°ΠΌΠΈ. ΠΠΎ ΠΊΡΠΎ ΠΆ Π½Π°ΠΌ ΠΏΠΎΠΌΠ΅ΡΠ°Π΅Ρ ΠΏΠΎΠΉΡΠΈ Π²/home/airflow/dags
, Π·Π°ΠΏΡΡΡΠΈΡΡipython
ΠΈ Π½Π°ΡΠ°ΡΡ Π±Π΅ΡΠΏΡΠ΅Π΄Π΅Π»ΡΠ½ΠΈΡΠ°ΡΡ? ΠΠΎΠΆΠ½ΠΎ, Π½Π°ΠΏΡΠΈΠΌΠ΅Ρ, ΡΠΊΡΠΏΠΎΡΡΠΈΡΠΎΠ²Π°ΡΡ Π²ΡΠ΅ ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠ΅Π½ΠΈΡ ΡΠ°ΠΊΠΎΠΌ ΠΊΠΎΠ΄ΠΎΠΌ:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d)
- ΠΠΎΠ΄ΠΊΠ»ΡΡΠ΅Π½ΠΈΠ΅ ΠΊ Π±Π°Π·Π΅ ΠΌΠ΅ΡΠ°Π΄Π°Π½Π½ΡΡ
Airflow. ΠΠΈΡΠ°ΡΡ Π² Π½Π΅Ρ Ρ Π½Π΅ ΡΠ΅ΠΊΠΎΠΌΠ΅Π½Π΄ΡΡ, Π° Π²ΠΎΡ Π΄ΠΎΡΡΠ°Π²Π°ΡΡ ΡΠΎΡΡΠΎΡΠ½ΠΈΡ ΡΠ°ΡΠΊΠΎΠ² Π΄Π»Ρ ΡΠ°Π·Π»ΠΈΡΠ½ΡΡ
ΡΠΏΠ΅ΡΠΈΡΠΈΡΠ΅ΡΠΊΠΈΡ
ΠΌΠ΅ΡΡΠΈΠΊ ΠΌΠΎΠΆΠ½ΠΎ Π·Π½Π°ΡΠΈΡΠ΅Π»ΡΠ½ΠΎ Π±ΡΡΡΡΠ΅Π΅ ΠΈ ΠΏΡΠΎΡΠ΅, ΡΠ΅ΠΌ ΡΠ΅ΡΠ΅Π· Π»ΡΠ±ΠΎΠΉ ΠΈΠ· API.
Π‘ΠΊΠ°ΠΆΠ΅ΠΌ, Π΄Π°Π»Π΅ΠΊΠΎ Π½Π΅ Π²ΡΠ΅ Π½Π°ΡΠΈ ΡΠ°ΡΠΊΠΈ ΠΈΠ΄Π΅ΠΌΠΏΠΎΡΠ΅Π½ΡΠ½Ρ, Π° ΠΌΠΎΠ³ΡΡ ΠΈΠ½ΠΎΠ³Π΄Π° ΠΏΠ°Π΄Π°ΡΡ ΠΈ ΡΡΠΎ Π½ΠΎΡΠΌΠ°Π»ΡΠ½ΠΎ. ΠΠΎ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ Π·Π°Π²Π°Π»ΠΎΠ² β ΡΡΠΎ ΡΠΆΠ΅ ΠΏΠΎΠ΄ΠΎΠ·ΡΠΈΡΠ΅Π»ΡΠ½ΠΎ, ΠΈ Π½Π°Π΄ΠΎ Π±Ρ ΠΏΡΠΎΠ²Π΅ΡΠΈΡΡ.
ΠΡΡΠΎΡΠΎΠΆΠ½ΠΎ, SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
Π‘ΡΡΠ»ΠΊΠΈ
ΠΡ ΠΈ Π΅ΡΡΠ΅ΡΡΠ²Π΅Π½Π½ΠΎ ΠΏΠ΅ΡΠ²ΡΠ΅ Π΄Π΅ΡΡΡΡ ΡΡΡΠ»ΠΎΠΊ ΠΈΠ· Π²ΡΠ΄Π°ΡΠΈ Π³ΡΠ³Π»Π° ΡΠΎΠ΄Π΅ΡΠΆΠΈΠΌΠΎΠ΅ ΠΏΠ°ΠΏΠΊΠΈ Airflow ΠΈΠ· ΠΌΠΎΠΈΡ Π·Π°ΠΊΠ»Π°Π΄ΠΎΠΊ.
Apache Airflow Documentation β ΠΊΠΎΠ½Π΅ΡΠ½ΠΎ, Π½Π°Π΄ΠΎ Π½Π°ΡΠ°ΡΡ Ρ ΠΎΡ. Π΄ΠΎΠΊΡΠΌΠ΅Π½ΡΠ°ΡΠΈΠΈ, Π½ΠΎ ΠΊΡΠΎ ΠΆΠ΅ ΡΠΈΡΠ°Π΅Ρ ΠΈΠ½ΡΡΡΡΠΊΡΠΈΠΈ?Best Practices β Π½Ρ Ρ ΠΎΡΡ Π±Ρ ΡΠ΅ΠΊΠΎΠΌΠ΅Π½Π΄Π°ΡΠΈΠΈ ΠΎΡ ΡΠΎΠ·Π΄Π°ΡΠ΅Π»Π΅ΠΉ ΠΏΡΠΎΡΠΈΡΠ°ΠΉΡΠ΅.The Airflow UI β ΡΠ°ΠΌΠΎΠ΅ Π½Π°ΡΠ°Π»ΠΎ: ΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°ΡΠ΅Π»ΡΡΠΊΠΈΠΉ ΠΈΠ½ΡΠ΅ΡΡΠ΅ΠΉΡ Π² ΠΊΠ°ΡΡΠΈΠ½ΠΊΠ°ΡUnderstanding Apache Airflowβs key concepts β Ρ ΠΎΡΠΎΡΠΎ ΡΠ°ΡΠΏΠΈΡΠ°Π½Ρ Π±Π°Π·ΠΎΠ²ΡΠ΅ ΠΏΠΎΠ½ΡΡΠΈΡ, Π΅ΡΠ»ΠΈ (Π²Π΄ΡΡΠ³!) Π²Ρ ΡΡΠΎ-ΡΠΎ Π½Π΅ ΠΏΠΎΠ½ΡΠ»ΠΈ Ρ ΠΌΠ΅Π½Ρ.Tianlong’s Blog β A Guide On How To Build An Airflow Server/Cluster β ΠΊΡΠ°ΡΠΊΠΈΠΉ Π³Π°ΠΉΠ΄ ΠΏΠΎ Π½Π°ΡΡΡΠΎΠΉΠΊΠ΅ Airflow-ΠΊΠ»Π°ΡΡΠ΅ΡΠ°.Running Apache Airflow At Lyft β ΠΏΠΎΡΡΠΈ ΡΠ°ΠΊΠ°Ρ ΠΆΠ΅ ΠΈΠ½ΡΠ΅ΡΠ΅ΡΠ½Π°Ρ ΡΡΠ°ΡΡΡ, ΡΠ°Π·Π²Π΅ ΡΡΠΎ ΡΠΎΡΠΌΠ°Π»ΠΈΠ·ΠΌΠ° ΠΏΠΎΠ±ΠΎΠ»ΡΡΠ΅, Π° ΠΏΡΠΈΠΌΠ΅ΡΠΎΠ² ΠΏΠΎΠΌΠ΅Π½ΡΡΠ΅.How Apache Airflow Distributes Jobs on Celery workers β ΠΎ ΡΠ°Π±ΠΎΡΠ΅ Π² ΡΠ²ΡΠ·ΠΊΠ΅ Ρ Celery.DAG Writing Best Practices in Apache Airflow β ΠΏΡΠΎ ΠΈΠ΄Π΅ΠΌΠΏΠΎΡΠ΅Π½ΡΠ½ΠΎΡΡΡ ΡΠ°ΡΠΊΠΎΠ², Π·Π°Π³ΡΡΠ·ΠΊΡ ΠΏΠΎ ID Π²ΠΌΠ΅ΡΡΠΎ Π΄Π°ΡΡ, ΡΡΠ°Π½ΡΡΠΎΡΠΌΠ°ΡΠΈΠΈ, ΡΡΡΡΠΊΡΡΡΡ ΡΠ°ΠΉΠ»ΠΎΠ² ΠΈ ΠΏΡΠΎΡΠΈΠ΅ ΠΈΠ½ΡΠ΅ΡΠ΅ΡΠ½ΡΠ΅ Π²Π΅ΡΠΈ.Managing Dependencies in Apache Airflow β Π·Π°Π²ΠΈΡΠΈΠΌΠΎΡΡΠΈ ΡΠ°ΡΠΊΠΎΠ² ΠΈ Trigger Rule, ΠΊΠΎΡΠΎΡΡΠ΅ Ρ ΡΠΏΠΎΠΌΡΠ½ΡΠ» Π»ΠΈΡΡ Π²ΡΠΊΠΎΠ»ΡΠ·Ρ.Airflow: When Your DAG is Far Behind The Schedule β ΠΊΠ°ΠΊ ΠΏΡΠ΅ΠΎΠ΄ΠΎΠ»Π΅Π²Π°ΡΡ Π½Π΅ΠΊΠΎΡΠΎΡΡΠ΅ Β«ΡΠ°Π±ΠΎΡΠ°Π΅Ρ, ΠΊΠ°ΠΊ Π·Π°Π΄ΡΠΌΠ°Π½ΠΎΒ» Ρ ΠΏΠ»Π°Π½ΠΈΡΠΎΠ²ΡΠΈΠΊΠ°, Π·Π°Π³ΡΡΠΆΠ°ΡΡ ΠΏΠΎΡΠ΅ΡΡΠ½Π½ΡΠ΅ Π΄Π°Π½Π½ΡΠ΅ ΠΈ ΡΠ°ΡΡΡΠ°Π²Π»ΡΡΡ ΠΏΡΠΈΠΎΡΠΈΡΠ΅ΡΡ ΡΠ°ΡΠΊΠΎΠ².Useful SQL queries for Apache Airflow β ΠΏΠΎΠ»Π΅Π·Π½ΡΠ΅ SQL-Π·Π°ΠΏΡΠΎΡΡ ΠΊ ΠΌΠ΅ΡΠ°Π΄Π°Π½Π½ΡΠΌ Airflow.Get started developing workflows with Apache Airflow β Π΅ΡΡΡ ΠΏΠΎΠ»Π΅Π·Π½ΡΠΉ ΡΠ°Π·Π΄Π΅Π» ΠΏΡΠΎ ΡΠΎΠ·Π΄Π°Π½ΠΈΠ΅ ΠΊΠ°ΡΡΠΎΠΌΠ½ΠΎΠ³ΠΎ ΡΠ΅Π½ΡΠΎΡΠ°.Building the Fetchr Data Science Infra on AWS with Presto and Airflow β ΠΈΠ½ΡΠ΅ΡΠ΅ΡΠ½Π°Ρ ΠΊΠΎΡΠΎΡΠΊΠ°Ρ Π·Π°ΠΌΠ΅ΡΠΊΠ° ΠΎ ΠΏΠΎΡΡΡΠΎΠ΅Π½ΠΈΠΈ ΠΈΠ½ΡΡΠ°ΡΡΡΡΠΊΡΡΡΡ Π½Π° AWS Π΄Π»Ρ Data Science.7 Common Errors to Check when Debugging Airflow DAGs β ΡΠ°ΡΠΏΡΠΎΡΡΡΠ°Π½Π΅Π½Π½ΡΠ΅ ΠΎΡΠΈΠ±ΠΊΠΈ (ΠΊΠΎΠ³Π΄Π° ΠΊΠΎΠ΅-ΠΊΡΠΎ Π²ΡΡ-ΡΠ°ΠΊΠΈ Π½Π΅ ΡΠΈΡΠ°Π΅Ρ ΠΈΠ½ΡΡΡΡΠΊΡΠΈΠΉ).Store and access password using Apache Airflow β ΡΠ»ΡΠ±Π½ΠΈΡΠ΅ΡΡ, ΠΊΠ°ΠΊ Π»ΡΠ΄ΠΈ ΠΊΠΎΡΡΡΠ»ΡΡ Ρ ΡΠ°Π½Π΅Π½ΠΈΠ΅ ΠΏΠ°ΡΠΎΠ»Π΅ΠΉ, Ρ ΠΎΡΡ ΠΌΠΎΠΆΠ½ΠΎ ΠΏΡΠΎΡΡΠΎ ΠΈΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°ΡΡ Connections.The Zen of Python and Apache Airflow β Π½Π΅ΡΠ²Π½ΡΠΉ ΠΏΡΠΎΠ±ΡΠΎΡ DAG, Π·Π°Π±ΡΠΎΡ ΠΊΠΎΠ½ΡΠ΅ΠΊΡΡΠ° Π² ΡΡΠ½ΠΊΡΠΈΠΈ, ΡΠ½ΠΎΠ²Π° ΠΏΡΠΎ Π·Π°Π²ΠΈΡΠΈΠΌΠΎΡΡΠΈ, Π° Π΅ΡΠ΅ ΠΏΡΠΎ ΠΏΡΠΎΠΏΡΡΠΊ Π·Π°ΠΏΡΡΠΊΠΎΠ² ΡΠ°ΡΠΊΠΎΠ².Airflow: Lesser Known Tips, Tricks, and Best Practises β ΠΎΠ± ΠΈΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π½ΠΈΠΈdefault arguments
ΠΈparams
Π² ΡΠ°Π±Π»ΠΎΠ½Π°Ρ , Π° ΡΠ°ΠΊΠΆΠ΅ ΠΎ Variables ΠΈ Connections.Profiling the Airflow Scheduler β ΡΠ°ΡΡΠΊΠ°Π· ΠΎ ΡΠΎΠΌ, ΠΊΠ°ΠΊ ΠΏΠ»Π°Π½ΠΈΡΠΎΠ²ΡΠΈΠΊ Π³ΠΎΡΠΎΠ²ΡΡ ΠΊ Airflow 2.0.Apache Airflow with 3 Celery workers in docker-compose β Π½Π΅ΠΌΠ½ΠΎΠΆΠΊΠΎ ΡΡΡΠ°ΡΠ΅Π²ΡΠ°Ρ ΡΡΠ°ΡΡΡ ΠΏΡΠΎ Π΄Π΅ΠΏΠ»ΠΎΠΉ Π½Π°ΡΠ΅Π³ΠΎ ΠΊΠ»Π°ΡΡΠ΅ΡΠ° Π²docker-compose
.4 Templating Tasks Using the Airflow Context β Π΄ΠΈΠ½Π°ΠΌΠΈΡΠ΅ΡΠΊΠΈΠ΅ ΡΠ°ΡΠΊ Ρ ΠΏΠΎΠΌΠΎΡΡΡ ΡΠ°Π±Π»ΠΎΠ½ΠΎΠ² ΠΈ ΠΏΡΠΎΠ±ΡΠΎΡΠ° ΠΊΠΎΠ½ΡΠ΅ΠΊΡΡΠ°.Error Notifications in Airflow β ΡΡΠ°Π½Π΄Π°ΡΡΠ½ΡΠ΅ ΠΈ ΠΊΠ°ΡΡΠΎΠΌΠ½ΡΠ΅ ΠΎΠΏΠΎΠ²Π΅ΡΠ΅Π½ΠΈΡ ΠΏΠΎΡΡΠΎΠΉ ΠΈ Slack.Airflow Workshop: ΡΠ»ΠΎΠΆΠ½ΡΠ΅ DAGβΠΈ Π±Π΅Π· ΠΊΠΎΡΡΡΠ»Π΅ΠΉ β ΠΠ΅ΡΠ²Π»Π΅Π½ΠΈΡ ΡΠ°ΡΠΊΠΎΠ², ΠΌΠ°ΠΊΡΠΎΡΡ ΠΈ XCom.
Π ΡΡΡΠ»ΠΊΠΈ, Π·Π°Π΄Π΅ΠΉΡΡΠ²ΠΎΠ²Π°Π½Π½ΡΠ΅ Π² ΡΡΠ°ΡΡΠ΅:
Macros reference β Π΄ΠΎΡΡΡΠΏΠ½ΡΠ΅ Π΄Π»Ρ ΠΈΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π½ΠΈΡ Π² ΡΠ°Π±Π»ΠΎΠ½Π°Ρ ΠΏΠ»Π΅ΠΉΡΡ ΠΎΠ»Π΄Π΅ΡΡ.Common Pitfalls β Airflow β Π Π°ΡΠΏΡΠΎΡΡΡΠ°Π½Π΅Π½Π½ΡΠ΅ ΠΎΡΠΈΠ±ΠΊΠΈ ΠΏΡΠΈ ΡΠΎΠ·Π΄Π°Π½ΠΈΠΈ Π΄Π°Π³ΠΎΠ².puckel/docker-airflow: Docker Apache Airflow βdocker-compose
Π΄Π»Ρ ΡΠΊΡΠΏΠ΅ΡΠΈΠΌΠ΅Π½ΡΠΎΠ², ΠΎΡΠ»Π°Π΄ΠΊΠΈ ΠΈ Π½Π΅ ΡΠΎΠ»ΡΠΊΠΎ.python-telegram-bot/python-telegram-bot: We have made you a wrapper you can’t refuse β Python-ΠΎΠ±Π΅ΡΡΠΊΠ° Π΄Π»Ρ Telegram REST API.
ΠΡΡΠΎΡΠ½ΠΈΠΊ: habr.com