Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅

ΠŸΡ€ΠΈΠ²Π΅Ρ‚, я Π”ΠΌΠΈΡ‚Ρ€ΠΈΠΉ Π›ΠΎΠ³Π²ΠΈΠ½Π΅Π½ΠΊΠΎ β€” Data Engineer ΠΎΡ‚Π΄Π΅Π»Π° Π°Π½Π°Π»ΠΈΡ‚ΠΈΠΊΠΈ Π³Ρ€ΡƒΠΏΠΏΡ‹ ΠΊΠΎΠΌΠΏΠ°Π½ΠΈΠΉ Β«Π’Π΅Π·Ρ‘Ρ‚Β».

Π― расскаТу Π²Π°ΠΌ ΠΎ Π·Π°ΠΌΠ΅Ρ‡Π°Ρ‚Π΅Π»ΡŒΠ½ΠΎΠΌ инструмСнтС для Ρ€Π°Π·Ρ€Π°Π±ΠΎΡ‚ΠΊΠΈ ETL-процСссов β€” Apache Airflow. Но Airflow Π½Π°ΡΡ‚ΠΎΠ»ΡŒΠΊΠΎ унивСрсалСн ΠΈ ΠΌΠ½ΠΎΠ³ΠΎΠ³Ρ€Π°Π½Π΅Π½, Ρ‡Ρ‚ΠΎ Π²Π°ΠΌ стоит ΠΏΡ€ΠΈΡΠΌΠΎΡ‚Ρ€Π΅Ρ‚ΡŒΡΡ ΠΊ Π½Π΅ΠΌΡƒ Π΄Π°ΠΆΠ΅ Ссли Π²Ρ‹ Π½Π΅ Π·Π°Π½ΠΈΠΌΠ°Π΅Ρ‚Π΅ΡΡŒ ΠΏΠΎΡ‚ΠΎΠΊΠ°ΠΌΠΈ Π΄Π°Π½Π½Ρ‹Ρ…, Π° ΠΈΠΌΠ΅Π΅Ρ‚Π΅ ΠΏΠΎΡ‚Ρ€Π΅Π±Π½ΠΎΡΡ‚ΡŒ пСриодичСски Π·Π°ΠΏΡƒΡΠΊΠ°Ρ‚ΡŒ ΠΊΠ°ΠΊΠΈΠ΅-Π»ΠΈΠ±ΠΎ процСссы ΠΈ ΡΠ»Π΅Π΄ΠΈΡ‚ΡŒ Π·Π° ΠΈΡ… Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠ΅ΠΌ.

И Π΄Π°, я Π±ΡƒΠ΄Ρƒ Π½Π΅ Ρ‚ΠΎΠ»ΡŒΠΊΠΎ Ρ€Π°ΡΡΠΊΠ°Π·Ρ‹Π²Π°Ρ‚ΡŒ, Π½ΠΎ ΠΈ ΠΏΠΎΠΊΠ°Π·Ρ‹Π²Π°Ρ‚ΡŒ: Π² ΠΏΡ€ΠΎΠ³Ρ€Π°ΠΌΠΌΠ΅ ΠΌΠ½ΠΎΠ³ΠΎ ΠΊΠΎΠ΄Π°, ΡΠΊΡ€ΠΈΠ½ΡˆΠΎΡ‚ΠΎΠ² ΠΈ Ρ€Π΅ΠΊΠΎΠΌΠ΅Π½Π΄Π°Ρ†ΠΈΠΉ.

Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅
Π§Ρ‚ΠΎ ΠΎΠ±Ρ‹Ρ‡Π½ΠΎ видишь, ΠΊΠΎΠ³Π΄Π° Π³ΡƒΠ³Π»ΠΈΡˆΡŒ слово Airflow / Wikimedia Commons

ОглавлСниС

Π’Π²Π΅Π΄Π΅Π½ΠΈΠ΅

Apache Airflow β€” ΠΎΠ½ прямо ΠΊΠ°ΠΊ Django:

  • написан Π½Π° Python,
  • Π΅ΡΡ‚ΡŒ отличная Π°Π΄ΠΌΠΈΠ½ΠΊΠ°,
  • Π½Π΅ΠΎΠ³Ρ€Π°Π½ΠΈΡ‡Π΅Π½Π½ΠΎ Ρ€Π°ΡΡˆΠΈΡ€ΡΠ΅ΠΌ,

β€” Ρ‚ΠΎΠ»ΡŒΠΊΠΎ Π»ΡƒΡ‡ΡˆΠ΅, Π΄Π° ΠΈ сдСлан совсСм для Π΄Ρ€ΡƒΠ³ΠΈΡ… Ρ†Π΅Π»Π΅ΠΉ, Π° ΠΈΠΌΠ΅Π½Π½ΠΎ (ΠΊΠ°ΠΊ написано Π΄ΠΎ ΠΊΠ°Ρ‚Π°):

  • запуск ΠΈ ΠΌΠΎΠ½ΠΈΡ‚ΠΎΡ€ΠΈΠ½Π³ Π·Π°Π΄Π°Ρ‡ Π½Π° Π½Π΅ΠΎΠ³Ρ€Π°Π½ΠΈΡ‡Π΅Π½Π½ΠΎΠΌ количСствС машин (сколько Π²Π°ΠΌ ΠΏΠΎΠ·Π²ΠΎΠ»ΠΈΡ‚ Celery/Kubernetes ΠΈ ваша ΡΠΎΠ²Π΅ΡΡ‚ΡŒ)
  • с динамичСской Π³Π΅Π½Π΅Ρ€Π°Ρ†ΠΈΠ΅ΠΉ workflow ΠΈΠ· ΠΎΡ‡Π΅Π½ΡŒ Π»Π΅Π³ΠΊΠΎΠ³ΠΎ для написания ΠΈ восприятия Python-ΠΊΠΎΠ΄Π°
  • ΠΈ Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡ‚ΡŒΡŽ ΡΠ²ΡΠ·Ρ‹Π²Π°Ρ‚ΡŒ Π΄Ρ€ΡƒΠ³ с Π΄Ρ€ΡƒΠ³ Π»ΡŽΠ±Ρ‹Π΅ Π±Π°Π·Ρ‹ Π΄Π°Π½Π½Ρ‹Ρ… ΠΈ API с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ ΠΊΠ°ΠΊ Π³ΠΎΡ‚ΠΎΠ²Ρ‹Ρ… ΠΊΠΎΠΌΠΏΠΎΠ½Π΅Π½Ρ‚ΠΎΠ², Ρ‚Π°ΠΊ ΠΈ ΡΠ°ΠΌΠΎΠ΄Π΅Π»ΡŒΠ½Ρ‹Ρ… ΠΏΠ»Π°Π³ΠΈΠ½ΠΎΠ² (Ρ‡Ρ‚ΠΎ дСлаСтся Ρ‡Ρ€Π΅Π·Π²Ρ‹Ρ‡Π°ΠΉΠ½ΠΎ просто).

ΠœΡ‹ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅ΠΌ Apache Airflow Ρ‚Π°ΠΊ:

  • собираСм Π΄Π°Π½Π½Ρ‹Π΅ ΠΈΠ· Ρ€Π°Π·Π»ΠΈΡ‡Π½Ρ‹Ρ… источников (мноТСство инстансов SQL Server ΠΈ PostgreSQL, Ρ€Π°Π·Π»ΠΈΡ‡Π½Ρ‹Π΅ API с ΠΌΠ΅Ρ‚Ρ€ΠΈΠΊΠ°ΠΌΠΈ ΠΏΡ€ΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠΉ, Π΄Π°ΠΆΠ΅ 1Π‘) Π² DWH ΠΈ ODS (Ρƒ нас это Vertica ΠΈ Clickhouse).
  • ΠΊΠ°ΠΊ ΠΏΡ€ΠΎΠ΄Π²ΠΈΠ½ΡƒΡ‚Ρ‹ΠΉ cron, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ запускаСт процСссы консолидации Π΄Π°Π½Π½Ρ‹Ρ… Π½Π° ODS, Π° Ρ‚Π°ΠΊΠΆΠ΅ слСдит Π·Π° ΠΈΡ… обслуТиваниСм.

Π”ΠΎ Π½Π΅Π΄Π°Π²Π½Π΅Π³ΠΎ Π²Ρ€Π΅ΠΌΠ΅Π½ΠΈ наши потрСбности ΠΏΠΎΠΊΡ€Ρ‹Π²Π°Π» ΠΎΠ΄ΠΈΠ½ нСбольшой сСрвСр Π½Π° 32 ядрах ΠΈ 50 GB ΠΎΠΏΠ΅Ρ€Π°Ρ‚ΠΈΠ²ΠΊΠΈ. Π’ Airflow ΠΏΡ€ΠΈ этом Ρ€Π°Π±ΠΎΡ‚Π°Π΅Ρ‚:

  • Π±ΠΎΠ»Π΅Π΅ 200 Π΄Π°Π³ΠΎΠ² (собствСнно workflows, Π² ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ ΠΌΡ‹ Π½Π°Π±ΠΈΠ»ΠΈ Π·Π°Π΄Π°Ρ‡ΠΊΠΈ),
  • Π² ΠΊΠ°ΠΆΠ΄ΠΎΠΌ Π² срСднСм ΠΏΠΎ 70 тасков,
  • запускаСтся это Π΄ΠΎΠ±Ρ€ΠΎ (Ρ‚ΠΎΠΆΠ΅ Π² срСднСм) Ρ€Π°Π· Π² час.

А ΠΎ Ρ‚ΠΎΠΌ, ΠΊΠ°ΠΊ ΠΌΡ‹ Ρ€Π°ΡΡˆΠΈΡ€ΡΠ»ΠΈΡΡŒ, я Π½Π°ΠΏΠΈΡˆΡƒ Π½ΠΈΠΆΠ΅, Π° сСйчас Π΄Π°Π²Π°ΠΉΡ‚Π΅ ΠΎΠΏΡ€Π΅Π΄Π΅Π»ΠΈΠΌ ΓΌber-Π·Π°Π΄Π°Ρ‡Ρƒ, ΠΊΠΎΡ‚ΠΎΡ€ΡƒΡŽ ΠΌΡ‹ Π±ΡƒΠ΄Π΅ΠΌ Ρ€Π΅ΡˆΠ°Ρ‚ΡŒ:

Π•ΡΡ‚ΡŒ Ρ‚Ρ€ΠΈ исходных SQL Server’а, Π½Π° ΠΊΠ°ΠΆΠ΄ΠΎΠΌ ΠΏΠΎ 50 Π±Π°Π· Π΄Π°Π½Π½Ρ‹Ρ… β€” инстансов ΠΎΠ΄Π½ΠΎΠ³ΠΎ ΠΏΡ€ΠΎΠ΅ΠΊΡ‚Π°, соотвСтствСнно, структура Ρƒ Π½ΠΈΡ… одинаковая (ΠΏΠΎΡ‡Ρ‚ΠΈ Π²Π΅Π·Π΄Π΅, ΠΌΡƒΠ°-Ρ…Π°-Ρ…Π°), Π° Π·Π½Π°Ρ‡ΠΈΡ‚ Π² ΠΊΠ°ΠΆΠ΄ΠΎΠΉ Π΅ΡΡ‚ΡŒ Ρ‚Π°Π±Π»ΠΈΡ†Π° Orders (Π±Π»Π°Π³ΠΎ Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ с Ρ‚Π°ΠΊΠΈΠΌ Π½Π°Π·Π²Π°Π½ΠΈΠ΅ΠΌ ΠΌΠΎΠΆΠ½ΠΎ Π·Π°Ρ‚ΠΎΠ»ΠΊΠ°Ρ‚ΡŒ Π² любой бизнСс). ΠœΡ‹ Π·Π°Π±ΠΈΡ€Π°Π΅ΠΌ Π΄Π°Π½Π½Ρ‹Π΅, добавляя слуТСбныС поля (сСрвСр-источник, Π±Π°Π·Π°-источник, ΠΈΠ΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€ ETL-Π·Π°Π΄Π°Ρ‡ΠΈ) ΠΈ Π½Π°ΠΈΠ²Π½Ρ‹ΠΌ ΠΎΠ±Ρ€Π°Π·ΠΎΠΌ бросим ΠΈΡ… Π², скаТСм, Vertica.

ΠŸΠΎΠ΅Ρ…Π°Π»ΠΈ!

Π§Π°ΡΡ‚ΡŒ основная, практичСская (ΠΈ Π½Π΅ΠΌΠ½ΠΎΠ³ΠΎ тСорСтичСская)

Π—Π°Ρ‡Π΅ΠΌ ΠΎΠ½ΠΎ Π½Π°ΠΌ (ΠΈ Π²Π°ΠΌ)

Когда Π΄Π΅Ρ€Π΅Π²ΡŒΡ Π±Ρ‹Π»ΠΈ большими, Π° я Π±Ρ‹Π» простым SQL-Ρ‰ΠΈΠΊΠΎΠΌ Π² ΠΎΠ΄Π½ΠΎΠΌ российском Ρ€ΠΈΡ‚Π΅ΠΉΠ»Π΅, ΠΌΡ‹ ΡˆΠΏΠ°Ρ€ΠΈΠ»ΠΈ ETL-процСссы aka ΠΏΠΎΡ‚ΠΎΠΊΠΈ Π΄Π°Π½Π½Ρ‹Ρ… с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ Π΄Π²ΡƒΡ… доступных Π½Π°ΠΌ срСдств:

  • Informatica Power Center β€” ΠΊΡ€Π°ΠΉΠ½Π΅ развСсистая систСма, Ρ‡Ρ€Π΅Π·Π²Ρ‹Ρ‡Π°ΠΉΠ½ΠΎ ΠΏΡ€ΠΎΠΈΠ·Π²ΠΎΠ΄ΠΈΡ‚Π΅Π»ΡŒΠ½Π°Ρ, со своими ΠΆΠ΅Π»Π΅Π·ΠΊΠ°ΠΌΠΈ, собствСнным вСрсионированиСм. Использовал я Π΄Π°ΠΉ Π±ΠΎΠ³ 1% Π΅Ρ‘ возмоТностСй. ΠŸΠΎΡ‡Π΅ΠΌΡƒ? Ну, Π²ΠΎ-ΠΏΠ΅Ρ€Π²Ρ‹Ρ…, этот интСрфСйс Π³Π΄Π΅-Ρ‚ΠΎ ΠΈΠ· Π½ΡƒΠ»Π΅Π²Ρ‹Ρ… психичСски Π΄Π°Π²ΠΈΠ» Π½Π° нас. Π’ΠΎ-Π²Ρ‚ΠΎΡ€Ρ‹Ρ…, эта ΡˆΡ‚ΡƒΠΊΠΎΠ²ΠΈΠ½Π° Π·Π°Ρ‚ΠΎΡ‡Π΅Π½Π° ΠΏΠΎΠ΄ Ρ‡Ρ€Π΅Π·Π²Ρ‹Ρ‡Π°ΠΉΠ½ΠΎ Π½Π°Π²ΠΎΡ€ΠΎΡ‡Π΅Π½Π½Ρ‹Π΅ процСссы, яростноС ΠΏΠ΅Ρ€Π΅ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Π½ΠΈΠ΅ ΠΊΠΎΠΌΠΏΠΎΠ½Π΅Π½Ρ‚ΠΎΠ² ΠΈ Π΄Ρ€ΡƒΠ³ΠΈΠ΅ ΠΎΡ‡Π΅Π½ΡŒ-Π²Π°ΠΆΠ½Ρ‹Π΅-энтСрпрайз-Ρ„ΠΈΡˆΠ΅Ρ‡ΠΊΠΈ. ΠŸΡ€ΠΎ Ρ‚ΠΎ Ρ‡Ρ‚ΠΎ стоит ΠΎΠ½Π°, ΠΊΠ°ΠΊ ΠΊΡ€Ρ‹Π»ΠΎ Airbus A380/Π³ΠΎΠ΄, ΠΌΡ‹ ΠΏΡ€ΠΎΠΌΠΎΠ»Ρ‡ΠΈΠΌ.

    ΠžΡΡ‚ΠΎΡ€ΠΎΠΆΠ½ΠΎ, ΡΠΊΡ€ΠΈΠ½ΡˆΠΎΡ‚ ΠΌΠΎΠΆΠ΅Ρ‚ ΡΠ΄Π΅Π»Π°Ρ‚ΡŒ людям младшС 30 Π½Π΅ΠΌΠ½ΠΎΠ³ΠΎ больно

    Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅

  • SQL Server Integration Server β€” этим Ρ‚ΠΎΠ²Π°Ρ€ΠΈΡ‰Π΅ΠΌ ΠΌΡ‹ пользовались Π² своих Π²Π½ΡƒΡ‚Ρ€ΠΈΠΏΡ€ΠΎΠ΅ΠΊΡ‚Π½Ρ‹Ρ… ΠΏΠΎΡ‚ΠΎΠΊΠ°Ρ…. Ну Π° Π² самом Π΄Π΅Π»Π΅: SQL Server ΠΌΡ‹ ΡƒΠΆΠ΅ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅ΠΌ, ΠΈ Π½Π΅ ΡŽΠ·Π°Ρ‚ΡŒ Π΅Π³ΠΎ ETL-Ρ‚ΡƒΠ»Π·Ρ‹ Π±Ρ‹Π»ΠΎ Π±Ρ‹ ΠΊΠ°ΠΊ-Ρ‚ΠΎ Π½Π΅Ρ€Π°Π·ΡƒΠΌΠ½ΠΎ. Всё Π² Π½Ρ‘ΠΌ Π² Ρ…ΠΎΡ€ΠΎΡˆΠΎ: ΠΈ интСрфСйс красивый, ΠΈ ΠΎΡ‚Ρ‡Ρ‘Ρ‚ΠΈΠΊΠΈ выполнСния… Но Π½Π΅ Π·Π° это ΠΌΡ‹ любим ΠΏΡ€ΠΎΠ³Ρ€Π°ΠΌΠΌΠ½Ρ‹Π΅ ΠΏΡ€ΠΎΠ΄ΡƒΠΊΡ‚Ρ‹, ΠΎΡ… Π½Π΅ Π·Π° это. Π’Π΅Ρ€ΡΠΈΠΎΠ½ΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒ Π΅Π³ΠΎ dtsx (ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ прСдставляСт собой XML с ΠΏΠ΅Ρ€Π΅ΠΌΠ΅ΡˆΠΈΠ²Π°ΡŽΡ‰ΠΈΠΌΠΈΡΡ ΠΏΡ€ΠΈ сохранСнии Π½ΠΎΠ΄Π°ΠΌΠΈ) ΠΌΡ‹ ΠΌΠΎΠΆΠ΅ΠΌ, Π° Ρ‚ΠΎΠ»ΠΊΡƒ? А ΡΠ΄Π΅Π»Π°Ρ‚ΡŒ ΠΏΠ°ΠΊΠ΅Ρ‚ тасков, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ ΠΏΠ΅Ρ€Π΅Ρ‚Π°Ρ‰ΠΈΡ‚ ΡΠΎΡ‚Π½ΡŽ Ρ‚Π°Π±Π»ΠΈΡ† с ΠΎΠ΄Π½ΠΎΠ³ΠΎ сСрвСра Π½Π° Π΄Ρ€ΡƒΠ³ΠΎΠΉ? Π”Π° Ρ‡Ρ‚ΠΎ ΡΠΎΡ‚Π½ΡŽ, Ρƒ вас ΠΎΡ‚ Π΄Π²Π°Π΄Ρ†Π°Ρ‚ΠΈ ΡˆΡ‚ΡƒΠΊ отвалится ΡƒΠΊΠ°Π·Π°Ρ‚Π΅Π»ΡŒΠ½Ρ‹ΠΉ ΠΏΠ°Π»Π΅Ρ†, Ρ‰Ρ‘Π»ΠΊΠ°ΡŽΡ‰ΠΈΠΉ ΠΏΠΎ ΠΌΡ‹ΡˆΠΈΠ½ΠΎΠΉ ΠΊΠ½ΠΎΠΏΠΊΠ΅. Но выглядит ΠΎΠ½, ΠΎΠΏΡ€Π΅Π΄Π΅Π»Π΅Π½Π½ΠΎ, Π±ΠΎΠ»Π΅Π΅ ΠΌΠΎΠ΄Π½ΠΎ:

    Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅

ΠœΡ‹ бСзусловно искали Π²Ρ‹Ρ…ΠΎΠ΄Ρ‹. Π”Π΅Π»ΠΎ Π΄Π°ΠΆΠ΅ ΠΏΠΎΡ‡Ρ‚ΠΈ дошло Π΄ΠΎ самописного Π³Π΅Π½Π΅Ρ€Π°Ρ‚ΠΎΡ€Π° SSIS-ΠΏΠ°ΠΊΠ΅Ρ‚ΠΎΠ²…

… Π° ΠΏΠΎΡ‚ΠΎΠΌ мСня нашла новая Ρ€Π°Π±ΠΎΡ‚Π°. А Π½Π° Π½Π΅ΠΉ мСня настиг Apache Airflow.

Когда я ΡƒΠ·Π½Π°Π», Ρ‡Ρ‚ΠΎ описания ETL-процСссов β€” это простой Python-ΠΊΠΎΠ΄, я Ρ‚ΠΎΠ»ΡŒΠΊΠΎ Ρ‡Ρ‚ΠΎ Π½Π΅ плясал ΠΎΡ‚ радости. Π’ΠΎΡ‚ Ρ‚Π°ΠΊ ΠΏΠΎΡ‚ΠΎΠΊΠΈ Π΄Π°Π½Π½Ρ‹Ρ… ΠΏΠΎΠ΄Π²Π΅Ρ€Π³Π»ΠΈΡΡŒ Π²Π΅Ρ€ΡΠΈΠΎΠ½ΠΈΡ€ΠΎΠ²Π°Π½ΠΈΡŽ ΠΈ Π΄ΠΈΡ„Ρ„Ρƒ, Π° ΡΡΡ‹ΠΏΠ°Ρ‚ΡŒ Ρ‚Π°Π±Π»ΠΈΡ†Ρ‹ с Π΅Π΄ΠΈΠ½ΠΎΠΉ структурой ΠΈΠ· сотни Π±Π°Π· Π΄Π°Π½Π½Ρ‹Ρ… Π² ΠΎΠ΄ΠΈΠ½ Ρ‚Π°Ρ€Π³Π΅Ρ‚ стало Π΄Π΅Π»ΠΎΠΌ Python-ΠΊΠΎΠ΄Π° Π² ΠΏΠΎΠ»Ρ‚ΠΎΡ€Π°-Π΄Π²Π° 13” экрана.

Π‘ΠΎΠ±ΠΈΡ€Π°Π΅ΠΌ кластСр

Π”Π°Π²Π°ΠΉΡ‚Π΅ Π½Π΅ ΡƒΡΡ‚Ρ€Π°ΠΈΠ²Π°Ρ‚ΡŒ совсСм ΡƒΠΆ дСтский сад, ΠΈ Π½Π΅ Π³ΠΎΠ²ΠΎΡ€ΠΈΡ‚ΡŒ Ρ‚ΡƒΡ‚ ΠΎ ΡΠΎΠ²Π΅Ρ€ΡˆΠ΅Π½Π½ΠΎ ΠΎΡ‡Π΅Π²ΠΈΠ΄Π½Ρ‹Ρ… Π²Π΅Ρ‰Π°Ρ…, Π²Ρ€ΠΎΠ΄Π΅ установки Airflow, Π²Ρ‹Π±Ρ€Π°Π½Π½ΠΎΠΉ Π²Π°ΠΌΠΈ Π‘Π”, Celery ΠΈ Π΄Ρ€ΡƒΠ³ΠΈΡ… Π΄Π΅Π», описанных Π² Π΄ΠΎΠΊΠ°Ρ….

Π§Ρ‚ΠΎΠ±Ρ‹ ΠΌΡ‹ ΠΌΠΎΠ³Π»ΠΈ сразу ΠΏΡ€ΠΈΡΡ‚ΡƒΠΏΠΈΡ‚ΡŒ ΠΊ экспСримСнтам, я набросал docker-compose.yml Π² ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΌ:

  • ПоднимСм собствСнно Airflow: Scheduler, Webserver. Π’Π°ΠΌ ΠΆΠ΅ Π±ΡƒΠ΄Π΅Ρ‚ крутится Flower для ΠΌΠΎΠ½ΠΈΡ‚ΠΎΡ€ΠΈΠ½Π³Π° Celery-Π·Π°Π΄Π°Ρ‡ (ΠΏΠΎΡ‚ΠΎΠΌΡƒ Ρ‡Ρ‚ΠΎ Π΅Π³ΠΎ ΡƒΠΆΠ΅ Π·Π°Ρ‚ΠΎΠ»ΠΊΠ°Π»ΠΈ Π² apache/airflow:1.10.10-python3.7, Π° ΠΌΡ‹ ΠΈ Π½Π΅ ΠΏΡ€ΠΎΡ‚ΠΈΠ²);
  • PostgreSQL, Π² ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ Airflow Π±ΡƒΠ΄Π΅Ρ‚ ΠΏΠΈΡΠ°Ρ‚ΡŒ свою ΡΠ»ΡƒΠΆΠ΅Π±Π½ΡƒΡŽ ΠΈΠ½Ρ„ΠΎΡ€ΠΌΠ°Ρ†ΠΈΡŽ (Π΄Π°Π½Π½Ρ‹Π΅ ΠΏΠ»Π°Π½ΠΈΡ€ΠΎΠ²Ρ‰ΠΈΠΊΠ°, статистика выполнСния ΠΈ Ρ‚. Π΄.), Π° Celery β€” ΠΎΡ‚ΠΌΠ΅Ρ‡Π°Ρ‚ΡŒ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½Π½Ρ‹Π΅ таски;
  • Redis, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ Π±ΡƒΠ΄Π΅Ρ‚ Π²Ρ‹ΡΡ‚ΡƒΠΏΠ°Ρ‚ΡŒ Π±Ρ€ΠΎΠΊΠ΅Ρ€ΠΎΠΌ Π·Π°Π΄Π°Ρ‡ для Celery;
  • Celery worker, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ ΠΈ займСтся нСпосрСдствСнным Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠ΅ΠΌ Π·Π°Π΄Π°Ρ‡Π΅ΠΊ.
  • Π’ ΠΏΠ°ΠΏΠΊΡƒ ./dags ΠΌΡ‹ Π±ΡƒΠ΄Π΅Ρ‚ ΡΠΊΠ»Π°Π΄Ρ‹Π²Π°Ρ‚ΡŒ наши Ρ„Π°ΠΉΠ»Ρ‹ с описаниСм Π΄Π°Π³ΠΎΠ². Они Π±ΡƒΠ΄ΡƒΡ‚ ΠΏΠΎΠ΄Ρ…Π²Π°Ρ‚Ρ‹Π²Π°Ρ‚ΡŒΡΡ Π½Π° Π»Π΅Ρ‚Ρƒ, поэтому ΠΏΠ΅Ρ€Π΅Π΄Ρ‘Ρ€Π³ΠΈΠ²Π°Ρ‚ΡŒ вСсь стСк послС ΠΊΠ°ΠΆΠ΄ΠΎΠ³ΠΎ Ρ‡ΠΈΡ…Π° Π½Π΅ Π½ΡƒΠΆΠ½ΠΎ.

КоС-Π³Π΄Π΅ ΠΊΠΎΠ΄ Π² ΠΏΡ€ΠΈΠΌΠ΅Ρ€Π°Ρ… ΠΏΡ€ΠΈΠ²Π΅Π΄Π΅Π½ Π½Π΅ ΠΏΠΎΠ»Π½ΠΎΡΡ‚ΡŒΡŽ (Ρ‡Ρ‚ΠΎΠ±Ρ‹ Π½Π΅ Π·Π°Π³Ρ€ΠΎΠΌΠΎΠΆΠ΄Π°Ρ‚ΡŒ тСкст), Π° Π³Π΄Π΅-Ρ‚ΠΎ ΠΎΠ½ модифицируСтся Π² процСссС. Π¦Π΅Π»ΡŒΠ½Ρ‹Π΅ Ρ€Π°Π±ΠΎΡ‚Π°ΡŽΡ‰ΠΈΠ΅ ΠΏΡ€ΠΈΠΌΠ΅Ρ€Ρ‹ ΠΊΠΎΠ΄Π° ΠΌΠΎΠΆΠ½ΠΎ ΠΏΠΎΡΠΌΠΎΡ‚Ρ€Π΅Ρ‚ΡŒ Π² Ρ€Π΅ΠΏΠΎΠ·ΠΈΡ‚ΠΎΡ€ΠΈΠΈ https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

ΠŸΡ€ΠΈΠΌΠ΅Ρ‡Π°Π½ΠΈΡ:

  • Π’ сборкС ΠΊΠΎΠΌΠΏΠΎΠ·Π° я Π²ΠΎ ΠΌΠ½ΠΎΠ³ΠΎΠΌ опирался Π½Π° извСстный ΠΎΠ±Ρ€Π°Π· puckel/docker-airflow – ΠΎΠ±ΡΠ·Π°Ρ‚Π΅Π»ΡŒΠ½ΠΎ посмотритС. ΠœΠΎΠΆΠ΅Ρ‚, Π²Π°ΠΌ Π² ΠΆΠΈΠ·Π½ΠΈ большС Π½ΠΈΡ‡Π΅Π³ΠΎ ΠΈ Π½Π΅ понадобится.
  • ВсС настройки Airflow доступны Π½Π΅ Ρ‚ΠΎΠ»ΡŒΠΊΠΎ Ρ‡Π΅Ρ€Π΅Π· airflow.cfg, Π½ΠΎ ΠΈ Ρ‡Π΅Ρ€Π΅Π· ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½Ρ‹Π΅ срСды (слава Ρ€Π°Π·Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊΠ°ΠΌ), Ρ‡Π΅ΠΌ я злостно воспользовался.
  • ЕстСствСнно, ΠΎΠ½ Π½Π΅ production-ready: я Π½Π°ΠΌΠ΅Ρ€Π΅Π½Π½ΠΎ Π½Π΅ ставил heartbeats Π½Π° ΠΊΠΎΠ½Ρ‚Π΅ΠΉΠ½Π΅Ρ€Ρ‹, Π½Π΅ заморачивался с Π±Π΅Π·ΠΎΠΏΠ°ΡΠ½ΠΎΡΡ‚ΡŒΡŽ. Но ΠΌΠΈΠ½ΠΈΠΌΡƒΠΌ, подходящий для Π½Π°ΡˆΠΈΡ… экспСримСнтиков я сдСлал.
  • ΠžΠ±Ρ€Π°Ρ‚ΠΈΡ‚Π΅ Π²Π½ΠΈΠΌΠ°Π½ΠΈΠ΅, Ρ‡Ρ‚ΠΎ:
    • Папка с Π΄Π°Π³Π°ΠΌΠΈ Π΄ΠΎΠ»ΠΆΠ½Π° Π±Ρ‹Ρ‚ΡŒ доступна ΠΊΠ°ΠΊ ΠΏΠ»Π°Π½ΠΈΡ€ΠΎΠ²Ρ‰ΠΈΠΊΡƒ, Ρ‚Π°ΠΊ ΠΈ Π²ΠΎΡ€ΠΊΠ΅Ρ€Π°ΠΌ.
    • Π’ΠΎ ΠΆΠ΅ самоС касаСтся ΠΈ всСх сторонних Π±ΠΈΠ±Π»ΠΈΠΎΡ‚Π΅ΠΊ β€” ΠΎΠ½ΠΈ всС Π΄ΠΎΠ»ΠΆΠ½Ρ‹ Π±Ρ‹Ρ‚ΡŒ установлСны Π½Π° ΠΌΠ°ΡˆΠΈΠ½Ρ‹ с ΡˆΠ΅Π΄ΡƒΠ»Π΅Ρ€ΠΎΠΌ ΠΈ Π²ΠΎΡ€ΠΊΠ΅Ρ€Π°ΠΌΠΈ.

Ну Π° Ρ‚Π΅ΠΏΠ΅Ρ€ΡŒ просто:

$ docker-compose up --scale worker=3

ПослС Ρ‚ΠΎΠ³ΠΎ, ΠΊΠ°ΠΊ всё поднимСтся, ΠΌΠΎΠΆΠ½ΠΎ ΡΠΌΠΎΡ‚Ρ€Π΅Ρ‚ΡŒ Π½Π° Π²Π΅Π±-интСрфСйсы:

ΠžΡΠ½ΠΎΠ²Π½Ρ‹Π΅ понятия

Если Π²Ρ‹ Π½ΠΈΡ‡Π΅Π³ΠΎ Π½Π΅ поняли Π²ΠΎ всСх этих Β«Π΄Π°Π³Π°Ρ…Β», Ρ‚ΠΎ Π²ΠΎΡ‚ ΠΊΡ€Π°Ρ‚ΠΊΠΈΠΉ словарик:

  • Scheduler β€” самый Π³Π»Π°Π²Π½Ρ‹ΠΉ дядька Π² Airflow, ΠΊΠΎΠ½Ρ‚Ρ€ΠΎΠ»ΠΈΡ€ΡƒΡŽΡ‰ΠΈΠΉ, Ρ‡Ρ‚ΠΎΠ±Ρ‹ Π²ΠΊΠ°Π»Ρ‹Π²Π°Π»ΠΈ Ρ€ΠΎΠ±ΠΎΡ‚Ρ‹, Π° Π½Π΅ Ρ‡Π΅Π»ΠΎΠ²Π΅ΠΊ: слСдит Π·Π° расписаниСм, обновляСт Π΄Π°Π³ΠΈ, запускаСт таски.

    Π’ΠΎΠΎΠ±Ρ‰Π΅, Π² старых вСрсиях, Ρƒ Π½Π΅Π³ΠΎ Π±Ρ‹Π»ΠΈ ΠΏΡ€ΠΎΠ±Π»Π΅ΠΌΡ‹ с ΠΏΠ°ΠΌΡΡ‚ΡŒΡŽ (Π½Π΅Ρ‚, Π½Π΅ амнСзия, Π° ΡƒΡ‚Π΅Ρ‡ΠΊΠΈ) ΠΈ Π² ΠΊΠΎΠ½Ρ„ΠΈΠ³Π°Ρ… Π΄Π°ΠΆΠ΅ остался лСгаси-ΠΏΠ°Ρ€Π°ΠΌΠ΅Ρ‚Ρ€ run_duration β€” ΠΈΠ½Ρ‚Π΅Ρ€Π²Π°Π» Π΅Π³ΠΎ пСрСзапуска. Но сСйчас всё Ρ…ΠΎΡ€ΠΎΡˆΠΎ.

  • DAG (ΠΎΠ½ ΠΆΠ΅ Β«Π΄Π°Π³Β») β€” Β«Π½Π°ΠΏΡ€Π°Π²Π»Π΅Π½Π½Ρ‹ΠΉ Π°Ρ†ΠΈΠΊΠ»ΠΈΡ‡Π½Ρ‹ΠΉ Π³Ρ€Π°Ρ„Β», Π½ΠΎ Ρ‚Π°ΠΊΠΎΠ΅ ΠΎΠΏΡ€Π΅Π΄Π΅Π»Π΅Π½ΠΈΠ΅ ΠΌΠ°Π»ΠΎ ΠΊΠΎΠΌΡƒ Ρ‡Ρ‚ΠΎ скаТСт, Π° ΠΏΠΎ сути это ΠΊΠΎΠ½Ρ‚Π΅ΠΉΠ½Π΅Ρ€ для Π²Π·Π°ΠΈΠΌΠΎΠ΄Π΅ΠΉΡΡ‚Π²ΡƒΡŽΡ‰ΠΈΡ… Π΄Ρ€ΡƒΠ³ с Π΄Ρ€ΡƒΠ³ΠΎΠΌ тасков (см. Π½ΠΈΠΆΠ΅) ΠΈΠ»ΠΈ Π°Π½Π°Π»ΠΎΠ³ Package Π² SSIS ΠΈ Workflow Π² Informatica.

    Помимо Π΄Π°Π³ΠΎΠ² Π΅Ρ‰Π΅ ΠΌΠΎΠ³ΡƒΡ‚ Π±Ρ‹Ρ‚ΡŒ сабдаги, Π½ΠΎ ΠΌΡ‹ Π΄ΠΎ Π½ΠΈΡ… скорСС всСго Π½Π΅ добСрёмся.

  • DAG Run β€” ΠΈΠ½ΠΈΡ†ΠΈΠ°Π»ΠΈΠ·ΠΈΡ€ΠΎΠ²Π°Π½Π½Ρ‹ΠΉ Π΄Π°Π³, ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΌΡƒ присвоСн свой execution_date. Π”Π°Π³Ρ€Π°Π½Ρ‹ ΠΎΠ΄Π½ΠΎΠ³ΠΎ Π΄Π°Π³Π° ΠΌΠΎΠ³ΡƒΡ‚ Π²ΠΏΠΎΠ»Π½Π΅ Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ ΠΏΠ°Ρ€Π°Π»Π»Π΅Π»ΡŒΠ½ΠΎ (Ссли Π²Ρ‹, ΠΊΠΎΠ½Π΅Ρ‡Π½ΠΎ, сдСлали свои таски ΠΈΠ΄Π΅ΠΌΠΏΠΎΡ‚Π΅Π½Ρ‚Π½Ρ‹ΠΌΠΈ).
  • Operator β€” это кусочки ΠΊΠΎΠ΄Π°, отвСтствСнныС Π·Π° Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠ΅ ΠΊΠ°ΠΊΠΎΠ³ΠΎ-Π»ΠΈΠ±ΠΎ ΠΊΠΎΠ½ΠΊΡ€Π΅Ρ‚Π½ΠΎΠ³ΠΎ дСйствия. Π•ΡΡ‚ΡŒ Ρ‚Ρ€ΠΈ Ρ‚ΠΈΠΏΠ° ΠΎΠΏΠ΅Ρ€Π°Ρ‚ΠΎΡ€ΠΎΠ²:
    • action, ΠΊΠ°ΠΊ Π½Π°ΠΏΡ€ΠΈΠΌΠ΅Ρ€ наш Π»ΡŽΠ±ΠΈΠΌΡ‹ΠΉ PythonOperator, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ Π² силах Π²Ρ‹ΠΏΠΎΠ»Π½ΠΈΡ‚ΡŒ любой (Π²Π°Π»ΠΈΠ΄Π½Ρ‹ΠΉ) Python-ΠΊΠΎΠ΄;
    • transfer, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ пСрСвозят Π΄Π°Π½Π½Ρ‹Π΅ с мСста Π½Π° мСсто, скаТСм, MsSqlToHiveTransfer;
    • sensor ΠΆΠ΅ ΠΏΠΎΠ·Π²ΠΎΠ»ΠΈΡ‚ Ρ€Π΅Π°Π³ΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒ ΠΈΠ»ΠΈ ΠΏΡ€ΠΈΡ‚ΠΎΡ€ΠΌΠΎΠ·ΠΈΡ‚ΡŒ дальнСйшСС Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠ΅ Π΄Π°Π³Π° Π΄ΠΎ наступлСния ΠΊΠ°ΠΊΠΎΠ³ΠΎ-Π»ΠΈΠ±ΠΎ события. HttpSensor ΠΌΠΎΠΆΠ΅Ρ‚ Π΄Π΅Ρ€Π³Π°Ρ‚ΡŒ ΡƒΠΊΠ°Π·Π°Π½Π½Ρ‹ΠΉ эндпойнт, ΠΈ ΠΊΠΎΠ³Π΄Π° доТдСтся Π½ΡƒΠΆΠ½Ρ‹ΠΉ ΠΎΡ‚Π²Π΅Ρ‚, Π·Π°ΠΏΡƒΡΡ‚ΠΈΡ‚ΡŒ трансфСр GoogleCloudStorageToS3Operator. ΠŸΡ‹Ρ‚Π»ΠΈΠ²Ρ‹ΠΉ ΡƒΠΌ спросит: Β«Π·Π°Ρ‡Π΅ΠΌ? Π’Π΅Π΄ΡŒ ΠΌΠΎΠΆΠ½ΠΎ Π΄Π΅Π»Π°Ρ‚ΡŒ ΠΏΠΎΠ²Ρ‚ΠΎΡ€Ρ‹ прямо Π² ΠΎΠΏΠ΅Ρ€Π°Ρ‚ΠΎΡ€Π΅!Β» А Π·Π°Ρ‚Π΅ΠΌ, Ρ‡Ρ‚ΠΎΠ±Ρ‹ Π½Π΅ Π·Π°Π±ΠΈΠ²Π°Ρ‚ΡŒ ΠΏΡƒΠ» тасков подвисшими ΠΎΠΏΠ΅Ρ€Π°Ρ‚ΠΎΡ€Π°ΠΌΠΈ. БСнсор запускаСтся, провСряСт ΠΈ ΡƒΠΌΠΈΡ€Π°Π΅Ρ‚ Π΄ΠΎ ΡΠ»Π΅Π΄ΡƒΡŽΡ‰Π΅ΠΉ ΠΏΠΎΠΏΡ‹Ρ‚ΠΊΠΈ.
  • Task β€” ΠΎΠ±ΡŠΡΠ²Π»Π΅Π½Π½Ρ‹Π΅ ΠΎΠΏΠ΅Ρ€Π°Ρ‚ΠΎΡ€Ρ‹ Π²Π½Π΅ зависимости ΠΎΡ‚ Ρ‚ΠΈΠΏΠ° ΠΈ ΠΏΡ€ΠΈΠΊΡ€Π΅ΠΏΠ»Π΅Π½Π½Ρ‹Π΅ ΠΊ Π΄Π°Π³Ρƒ ΠΏΠΎΠ²Ρ‹ΡˆΠ°ΡŽΡ‚ΡΡ Π΄ΠΎ Ρ‡ΠΈΠ½Π° таска.
  • Task instance β€” ΠΊΠΎΠ³Π΄Π° Π³Π΅Π½Π΅Ρ€Π°Π»-ΠΏΠ»Π°Π½ΠΈΡ€ΠΎΠ²Ρ‰ΠΈΠΊ Ρ€Π΅ΡˆΠΈΠ», Ρ‡Ρ‚ΠΎ таски ΠΏΠΎΡ€Π° ΠΎΡ‚ΠΏΡ€Π°Π²Π»ΡΡ‚ΡŒ Π² Π±ΠΎΠΉ Π½Π° исполнитСли-Π²ΠΎΡ€ΠΊΠ΅Ρ€Ρ‹ (прямо Π½Π° мСстС, Ссли ΠΌΡ‹ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅ΠΌ LocalExecutor ΠΈΠ»ΠΈ Π½Π° ΡƒΠ΄Π°Π»Ρ‘Π½Π½ΡƒΡŽ Π½ΠΎΠ΄Ρƒ Π² случаС с CeleryExecutor), ΠΎΠ½ Π½Π°Π·Π½Π°Ρ‡Π°Π΅Ρ‚ ΠΈΠΌ контСкст (Ρ‚. Π΅. ΠΊΠΎΠΌΠΏΠ»Π΅ΠΊΡ‚ ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½Ρ‹Ρ… β€” ΠΏΠ°Ρ€Π°ΠΌΠ΅Ρ‚Ρ€ΠΎΠ² выполнСния), Ρ€Π°Π·Π²ΠΎΡ€Π°Ρ‡ΠΈΠ²Π°Π΅Ρ‚ ΡˆΠ°Π±Π»ΠΎΠ½Ρ‹ ΠΊΠΎΠΌΠ°Π½Π΄ ΠΈΠ»ΠΈ запросов ΠΈ складываСт ΠΈΡ… Π² ΠΏΡƒΠ».

Π“Π΅Π½Π΅Ρ€ΠΈΡ€ΡƒΠ΅ΠΌ таски

Π‘ΠΏΠ΅Ρ€Π²Π° ΠΎΠ±ΠΎΠ·Π½Π°Ρ‡ΠΈΠΌ ΠΎΠ±Ρ‰ΡƒΡŽ схСму нашСго Π΄Π°Π³Π°, Π° Π·Π°Ρ‚Π΅ΠΌ Π±ΡƒΠ΄Π΅ΠΌ всё большС ΠΈ большС ΠΏΠΎΠ³Ρ€ΡƒΠΆΠ°Ρ‚ΡŒΡΡ Π² Π΄Π΅Ρ‚Π°Π»ΠΈ, ΠΏΠΎΡ‚ΠΎΠΌΡƒ Ρ‡Ρ‚ΠΎ ΠΌΡ‹ примСняСм Π½Π΅ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Π½Π΅Ρ‚Ρ€ΠΈΠ²ΠΈΠ°Π»ΡŒΠ½Ρ‹Π΅ Ρ€Π΅ΡˆΠ΅Π½ΠΈΡ.

Π˜Ρ‚Π°ΠΊ, Π² ΠΏΡ€ΠΎΡΡ‚Π΅ΠΉΡˆΠ΅ΠΌ Π²ΠΈΠ΄Π΅ ΠΏΠΎΠ΄ΠΎΠ±Π½Ρ‹ΠΉ Π΄Π°Π³ Π±ΡƒΠ΄Π΅Ρ‚ Π²Ρ‹Π³Π»ΡΠ΄Π΅Ρ‚ΡŒ Ρ‚Π°ΠΊ:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Π”Π°Π²Π°ΠΉΡ‚Π΅ Ρ€Π°Π·Π±ΠΈΡ€Π°Ρ‚ΡŒΡΡ:

  • Π‘ΠΏΠ΅Ρ€Π²Π° ΠΈΠΌΠΏΠΎΡ€Ρ‚ΠΈΡ€ΡƒΠ΅ΠΌ Π½ΡƒΠΆΠ½Ρ‹Π΅ Π»ΠΈΠ±Ρ‹ ΠΈ ΠΊΠΎΠ΅ Ρ‡Ρ‚ΠΎ Π΅Ρ‰Ρ‘;
  • sql_server_ds β€” это List[namedtuple[str, str]] с ΠΈΠΌΠ΅Π½Π°ΠΌΠΈ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΠ² ΠΈΠ· Airflow Connections ΠΈ Π±Π°Π·Π°ΠΌΠΈ Π΄Π°Π½Π½Ρ‹Ρ… ΠΈΠ· ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Ρ… ΠΌΡ‹ Π±ΡƒΠ΄Π΅ΠΌ Π·Π°Π±ΠΈΡ€Π°Ρ‚ΡŒ Π½Π°ΡˆΡƒ Ρ‚Π°Π±Π»ΠΈΡ‡ΠΊΡƒ;
  • dag β€” объявлСниС нашСго Π΄Π°Π³Π°, ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ΅ ΠΎΠ±ΡΠ·Π°Ρ‚Π΅Π»ΡŒΠ½ΠΎ Π΄ΠΎΠ»ΠΆΠ½ΠΎ Π»Π΅ΠΆΠ°Ρ‚ΡŒ Π² globals(), ΠΈΠ½Π°Ρ‡Π΅ Airflow Π΅Π³ΠΎ Π½Π΅ Π½Π°ΠΉΠ΄Π΅Ρ‚. Π”Π°Π³Ρƒ Ρ‚Π°ΠΊΠΆΠ΅ Π½ΡƒΠΆΠ½ΠΎ ΡΠΊΠ°Π·Π°Ρ‚ΡŒ:
    • Ρ‡Ρ‚ΠΎ Π΅Π³ΠΎ Π·ΠΎΠ²ΡƒΡ‚ orders β€” это имя ΠΏΠΎΡ‚ΠΎΠΌ Π±ΡƒΠ΄Π΅Ρ‚ ΠΌΠ°ΡΡ‡ΠΈΡ‚ΡŒ Π² Π²Π΅Π±-интСрфСйсС,
    • Ρ‡Ρ‚ΠΎ Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ ΠΎΠ½ Π±ΡƒΠ΄Π΅Ρ‚, начиная с ΠΏΠΎΠ»ΡƒΠ½ΠΎΡ‡ΠΈ восьмого июля,
    • Π° Π·Π°ΠΏΡƒΡΠΊΠ°Ρ‚ΡŒ ΠΎΠ½ Π΄ΠΎΠ»ΠΆΠ΅Π½, ΠΏΡ€ΠΈΠΌΠ΅Ρ€Π½ΠΎ ΠΊΠ°ΠΆΠ΄Ρ‹Π΅ 6 часов (для ΠΊΡ€ΡƒΡ‚Ρ‹Ρ… ΠΏΠ°Ρ€Π½Π΅ΠΉ здСсь вмСсто timedelta() допустима cron-строка 0 0 0/6 ? * * *, для ΠΌΠ΅Π½Π΅Π΅ ΠΊΡ€ΡƒΡ‚Ρ‹Ρ… β€” Π²Ρ‹Ρ€Π°ΠΆΠ΅Π½ΠΈΠ΅ Π²Ρ€ΠΎΠ΄Π΅ @daily);
  • workflow() Π±ΡƒΠ΄Π΅Ρ‚ Π΄Π΅Π»Π°Ρ‚ΡŒ ΠΎΡΠ½ΠΎΠ²Π½ΡƒΡŽ Ρ€Π°Π±ΠΎΡ‚Ρƒ, Π½ΠΎ Π½Π΅ сСйчас. БСйчас ΠΌΡ‹ просто высыпСм наш контСкст Π² Π»ΠΎΠ³.
  • А Ρ‚Π΅ΠΏΠ΅Ρ€ΡŒ простая магия создания тасков:
    • ΠΏΡ€ΠΎΠ±Π΅Π³Π°Π΅ΠΌ ΠΏΠΎ нашим источникам;
    • ΠΈΠ½ΠΈΡ†ΠΈΠ°Π»ΠΈΠ·ΠΈΡ€ΡƒΠ΅ΠΌ PythonOperator, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ Π±ΡƒΠ΄Π΅Ρ‚ Π²Ρ‹ΠΏΠΎΠ»Π½ΡΡ‚ΡŒ Π½Π°ΡˆΡƒ ΠΏΡƒΡΡ‚Ρ‹ΡˆΠΊΡƒ workflow(). НС Π·Π°Π±Ρ‹Π²Π°ΠΉΡ‚Π΅ ΡƒΠΊΠ°Π·Ρ‹Π²Π°Ρ‚ΡŒ ΡƒΠ½ΠΈΠΊΠ°Π»ΡŒΠ½ΠΎΠ΅ (Π² Ρ€Π°ΠΌΠΊΠ°Ρ… Π΄Π°Π³Π°) имя таска ΠΈ ΠΏΠΎΠ΄Π²ΡΠ·Ρ‹Π²Π°Ρ‚ΡŒ сам Π΄Π°Π³. Π€Π»Π°Π³ provide_context Π² свою ΠΎΡ‡Π΅Ρ€Π΅Π΄ΡŒ насыпСт Π² Ρ„ΡƒΠ½ΠΊΡ†ΠΈΡŽ Π΄ΠΎΠΏΠΎΠ»Π½ΠΈΡ‚Π΅Π»ΡŒΠ½Ρ‹Ρ… Π°Ρ€Π³ΡƒΠΌΠ΅Π½Ρ‚ΠΎΠ², ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ ΠΌΡ‹ Π±Π΅Ρ€Π΅ΠΆΠ½ΠΎ собСрём с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ **context.

Пока Π½Π° этом всё. Π§Ρ‚ΠΎ ΠΌΡ‹ ΠΏΠΎΠ»ΡƒΡ‡ΠΈΠ»ΠΈ:

  • Π½ΠΎΠ²Ρ‹ΠΉ Π΄Π°Π³ Π² Π²Π΅Π±-интСрфСйсС,
  • ΠΏΠΎΠ»Ρ‚ΠΎΡ€Ρ‹ сотни тасков, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Π±ΡƒΠ΄ΡƒΡ‚ Π²Ρ‹ΠΏΠΎΠ»Π½ΡΡ‚ΡŒΡΡ ΠΏΠ°Ρ€Π°Π»Π»Π΅Π»ΡŒΠ½ΠΎ (Ссли Ρ‚ΠΎ позволят настройки Airflow, Celery ΠΈ мощности сСрвСров).

Ну, ΠΏΠΎΡ‡Ρ‚ΠΈ ΠΏΠΎΠ»ΡƒΡ‡ΠΈΠ»ΠΈ.

Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅
Зависимости ΠΊΡ‚ΠΎ Π±ΡƒΠ΄Π΅Ρ‚ ΡΡ‚Π°Π²ΠΈΡ‚ΡŒ?

Π§Ρ‚ΠΎΠ±Ρ‹ всё это Π΄Π΅Π»ΠΎ ΡƒΠΏΡ€ΠΎΡΡ‚ΠΈΡ‚ΡŒ я вкорячил Π² docker-compose.yml ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΡƒ requirements.txt Π½Π° всСх Π½ΠΎΠ΄Π°Ρ….

Π’ΠΎΡ‚ Ρ‚Π΅ΠΏΠ΅Ρ€ΡŒ понСслась:

Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅

Π‘Π΅Ρ€Ρ‹Π΅ ΠΊΠ²Π°Π΄Ρ€Π°Ρ‚ΠΈΠΊΠΈ β€” task instances, ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚Π°Π½Π½Ρ‹Π΅ ΠΏΠ»Π°Π½ΠΈΡ€ΠΎΠ²Ρ‰ΠΈΠΊΠΎΠΌ.

НСмного ΠΆΠ΄Π΅ΠΌ, Π·Π°Π΄Π°Ρ‡ΠΈ Ρ€Π°ΡΡ…Π²Π°Ρ‚Ρ‹Π²Π°ΡŽΡ‚ Π²ΠΎΡ€ΠΊΠ΅Ρ€Ρ‹:

Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅

Π—Π΅Π»Π΅Π½Ρ‹Π΅, понятноС Π΄Π΅Π»ΠΎ, β€” ΡƒΡΠΏΠ΅ΡˆΠ½ΠΎ ΠΎΡ‚Ρ€Π°Π±ΠΎΡ‚Π°Π²ΡˆΠΈΠ΅. ΠšΡ€Π°ΡΠ½Ρ‹Π΅ β€” Π½Π΅ ΠΎΡ‡Π΅Π½ΡŒ ΡƒΡΠΏΠ΅ΡˆΠ½ΠΎ.

ΠšΡΡ‚Π°Ρ‚ΠΈ, Π½Π° нашСм ΠΏΡ€ΠΎΠ΄Π΅ Π½ΠΈΠΊΠ°ΠΊΠΎΠΉ ΠΏΠ°ΠΏΠΊΠΈ ./dags, ΡΠΈΠ½Ρ…Ρ€ΠΎΠ½ΠΈΠ·ΠΈΡ€ΡƒΡŽΡ‰Π΅ΠΉΡΡ ΠΌΠ΅ΠΆΠ΄Ρƒ машинами Π½Π΅Ρ‚ β€” всё Π΄Π°Π³ΠΈ Π»Π΅ΠΆΠ°Ρ‚ Π² git Π½Π° нашСм Gitlab, Π° Gitlab CI раскладываСт обновлСния Π½Π° ΠΌΠ°ΡˆΠΈΠ½Ρ‹ ΠΏΡ€ΠΈ ΠΌΡ‘Ρ€Π΄ΠΆΠ΅ Π² master.

НСмного о Flower

Пока Π²ΠΎΡ€ΠΊΠ΅Ρ€Ρ‹ молотят наши тасочки-ΠΏΡƒΡΡ‚Ρ‹ΡˆΠΊΠΈ, вспомним ΠΏΡ€ΠΎ Π΅Ρ‰Π΅ ΠΎΠ΄ΠΈΠ½ инструмСнт, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ ΠΌΠΎΠΆΠ΅Ρ‚ Π½Π°ΠΌ ΠΊΠΎΠ΅-Ρ‡Ρ‚ΠΎ ΠΏΠΎΠΊΠ°Π·Π°Ρ‚ΡŒ β€” Flower.

Бамая пСрвая страничка с суммарной ΠΈΠ½Ρ„ΠΎΡ€ΠΌΠ°Ρ†ΠΈΠ΅ΠΉ ΠΏΠΎ Π½ΠΎΠ΄Π°ΠΌ-Π²ΠΎΡ€ΠΊΠ΅Ρ€Π°ΠΌ:

Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅

Бамая насыщСнная страничка с Π·Π°Π΄Π°Ρ‡Π°ΠΌΠΈ, ΠΎΡ‚ΠΏΡ€Π°Π²ΠΈΠ²ΡˆΠΈΠΌΠΈΡΡ Π² Ρ€Π°Π±ΠΎΡ‚Ρƒ:

Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅

Бамая скучная страничка с состояниСм нашСго Π±Ρ€ΠΎΠΊΠ΅Ρ€Π°:

Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅

Бамая яркая страничка β€” с Π³Ρ€Π°Ρ„ΠΈΠΊΠ°ΠΌΠΈ состояния тасков ΠΈ ΠΈΡ… Π²Ρ€Π΅ΠΌΠ΅Π½Π΅ΠΌ выполнСния:

Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅

Π”ΠΎΠ³Ρ€ΡƒΠΆΠ°Π΅ΠΌ Π½Π΅Π΄ΠΎΠ³Ρ€ΡƒΠΆΠ΅Π½Π½ΠΎΠ΅

Π˜Ρ‚Π°ΠΊ, всС таски ΠΎΡ‚Ρ€Π°Π±ΠΎΡ‚Π°Π»ΠΈ, ΠΌΠΎΠΆΠ½ΠΎ ΡƒΠ½ΠΎΡΠΈΡ‚ΡŒ Ρ€Π°Π½Π΅Π½Ρ‹Ρ….

Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅

А Ρ€Π°Π½Π΅Π½Ρ‹Ρ… оказалось Π½Π΅ΠΌΠ°Π»ΠΎ β€” ΠΏΠΎ Ρ‚Π΅ΠΌ ΠΈΠ»ΠΈ ΠΈΠ½Ρ‹ΠΌ ΠΏΡ€ΠΈΡ‡ΠΈΠ½Π°ΠΌΠΈ. Π’ случаС ΠΏΡ€Π°Π²ΠΈΠ»ΡŒΠ½ΠΎΠ³ΠΎ использования Airflow Π²ΠΎΡ‚ эти самыС ΠΊΠ²Π°Π΄Ρ€Π°Ρ‚Ρ‹ говорят ΠΎ Ρ‚ΠΎΠΌ, Ρ‡Ρ‚ΠΎ Π΄Π°Π½Π½Ρ‹Π΅ ΠΎΠΏΡ€Π΅Π΄Π΅Π»Π΅Π½Π½ΠΎ Π½Π΅ Π΄ΠΎΠ΅Ρ…Π°Π»ΠΈ.

НуТно ΡΠΌΠΎΡ‚Ρ€Π΅Ρ‚ΡŒ Π»ΠΎΠ³ ΠΈ ΠΏΠ΅Ρ€Π΅Π·Π°ΠΏΡƒΡΠΊΠ°Ρ‚ΡŒ ΡƒΠΏΠ°Π²ΡˆΠΈΠ΅ task instances.

Жмякнув Π½Π° любой ΠΊΠ²Π°Π΄Ρ€Π°Ρ‚, ΡƒΠ²ΠΈΠ΄ΠΈΠΌ доступныС Π½Π°ΠΌ дСйствия:

Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅

МоТно Π²Π·ΡΡ‚ΡŒ, ΠΈ ΡΠ΄Π΅Π»Π°Ρ‚ΡŒ Clear ΡƒΠΏΠ°Π²ΡˆΠ΅ΠΌΡƒ. Π’ΠΎ Π΅ΡΡ‚ΡŒ, ΠΌΡ‹ Π·Π°Π±Ρ‹Π²Π°Π΅ΠΌ ΠΎ Ρ‚ΠΎΠΌ, Ρ‡Ρ‚ΠΎ Ρ‚Π°ΠΌ Ρ‡Ρ‚ΠΎ-Ρ‚ΠΎ завалилось, ΠΈ Ρ‚ΠΎΡ‚ ΠΆΠ΅ самый инстанс таска ΡƒΠΉΠ΄Π΅Ρ‚ ΠΏΠ»Π°Π½ΠΈΡ€ΠΎΠ²Ρ‰ΠΈΠΊΡƒ.

Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅

ΠŸΠΎΠ½ΡΡ‚Π½ΠΎ, Ρ‡Ρ‚ΠΎ Π΄Π΅Π»Π°Ρ‚ΡŒ Ρ‚Π°ΠΊ ΠΌΡ‹ΡˆΠΊΠΎΠΉ со всСми красными ΠΊΠ²Π°Π΄Ρ€Π°Ρ‚Π°ΠΌΠΈ Π½Π΅ ΠΎΡ‡Π΅Π½ΡŒ Π³ΡƒΠΌΠ°Π½Π½ΠΎ β€” Π½Π΅ этого ΠΌΡ‹ ΠΆΠ΄Π΅ΠΌ ΠΎΡ‚ Airflow. ЕстСствСнно, Ρƒ нас Π΅ΡΡ‚ΡŒ ΠΎΡ€ΡƒΠΆΠΈΠ΅ массового пораТСния: Browse/Task Instances

Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅

Π’Ρ‹Π±Π΅Ρ€Π΅ΠΌ всё Ρ€Π°Π·ΠΎΠΌ ΠΈ ΠΎΠ±Π½ΡƒΠ»ΠΈΠΌ Π½Π°ΠΆΠΌΠ΅ΠΌ ΠΏΡ€Π°Π²ΠΈΠ»ΡŒΠ½Ρ‹ΠΉ ΠΏΡƒΠ½ΠΊΡ‚:

Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅

ПослС очистки наши такси выглядят Ρ‚Π°ΠΊ (ΠΎΠ½ΠΈ ΡƒΠΆΠ΅ ΠΆΠ΄ΡƒΡ‚ Π½Π΅ доТдутся, ΠΊΠΎΠ³Π΄Π° ΡˆΠ΅Π΄ΡƒΠ»Π΅Ρ€ ΠΈΡ… Π·Π°ΠΏΠ»Π°Π½ΠΈΡ€ΡƒΠ΅Ρ‚):

Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅

БоСдинСния, Ρ…ΡƒΠΊΠΈ ΠΈ ΠΏΡ€ΠΎΡ‡ΠΈΠ΅ ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½Ρ‹Π΅

Π‘Π°ΠΌΠΎΠ΅ врСмя ΠΏΠΎΡΠΌΠΎΡ‚Ρ€Π΅Ρ‚ΡŒ Π½Π° ΡΠ»Π΅Π΄ΡƒΡŽΡ‰ΠΈΠΉ DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа Ρ…ΠΎΡ€ΠΎΡˆΠΈΠ΅, ΠΎΡ‚Ρ‡Π΅Ρ‚Ρ‹ ΠΎΠ±Π½ΠΎΠ²Π»Π΅Π½Ρ‹"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         ΠΠ°Ρ‚Π°Ρˆ, просыпайся, ΠΌΡ‹ {{ dag.dag_id }} ΡƒΡ€ΠΎΠ½ΠΈΠ»ΠΈ
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

ВсС вСдь ΠΊΠΎΠ³Π΄Π°-Π½ΠΈΠ±ΡƒΠ΄ΡŒ Π΄Π΅Π»Π°Π»ΠΈ обновлялку ΠΎΡ‚Ρ‡Π΅Ρ‚ΠΎΠ²? Π­Ρ‚ΠΎ снова ΠΎΠ½Π°: Π΅ΡΡ‚ΡŒ список источников, ΠΎΡ‚ΠΊΡƒΠ΄Π° Π·Π°Π±Ρ€Π°Ρ‚ΡŒ Π΄Π°Π½Π½Ρ‹Π΅; Π΅ΡΡ‚ΡŒ список, ΠΊΡƒΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡ‚ΡŒ; Π½Π΅ Π·Π°Π±Ρ‹Π²Π°Π΅ΠΌ ΠΏΠΎΡΠΈΠ³Π½Π°Π»ΠΈΡ‚ΡŒ, ΠΊΠΎΠ³Π΄Π° всё ΡΠ»ΡƒΡ‡ΠΈΠ»ΠΎΡΡŒ ΠΈΠ»ΠΈ сломалось (Π½Ρƒ это Π½Π΅ ΠΏΡ€ΠΎ нас, Π½Π΅Ρ‚).

Π”Π°Π²Π°ΠΉΡ‚Π΅ снова пройдСмся ΠΏΠΎ Ρ„Π°ΠΉΠ»Ρƒ ΠΈ посмотрим Π½Π° Π½ΠΎΠ²Ρ‹Π΅ нСпонятныС ΡˆΡ‚ΡƒΠΊΠΈ:

  • from commons.operators import TelegramBotSendMessage β€” Π½Π°ΠΌ Π½ΠΈΡ‡Ρ‚ΠΎ Π½Π΅ ΠΌΠ΅ΡˆΠ°Π΅Ρ‚ Π΄Π΅Π»Π°Ρ‚ΡŒ свои ΠΎΠΏΠ΅Ρ€Π°Ρ‚ΠΎΡ€Ρ‹, Ρ‡Π΅ΠΌ ΠΌΡ‹ ΠΈ воспользовались, сдСлав Π½Π΅Π±ΠΎΠ»ΡŒΡˆΡƒΡŽ ΠΎΠ±Ρ‘Ρ€Ρ‚ΠΎΡ‡ΠΊΡƒ для ΠΎΡ‚ΠΏΡ€Π°Π²ΠΊΠΈ сообщСний Π² Π Π°Π·Π±Π»ΠΎΠΊΠΈΡ€ΠΎΠ²Π°Π½Π½Ρ‹ΠΉ. (Об этом ΠΎΠΏΠ΅Ρ€Π°Ρ‚ΠΎΡ€Π΅ ΠΌΡ‹ Π΅Ρ‰Π΅ ΠΏΠΎΠ³ΠΎΠ²ΠΎΡ€ΠΈΠΌ Π½ΠΈΠΆΠ΅);
  • default_args={} β€” Π΄Π°Π³ ΠΌΠΎΠΆΠ΅Ρ‚ Ρ€Π°Π·Π΄Π°Π²Π°Ρ‚ΡŒ ΠΎΠ΄Π½ΠΈ ΠΈ Ρ‚Π΅ ΠΆΠ΅ Π°Ρ€Π³ΡƒΠΌΠ΅Π½Ρ‚Ρ‹ всСм своим ΠΎΠΏΠ΅Ρ€Π°Ρ‚ΠΎΡ€Π°ΠΌ;
  • to='{{ var.value.all_the_kings_men }}' β€” ΠΏΠΎΠ»Π΅ to Ρƒ нас Π±ΡƒΠ΄Π΅Ρ‚ Π½Π΅ Π·Π°Ρ…Π°Ρ€Π΄ΠΊΠΎΠΆΠ΅Π½Π½Ρ‹ΠΌ, Π° Ρ„ΠΎΡ€ΠΌΠΈΡ€ΡƒΠ΅ΠΌΡ‹ΠΌ динамичСски с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ Jinja ΠΈ ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½ΠΎΠΉ со списком email-ΠΎΠ², ΠΊΠΎΡ‚ΠΎΡ€ΡƒΡŽ я Π·Π°Π±ΠΎΡ‚Π»ΠΈΠ²ΠΎ ΠΏΠΎΠ»ΠΎΠΆΠΈΠ» Π² Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS β€” условиС запуска ΠΎΠΏΠ΅Ρ€Π°Ρ‚ΠΎΡ€Π°. Π’ нашСм случаС, письмо ΠΏΠΎΠ»Π΅Ρ‚ΠΈΡ‚ боссам Ρ‚ΠΎΠ»ΡŒΠΊΠΎ Ссли всС зависимости ΠΎΡ‚Ρ€Π°Π±ΠΎΡ‚Π°Π»ΠΈ ΡƒΡΠΏΠ΅ΡˆΠ½ΠΎ;
  • tg_bot_conn_id='tg_main' β€” Π°Ρ€Π³ΡƒΠΌΠ΅Π½Ρ‚Ρ‹ conn_id ΠΏΡ€ΠΈΠ½ΠΈΠΌΠ°ΡŽΡ‚ Π² сСбя ΠΈΠ΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€Ρ‹ соСдинСний, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ ΠΌΡ‹ создаСм Π² Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED β€” сообщСния Π² Telegram улСтят Ρ‚ΠΎΠ»ΡŒΠΊΠΎ ΠΏΡ€ΠΈ Π½Π°Π»ΠΈΡ‡ΠΈΠΈ ΡƒΠΏΠ°Π²ΡˆΠΈΡ… тасков;
  • task_concurrency=1 β€” Π·Π°ΠΏΡ€Π΅Ρ‰Π°Π΅ΠΌ ΠΎΠ΄Π½ΠΎΠ²Ρ€Π΅ΠΌΠ΅Π½Π½Ρ‹ΠΉ запуск Π½Π΅ΡΠΊΠΎΠ»ΡŒΠΊΠΈΡ… task instances ΠΎΠ΄Π½ΠΎΠ³ΠΎ таска. Π’ ΠΏΡ€ΠΎΡ‚ΠΈΠ²Π½ΠΎΠΌ случаС, ΠΌΡ‹ ΠΏΠΎΠ»ΡƒΡ‡ΠΈΠΌ ΠΎΠ΄Π½ΠΎΠ²Ρ€Π΅ΠΌΠ΅Π½Π½Ρ‹ΠΉ запуск Π½Π΅ΡΠΊΠΎΠ»ΡŒΠΊΠΈΡ… VerticaOperator (смотрящих Π½Π° ΠΎΠ΄Π½Ρƒ Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ);
  • report_update >> [email, tg] β€” всС VerticaOperator сойдутся Π² ΠΎΡ‚ΠΏΡ€Π°Π²ΠΊΠ΅ письма ΠΈ сообщСния, Π²ΠΎΡ‚ Ρ‚Π°ΠΊ:
    Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅

    Но Ρ‚Π°ΠΊ ΠΊΠ°ΠΊ Ρƒ ΠΎΠΏΠ΅Ρ€Π°Ρ‚ΠΎΡ€ΠΎΠ²-Π½ΠΎΡ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€ΠΎΠ² стоят Ρ€Π°Π·Π½Ρ‹Π΅ условия запуска, Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ Π±ΡƒΠ΄Π΅Ρ‚ Ρ‚ΠΎΠ»ΡŒΠΊΠΎ ΠΎΠ΄ΠΈΠ½. Π’ Tree View всё выглядит нСсколько ΠΌΠ΅Π½Π΅Π΅ наглядно:
    Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅

Π‘ΠΊΠ°ΠΆΡƒ ΠΏΠ°Ρ€Ρƒ слов ΠΎ макросах ΠΈ ΠΈΡ… Π΄Ρ€ΡƒΠ·ΡŒΡΡ… β€” ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½Ρ‹Ρ….

ΠœΠ°ΠΊΡ€ΠΎΡΡ‹ β€” это Jinja-плСйсхолдСры, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ ΠΌΠΎΠ³ΡƒΡ‚ ΠΏΠΎΠ΄ΡΡ‚Π°Π²Π»ΡΡ‚ΡŒ Ρ€Π°Π·Π½ΡƒΡŽ ΠΏΠΎΠ»Π΅Π·Π½ΡƒΡŽ ΠΈΠ½Ρ„ΠΎΡ€ΠΌΠ°Ρ†ΠΈΡŽ Π² Π°Ρ€Π³ΡƒΠΌΠ΅Π½Ρ‚Ρ‹ ΠΎΠΏΠ΅Ρ€Π°Ρ‚ΠΎΡ€ΠΎΠ². НапримСр, Ρ‚Π°ΠΊ:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} развСрнСтся Π² содСрТимоС ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½ΠΎΠΉ контСкста execution_date Π² Ρ„ΠΎΡ€ΠΌΠ°Ρ‚Π΅ YYYY-MM-DD: 2020-07-14. Π‘Π°ΠΌΠΎΠ΅ приятноС, Ρ‡Ρ‚ΠΎ ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½Ρ‹Π΅ контСкста ΠΏΡ€ΠΈΠ±ΠΈΠ²Π°ΡŽΡ‚ΡΡ гвоздями ΠΊ ΠΎΠΏΡ€Π΅Π΄Π΅Π»Π΅Π½Π½ΠΎΠΌΡƒ инстансу таска (ΠΊΠ²Π°Π΄Ρ€Π°Ρ‚ΠΈΠΊΡƒ Π² Tree View), ΠΈ ΠΏΡ€ΠΈ пСрСзапускС плСйсхолдСры Ρ€Π°ΡΠΊΡ€ΠΎΡŽΡ‚ΡΡ Π² Ρ‚Π΅ ΠΆΠ΅ самыС значСния.

ΠŸΡ€ΠΈΡΠ²ΠΎΠ΅Π½Π½Ρ‹Π΅ значСния ΠΌΠΎΠΆΠ½ΠΎ ΡΠΌΠΎΡ‚Ρ€Π΅Ρ‚ΡŒ с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ ΠΊΠ½ΠΎΠΏΠΊΠΈ Rendered Π½Π° ΠΊΠ°ΠΆΠ΄ΠΎΠΌ таск-инстансС. Π’ΠΎΡ‚ Ρ‚Π°ΠΊ Ρƒ таска с ΠΎΡ‚ΠΏΡ€Π°Π²ΠΊΠΎΠΉ письма:

Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅

А Ρ‚Π°ΠΊ Ρƒ таски с ΠΎΡ‚ΠΏΡ€Π°Π²ΠΊΠΎΠΉ сообщСния:

Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅

ΠŸΠΎΠ»Π½Ρ‹ΠΉ список встроСнных макросов для послСднСй доступной вСрсии доступСн здСсь: Macros Reference

Π‘ΠΎΠ»Π΅Π΅ Ρ‚ΠΎΠ³ΠΎ, с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ ΠΏΠ»Π°Π³ΠΈΠ½ΠΎΠ², ΠΌΡ‹ ΠΌΠΎΠΆΠ΅ΠΌ ΠΎΠ±ΡŠΡΠ²Π»ΡΡ‚ΡŒ собствСнныС макросы, Π½ΠΎ это ΡƒΠΆΠ΅ совсСм другая история.

Помимо ΠΏΡ€Π΅Π΄ΠΎΠΏΡ€Π΅Π΄Π΅Π»Π΅Π½Π½Ρ‹Ρ… ΡˆΡ‚ΡƒΠΊ, ΠΌΡ‹ ΠΌΠΎΠΆΠ΅ΠΌ ΠΏΠΎΠ΄ΡΡ‚Π°Π²Π»ΡΡ‚ΡŒ значСния своих ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½Ρ‹Ρ… (Π²Ρ‹ΡˆΠ΅ Π² ΠΊΠΎΠ΄Π΅ я ΡƒΠΆΠ΅ этим воспользовался). Π‘ΠΎΠ·Π΄Π°Π΄ΠΈΠΌ Π² Admin/Variables ΠΏΠ°Ρ€Ρƒ ΡˆΡ‚ΡƒΠΊ:

Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅

Всё, ΠΌΠΎΠΆΠ½ΠΎ ΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒΡΡ:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Π’ Π·Π½Π°Ρ‡Π΅Π½ΠΈΠΈ ΠΌΠΎΠΆΠ΅Ρ‚ Π±Ρ‹Ρ‚ΡŒ скаляр, Π° ΠΌΠΎΠΆΠ΅Ρ‚ Π»Π΅ΠΆΠ°Ρ‚ΡŒ ΠΈ JSON. Π’ случаС JSON-Π°:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

просто ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅ΠΌ ΠΏΡƒΡ‚ΡŒ ΠΊ Π½ΡƒΠΆΠ½ΠΎΠΌΡƒ ΠΊΠ»ΡŽΡ‡Ρƒ: {{ var.json.bot_config.bot.token }}.

Π‘ΠΊΠ°ΠΆΡƒ Π±ΡƒΠΊΠ²Π°Π»ΡŒΠ½ΠΎ ΠΎΠ΄Π½ΠΎ слово ΠΈ ΠΏΠΎΠΊΠ°ΠΆΡƒ ΠΎΠ΄ΠΈΠ½ ΡΠΊΡ€ΠΈΠ½ΡˆΠΎΡ‚ ΠΏΡ€ΠΎ соСдинСния. Π’ΡƒΡ‚ всё элСмСнтарно: Π½Π° страницС Admin/Connections создаСм соСдинСниС, складываСм Ρ‚ΡƒΠ΄Π° наши Π»ΠΎΠ³ΠΈΠ½Ρ‹/ΠΏΠ°Ρ€ΠΎΠ»ΠΈ ΠΈ Π±ΠΎΠ»Π΅Π΅ спСцифичныС ΠΏΠ°Ρ€Π°ΠΌΠ΅Ρ‚Ρ€Ρ‹. Π’ΠΎΡ‚ Ρ‚Π°ΠΊ:

Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅

ΠŸΠ°Ρ€ΠΎΠ»ΠΈ ΠΌΠΎΠΆΠ½ΠΎ ΡˆΠΈΡ„Ρ€ΠΎΠ²Π°Ρ‚ΡŒ (Π±ΠΎΠ»Π΅Π΅ Ρ‚Ρ‰Π°Ρ‚Π΅Π»ΡŒΠ½ΠΎ, Ρ‡Π΅ΠΌ Π² Π²Π°Ρ€ΠΈΠ°Π½Ρ‚Π΅ ΠΏΠΎ ΡƒΠΌΠΎΠ»Ρ‡Π°Π½ΠΈΡŽ), Π° ΠΌΠΎΠΆΠ½ΠΎ Π½Π΅ ΡƒΠΊΠ°Π·Ρ‹Π²Π°Ρ‚ΡŒ Ρ‚ΠΈΠΏ соСдинСния (ΠΊΠ°ΠΊ я сдСлал для tg_main) β€” Π΄Π΅Π»ΠΎ Π² Ρ‚ΠΎΠΌ, Ρ‡Ρ‚ΠΎ список Ρ‚ΠΈΠΏΠΎΠ² Π·Π°ΡˆΠΈΡ‚ Π² модСлях Airflow ΠΈ Ρ€Π°ΡΡˆΠΈΡ€Π΅Π½ΠΈΡŽ Π±Π΅Π· влСзания Π² исходники Π½Π΅ поддаСтся (Ссли Π²Π΄Ρ€ΡƒΠ³ я Ρ‡Π΅Π³ΠΎ-Ρ‚ΠΎ Π½Π΅ Π΄ΠΎΠ³ΡƒΠ³Π»ΠΈΠ» β€” ΠΏΡ€ΠΎΡˆΡƒ мСня ΠΏΠΎΠΏΡ€Π°Π²ΠΈΡ‚ΡŒ), Π½ΠΎ ΠΏΠΎΠ»ΡƒΡ‡ΠΈΡ‚ΡŒ ΠΊΡ€Π΅Π΄Ρ‹ просто ΠΏΠΎ ΠΈΠΌΠ΅Π½ΠΈ Π½Π°ΠΌ Π½ΠΈΡ‡Ρ‚ΠΎ Π½Π΅ ΠΏΠΎΠΌΠ΅ΡˆΠ°Π΅Ρ‚.

А Π΅Ρ‰Π΅ ΠΌΠΎΠΆΠ½ΠΎ ΡΠ΄Π΅Π»Π°Ρ‚ΡŒ нСсколько соСдинСний с ΠΎΠ΄Π½ΠΈΠΌ ΠΈΠΌΠ΅Π½Π΅ΠΌ: Π² Ρ‚Π°ΠΊΠΎΠΌ случаС ΠΌΠ΅Ρ‚ΠΎΠ΄ BaseHook.get_connection(), ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ достаСт Π½Π°ΠΌ соСдинСния ΠΏΠΎ ΠΈΠΌΠ΅Π½ΠΈ, Π±ΡƒΠ΄Π΅Ρ‚ ΠΎΡ‚Π΄Π°Π²Π°Ρ‚ΡŒ случайного ΠΈΠ· Π½Π΅ΡΠΊΠΎΠ»ΡŒΠΊΠΈΡ… Ρ‚Ρ‘Π·ΠΎΠΊ (Π±Ρ‹Π»ΠΎ Π±Ρ‹ Π»ΠΎΠ³ΠΈΡ‡Π½Π΅Π΅ ΡΠ΄Π΅Π»Π°Ρ‚ΡŒ Round Robin, Π½ΠΎ оставим это Π½Π° совСсти Ρ€Π°Π·Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊΠΎΠ² Airflow).

Variables ΠΈ Connections, бСзусловно, классныС срСдства, Π½ΠΎ Π²Π°ΠΆΠ½ΠΎ Π½Π΅ ΠΏΠΎΡ‚Π΅Ρ€ΡΡ‚ΡŒ баланс: ΠΊΠ°ΠΊΠΈΠ΅ части Π²Π°ΡˆΠΈΡ… ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ² Π²Ρ‹ Ρ…Ρ€Π°Π½ΠΈΡ‚Π΅ собствСнно Π² ΠΊΠΎΠ΄Π΅, Π° ΠΊΠ°ΠΊΠΈΠ΅ β€” ΠΎΡ‚Π΄Π°Π΅Ρ‚Π΅ Π½Π° Ρ…Ρ€Π°Π½Π΅Π½ΠΈΠ΅ Airflow. C ΠΎΠ΄Π½ΠΎΠΉ стороны быстро ΠΏΠΎΠΌΠ΅Π½ΡΡ‚ΡŒ Π·Π½Π°Ρ‡Π΅Π½ΠΈΠ΅, Π½Π°ΠΏΡ€ΠΈΠΌΠ΅Ρ€, ящик рассылки, ΠΌΠΎΠΆΠ΅Ρ‚ Π±Ρ‹Ρ‚ΡŒ ΡƒΠ΄ΠΎΠ±Π½ΠΎ Ρ‡Π΅Ρ€Π΅Π· UI. А с Π΄Ρ€ΡƒΠ³ΠΎΠΉ β€” это всё-Ρ‚Π°ΠΊΠΈ Π²ΠΎΠ·Π²Ρ€Π°Ρ‚ ΠΊ ΠΌΡ‹ΡˆΠ΅ΠΊΠ»ΠΈΠΊΡƒ, ΠΎΡ‚ ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ³ΠΎ ΠΌΡ‹ (я) Ρ…ΠΎΡ‚Π΅Π»ΠΈ ΠΈΠ·Π±Π°Π²ΠΈΡ‚ΡŒΡΡ.

Π Π°Π±ΠΎΡ‚Π° с соСдинСниями β€” это ΠΎΠ΄Π½Π° ΠΈΠ· Π·Π°Π΄Π°Ρ‡ Ρ…ΡƒΠΊΠΎΠ². Π’ΠΎΠΎΠ±Ρ‰Π΅ Ρ…ΡƒΠΊΠΈ Airflow β€” это Ρ‚ΠΎΡ‡ΠΊΠΈ ΠΏΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΡ Π΅Π³ΠΎ ΠΊ сторонним сСрвисам ΠΈ Π±ΠΈΠ±Π»ΠΈΠΎΡ‚Π΅ΠΊΠ°ΠΌ. К ΠΏΡ€ΠΈΠΌΠ΅Ρ€Ρƒ, JiraHook ΠΎΡ‚ΠΊΡ€ΠΎΠ΅Ρ‚ для нас ΠΊΠ»ΠΈΠ΅Π½Ρ‚ для взаимодСйствия с Jira (ΠΌΠΎΠΆΠ½ΠΎ Π·Π°Π΄Π°Ρ‡ΠΊΠΈ ΠΏΠΎΠ΄Π²ΠΈΠ³Π°Ρ‚ΡŒ Ρ‚ΡƒΠ΄Π°-сюда), Π° с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ SambaHook ΠΌΠΎΠΆΠ½ΠΎ Π·Π°ΠΏΡƒΡˆΠΈΡ‚ΡŒ Π»ΠΎΠΊΠ°Π»ΡŒΠ½Ρ‹ΠΉ Ρ„Π°ΠΉΠ» Π½Π° smb-Ρ‚ΠΎΡ‡ΠΊΡƒ.

Π Π°Π·Π±ΠΈΡ€Π°Π΅ΠΌ кастомный ΠΎΠΏΠ΅Ρ€Π°Ρ‚ΠΎΡ€

И ΠΌΡ‹ Π²ΠΏΠ»ΠΎΡ‚Π½ΡƒΡŽ ΠΏΠΎΠ΄ΠΎΠ±Ρ€Π°Π»ΠΈΡΡŒ ΠΊ Ρ‚ΠΎΠΌΡƒ, Ρ‡Ρ‚ΠΎΠ±Ρ‹ ΠΏΠΎΡΠΌΠΎΡ‚Ρ€Π΅Ρ‚ΡŒ Π½Π° Ρ‚ΠΎ, ΠΊΠ°ΠΊ сдСлан TelegramBotSendMessage

Код commons/operators.py с собствСнно ΠΎΠΏΠ΅Ρ€Π°Ρ‚ΠΎΡ€ΠΎΠΌ:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Π—Π΄Π΅ΡΡŒ, ΠΊΠ°ΠΊ ΠΈ ΠΎΡΡ‚Π°Π»ΡŒΠ½ΠΎΠ΅ Π² Airflow, всё ΠΎΡ‡Π΅Π½ΡŒ просто:

  • ΠžΡ‚Π½Π°ΡΠ»Π΅Π΄ΠΎΠ²Π°Π»ΠΈΡΡŒ ΠΎΡ‚ BaseOperator, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ Ρ€Π΅Π°Π»ΠΈΠ·ΡƒΠ΅Ρ‚ довольно ΠΌΠ½ΠΎΠ³ΠΎ Airflow-спСцифичных ΡˆΡ‚ΡƒΠΊ (посмотритС Π½Π° досугС)
  • Объявили поля template_fields, Π² ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Ρ… Jinja Π±ΡƒΠ΄Π΅Ρ‚ ΠΈΡΠΊΠ°Ρ‚ΡŒ макросы для ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠΈ.
  • ΠžΡ€Π³Π°Π½ΠΈΠ·ΠΎΠ²Π°Π»ΠΈ ΠΏΡ€Π°Π²ΠΈΠ»ΡŒΠ½Ρ‹Π΅ Π°Ρ€Π³ΡƒΠΌΠ΅Π½Ρ‚Ρ‹ для __init__(), расставили умолчания, Π³Π΄Π΅ Π½Π°Π΄ΠΎ.
  • Об ΠΈΠ½ΠΈΡ†ΠΈΠ°Π»ΠΈΠ·Π°Ρ†ΠΈΠΈ ΠΏΡ€Π΅Π΄ΠΊΠ° Ρ‚ΠΎΠΆΠ΅ Π½Π΅ Π·Π°Π±Ρ‹Π»ΠΈ.
  • ΠžΡ‚ΠΊΡ€Ρ‹Π»ΠΈ ΡΠΎΠΎΡ‚Π²Π΅Ρ‚ΡΡ‚Π²ΡƒΡŽΡ‰ΠΈΠΉ Ρ…ΡƒΠΊ TelegramBotHook, ΠΏΠΎΠ»ΡƒΡ‡ΠΈΠ»ΠΈ ΠΎΡ‚ Π½Π΅Π³ΠΎ ΠΎΠ±ΡŠΠ΅ΠΊΡ‚-ΠΊΠ»ΠΈΠ΅Π½Ρ‚.
  • ΠžΠ²Π΅Ρ€Ρ€Π°ΠΉΠ΄Π½ΡƒΠ»ΠΈ (ΠΏΠ΅Ρ€Π΅ΠΎΠΏΡ€Π΅Π΄Π΅Π»ΠΈΠ»ΠΈ) ΠΌΠ΅Ρ‚ΠΎΠ΄ BaseOperator.execute(), ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ Airfow Π±ΡƒΠ΄Π΅Ρ‚ ΠΏΠΎΠ΄Π΅Ρ€Π³ΠΈΠ²Π°Ρ‚ΡŒ, ΠΊΠΎΠ³Π΄Π° наступит врСмя Π·Π°ΠΏΡƒΡΠΊΠ°Ρ‚ΡŒ ΠΎΠΏΠ΅Ρ€Π°Ρ‚ΠΎΡ€ β€” Π² Π½Π΅ΠΌ ΠΌΡ‹ ΠΈ Ρ€Π΅Π°Π»ΠΈΠ·ΡƒΠ΅ΠΌ основноС дСйствиС, Π½Π° Π·Π°Π±Ρ‹Π² Π·Π°Π»ΠΎΠ³ΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒΡΡ. (ЛогируСмся, кстати, прямо Π² stdout ΠΈ stderr β€” Airflow всё ΠΏΠ΅Ρ€Π΅Ρ…Π²Π°Ρ‚ΠΈΡ‚, красиво ΠΎΠ±Π΅Ρ€Π½Π΅Ρ‚, Ρ€Π°Π·Π»ΠΎΠΆΠΈΡ‚, ΠΊΡƒΠ΄Π° Π½Π°Π΄ΠΎ.)

Π”Π°Π²Π°ΠΉΡ‚Π΅ ΡΠΌΠΎΡ‚Ρ€Π΅Ρ‚ΡŒ, Ρ‡Ρ‚ΠΎ Ρƒ нас Π² commons/hooks.py. ΠŸΠ΅Ρ€Π²Π°Ρ Ρ‡Π°ΡΡ‚ΡŒ Ρ„Π°ΠΉΠ»ΠΈΠΊΠ°, с самим Ρ…ΡƒΠΊΠΎΠΌ:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Π― Π΄Π°ΠΆΠ΅ Π½Π΅ знаю, Ρ‡Ρ‚ΠΎ Ρ‚ΡƒΡ‚ ΠΌΠΎΠΆΠ½ΠΎ ΠΎΠ±ΡŠΡΡΠ½ΡΡ‚ΡŒ, просто ΠΎΡ‚ΠΌΠ΅Ρ‡Ρƒ Π²Π°ΠΆΠ½Ρ‹Π΅ ΠΌΠΎΠΌΠ΅Π½Ρ‚Ρ‹:

  • НаслСдуСмся, Π΄ΡƒΠΌΠ°Π΅ΠΌ Π½Π°Π΄ Π°Ρ€Π³ΡƒΠΌΠ΅Π½Ρ‚Π°ΠΌΠΈ β€” Π² Π±ΠΎΠ»ΡŒΡˆΠΈΠ½ΡΡ‚Π²Π΅ случаСв ΠΎΠ½ Π±ΡƒΠ΄Π΅Ρ‚ ΠΎΠ΄ΠΈΠ½: conn_id;
  • ΠŸΠ΅Ρ€Π΅ΠΎΠΏΡ€Π΅Π΄Π΅Π»ΡΠ΅ΠΌ стандартныС ΠΌΠ΅Ρ‚ΠΎΠ΄Ρ‹: я ограничился get_conn(), Π² ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΌ я ΠΏΠΎΠ»ΡƒΡ‡Π°ΡŽ ΠΏΠ°Ρ€Π°ΠΌΠ΅Ρ‚Ρ€Ρ‹ соСдинСния ΠΏΠΎ ΠΈΠΌΠ΅Π½ΠΈ ΠΈ всСго-навсСго Π΄ΠΎΡΡ‚Π°ΡŽ ΡΠ΅ΠΊΡ†ΠΈΡŽ extra (это ΠΏΠΎΠ»Π΅ для JSON), Π² ΠΊΠΎΡ‚ΠΎΡ€ΡƒΡŽ я (ΠΏΠΎ своСй ΠΆΠ΅ инструкции!) ΠΏΠΎΠ»ΠΎΠΆΠΈΠ» Ρ‚ΠΎΠΊΠ΅Π½ Telegram-Π±ΠΎΡ‚Π°: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Боздаю экзСмпляр нашСго TelegramBot, отдавая Π΅ΠΌΡƒ ΡƒΠΆΠ΅ ΠΊΠΎΠ½ΠΊΡ€Π΅Ρ‚Π½Ρ‹ΠΉ Ρ‚ΠΎΠΊΠ΅Π½.

Π’ΠΎΡ‚ ΠΈ всё. ΠŸΠΎΠ»ΡƒΡ‡ΠΈΡ‚ΡŒ ΠΊΠ»ΠΈΠ΅Π½Ρ‚ ΠΈΠ· Ρ…ΡƒΠΊΠ° ΠΌΠΎΠΆΠ½ΠΎ c ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ TelegramBotHook().clent ΠΈΠ»ΠΈ TelegramBotHook().get_conn().

И вторая Ρ‡Π°ΡΡ‚ΡŒ Ρ„Π°ΠΉΠ»ΠΈΠΊΠ°, Π² ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΌ я ΡΠ΄Π΅Π»Π°Ρ‚ΡŒ ΠΌΠΈΠΊΡ€ΠΎΠΎΠ±Ρ‘Ρ€Ρ‚ΠΎΡ‡ΠΊΡƒ для Telegram REST API, Ρ‡Ρ‚ΠΎΠ±Ρ‹ Π½Π΅ Ρ‚Π°Ρ‰ΠΈΡ‚ΡŒ Ρ‚ΠΎΡ‚ ΠΆΠ΅ python-telegram-bot Ρ€Π°Π΄ΠΈ ΠΎΠ΄Π½ΠΎΠ³ΠΎ ΠΌΠ΅Ρ‚ΠΎΠ΄Π° sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

ΠŸΡ€Π°Π²ΠΈΠ»ΡŒΠ½Ρ‹ΠΉ ΠΏΡƒΡ‚ΡŒ β€” ΡΠ»ΠΎΠΆΠΈΡ‚ΡŒ всё это: TelegramBotSendMessage, TelegramBotHook, TelegramBot β€” Π² ΠΏΠ»Π°Π³ΠΈΠ½, ΠΏΠΎΠ»ΠΎΠΆΠΈΡ‚ΡŒ Π² общСдоступный Ρ€Π΅ΠΏΠΎΠ·ΠΈΡ‚ΠΎΡ€ΠΈΠΉ, ΠΈ ΠΎΡ‚Π΄Π°Ρ‚ΡŒ Π² Open Source.

Пока ΠΌΡ‹ всё это ΠΈΠ·ΡƒΡ‡Π°Π»ΠΈ, наши обновлСния ΠΎΡ‚Ρ‡Π΅Ρ‚ΠΎΠ² успСли ΡƒΡΠΏΠ΅ΡˆΠ½ΠΎ Π·Π°Π²Π°Π»ΠΈΡ‚ΡŒΡΡ ΠΈ ΠΎΡ‚ΠΏΡ€Π°Π²ΠΈΡ‚ΡŒ ΠΌΠ½Π΅ Π² ΠΊΠ°Π½Π°Π» сообщСниС ΠΎΠ± ошибкС. ΠŸΠΎΠΉΠ΄Ρƒ ΠΏΡ€ΠΎΠ²Π΅Ρ€ΡΡ‚ΡŒ, Ρ‡Ρ‚ΠΎ ΠΎΠΏΡΡ‚ΡŒ Π½Π΅ Ρ‚Π°ΠΊ…

Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅
Π’ нашСм Π΄Π°Π³Π΅ Ρ‡Ρ‚ΠΎ-Ρ‚ΠΎ сломалось! А Π½ΠΈ этого Π»ΠΈ ΠΌΡ‹ ΠΆΠ΄Π°Π»ΠΈ? ИмСнно!

ΠΠ°Π»ΠΈΠ²Π°Ρ‚ΡŒ-Ρ‚ΠΎ Π±ΡƒΠ΄Π΅ΡˆΡŒ?

ЧувствуСтС, Ρ‡Ρ‚ΠΎ-Ρ‚ΠΎ я пропустил? Π’Ρ€ΠΎΠ΄Π΅ Π±Ρ‹ ΠΎΠ±Π΅Ρ‰Π°Π» Π΄Π°Π½Π½Ρ‹Π΅ ΠΈΠ· SQL Server Π² Vertica ΠΏΠ΅Ρ€Π΅Π»ΠΈΠ²Π°Ρ‚ΡŒ, ΠΈ Ρ‚ΡƒΡ‚ взял ΠΈ ΡΡŠΠ΅Ρ…Π°Π» с Ρ‚Π΅ΠΌΡ‹, нСгодяй!

ЗлодСяниС это Π±Ρ‹Π»ΠΎ Π½Π°ΠΌΠ΅Ρ€Π΅Π½Π½Ρ‹ΠΌ, я просто обязан Π±Ρ‹Π» Ρ€Π°ΡΡˆΠΈΡ„Ρ€ΠΎΠ²Π°Ρ‚ΡŒ Π²Π°ΠΌ ΠΊΠΎΠ΅-ΠΊΠ°ΠΊΡƒΡŽ Ρ‚Π΅Ρ€ΠΌΠΈΠ½ΠΎΠ»ΠΎΠ³ΠΈΡŽ. Π’Π΅ΠΏΠ΅Ρ€ΡŒ ΠΌΠΎΠΆΠ½ΠΎ Π΅Ρ…Π°Ρ‚ΡŒ дальшС.

План Ρƒ нас Π±Ρ‹Π» Ρ‚Π°ΠΊΠΎΠΉ:

  1. Π‘Π΄Π΅Π»Π°Ρ‚ΡŒ Π΄Π°Π³
  2. ΠΠ°Π³Π΅Π½Π΅Ρ€ΠΈΡ‚ΡŒ таски
  3. ΠŸΠΎΡΠΌΠΎΡ‚Ρ€Π΅Ρ‚ΡŒ, ΠΊΠ°ΠΊ всё красиво
  4. ΠŸΡ€ΠΈΡΠ²Π°ΠΈΠ²Π°Ρ‚ΡŒ Π·Π°Π»ΠΈΠ²ΠΊΠ°ΠΌ Π½ΠΎΠΌΠ΅Ρ€Π° сСссий
  5. Π—Π°Π±Ρ€Π°Ρ‚ΡŒ Π΄Π°Π½Π½Ρ‹Π΅ ΠΈΠ· SQL Server
  6. ΠŸΠΎΠ»ΠΎΠΆΠΈΡ‚ΡŒ Π΄Π°Π½Π½Ρ‹Π΅ Π² Vertica
  7. Π‘ΠΎΠ±Ρ€Π°Ρ‚ΡŒ статистику

Π˜Ρ‚Π°ΠΊ, Ρ‡Ρ‚ΠΎΠ±Ρ‹ всё это Π·Π°ΠΏΡƒΡΡ‚ΠΈΡ‚ΡŒ, я сдСлал малСнькоС Π΄ΠΎΠΏΠΎΠ»Π½Π΅Π½ΠΈΠ΅ ΠΊ Π½Π°ΡˆΠ΅ΠΌΡƒ docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Π’Π°ΠΌ ΠΌΡ‹ ΠΏΠΎΠ΄Π½ΠΈΠΌΠ°Π΅ΠΌ:

  • Vertica ΠΊΠ°ΠΊ хост dwh с самыми Π΄Π΅Ρ„ΠΎΠ»Ρ‚Π½Ρ‹ΠΌΠΈ настройками,
  • Ρ‚Ρ€ΠΈ экзСмпляра SQL Server,
  • наполняСм Π±Π°Π·Ρ‹ Π² послСдних ΠΊΠΎΠ΅-ΠΊΠ°ΠΊΠΈΠΌΠΈ Π΄Π°Π½Π½Ρ‹ΠΌΠΈ (Π½ΠΈ Π² ΠΊΠΎΠ΅ΠΌ случаС Π½Π΅ заглядывайтС Π² mssql_init.py!)

ЗапускаСм всё Π΄ΠΎΠ±Ρ€ΠΎ с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ Ρ‡ΡƒΡ‚ΡŒ Π±ΠΎΠ»Π΅Π΅ слоТной, Ρ‡Π΅ΠΌ Π² ΠΏΡ€ΠΎΡˆΠ»Ρ‹ΠΉ Ρ€Π°Π·, ΠΊΠΎΠΌΠ°Π½Π΄Ρ‹:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Π§Ρ‚ΠΎ Π½Π°Π³Π΅Π½Π΅Ρ€ΠΈΡ€ΠΎΠ²Π°Π» наш Ρ‡ΡƒΠ΄ΠΎΡ€Π°Π½Π΄ΠΎΠΌΠ°ΠΉΠ·Π΅Ρ€, ΠΌΠΎΠΆΠ½ΠΎ, воспользовавшись ΠΏΡƒΠ½ΠΊΡ‚ΠΎΠΌ Data Profiling/Ad Hoc Query:

Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅
Π“Π»Π°Π²Π½ΠΎΠ΅, Π½Π΅ ΠΏΠΎΠΊΠ°Π·Ρ‹Π²Π°Ρ‚ΡŒ это Π°Π½Π°Π»ΠΈΡ‚ΠΈΠΊΠ°ΠΌ

ΠŸΠΎΠ΄Ρ€ΠΎΠ±Π½ΠΎ ΠΎΡΡ‚Π°Π½Π°Π²Π»ΠΈΠ²Π°Ρ‚ΡŒΡΡ Π½Π° ETL-сСссиях я Π½Π΅ Π±ΡƒΠ΄Ρƒ, Ρ‚Π°ΠΌ всё Ρ‚Ρ€ΠΈΠ²ΠΈΠ°Π»ΡŒΠ½ΠΎ: Π΄Π΅Π»Π°Π΅ΠΌ Π±Π°Π·Ρƒ, Π² Π½Π΅ΠΉ Ρ‚Π°Π±Π»ΠΈΡ‡ΠΊΡƒ, ΠΎΠ±ΠΎΡ€Π°Ρ‡ΠΈΠ²Π°Π΅ΠΌ всё ΠΌΠ΅Π½Π΅Π΄ΠΆΠ΅Ρ€ΠΎΠΌ контСкста, ΠΈ Ρ‚Π΅ΠΏΠ΅Ρ€ΡŒ Π΄Π΅Π»Π°Π΅ΠΌ Ρ‚Π°ΠΊ:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Настала ΠΏΠΎΡ€Π° Π·Π°Π±Ρ€Π°Ρ‚ΡŒ наши Π΄Π°Π½Π½Ρ‹Π΅ ΠΈΠ· Π½Π°ΡˆΠΈΡ… ΠΏΠΎΠ»ΡƒΡ‚ΠΎΡ€Π° сотСн Ρ‚Π°Π±Π»ΠΈΡ†. Π‘Π΄Π΅Π»Π°Π΅ΠΌ это с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ ΠΎΡ‡Π΅Π½ΡŒ Π½Π΅Π·Π°Ρ‚Π΅ΠΉΠ»ΠΈΠ²Ρ‹Ρ… строчСк:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Π‘ ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ Ρ…ΡƒΠΊΠ° ΠΏΠΎΠ»ΡƒΡ‡ΠΈΠΌ ΠΈΠ· Airflow pymssql-ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚
  2. Π’ запрос подставим ΠΎΠ³Ρ€Π°Π½ΠΈΡ‡Π΅Π½ΠΈΠ΅ Π² Π²ΠΈΠ΄Π΅ Π΄Π°Ρ‚Ρ‹ β€” Π² Ρ„ΡƒΠ½ΠΊΡ†ΠΈΡŽ Π΅Ρ‘ подбросит ΡˆΠ°Π±Π»ΠΎΠ½ΠΈΠ·Π°Ρ‚ΠΎΡ€.
  3. Π‘ΠΊΠ°Ρ€ΠΌΠ»ΠΈΠ²Π°Π΅ΠΌ наш запрос pandas, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ достанСт для нас DataFrame β€” ΠΎΠ½ Π½Π°ΠΌ пригодится Π² дальнСйшСм.

Π― ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΡŽ подстановку {dt} вмСсто ΠΏΠ°Ρ€Π°ΠΌΠ΅Ρ‚Ρ€Π° запроса %s Π½Π΅ ΠΏΠΎΡ‚ΠΎΠΌΡƒ, Ρ‡Ρ‚ΠΎ я Π·Π»ΠΎΠ±Π½Ρ‹ΠΉ Π‘ΡƒΡ€Π°Ρ‚ΠΈΠ½ΠΎ, Π° ΠΏΠΎΡ‚ΠΎΠΌΡƒ Ρ‡Ρ‚ΠΎ pandas Π½Π΅ ΠΌΠΎΠΆΠ΅Ρ‚ ΡΠΎΠ²Π»Π°Π΄Π°Ρ‚ΡŒ с pymssql ΠΈ подсовываСт послСднСму params: List, хотя Ρ‚ΠΎΡ‚ ΠΎΡ‡Π΅Π½ΡŒ Ρ…ΠΎΡ‡Π΅Ρ‚ tuple.
Π’Π°ΠΊΠΆΠ΅ ΠΎΠ±Ρ€Π°Ρ‚ΠΈΡ‚Π΅ Π²Π½ΠΈΠΌΠ°Π½ΠΈΠ΅, Ρ‡Ρ‚ΠΎ Ρ€Π°Π·Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊ pymssql Ρ€Π΅ΡˆΠΈΠ» большС Π΅Π³ΠΎ Π½Π΅ ΠΏΠΎΠ΄Π΄Π΅Ρ€ΠΆΠΈΠ²Π°Ρ‚ΡŒ, ΠΈ самоС врСмя ΡΡŠΠ΅Ρ…Π°Ρ‚ΡŒ Π½Π° pyodbc.

ΠŸΠΎΡΠΌΠΎΡ‚Ρ€ΠΈΠΌ, Ρ‡Π΅ΠΌ Airflow нашпиговал Π°Ρ€Π³ΡƒΠΌΠ΅Π½Ρ‚Ρ‹ Π½Π°ΡˆΠΈΡ… Ρ„ΡƒΠ½ΠΊΡ†ΠΈΠΉ:

Apache Airflow: Π΄Π΅Π»Π°Π΅ΠΌ ETL ΠΏΡ€ΠΎΡ‰Π΅

Если Π΄Π°Π½Π½Ρ‹Ρ… Π½Π΅ оказалось, Ρ‚ΠΎ ΠΏΡ€ΠΎΠ΄ΠΎΠ»ΠΆΠ°Ρ‚ΡŒ смысла Π½Π΅Ρ‚. Но ΡΡ‡ΠΈΡ‚Π°Ρ‚ΡŒ Π·Π°Π»ΠΈΠ²ΠΊΡƒ ΡƒΡΠΏΠ΅ΡˆΠ½ΠΎΠΉ Ρ‚ΠΎΠΆΠ΅ странно. Но это ΠΈ Π½Π΅ ошибка. А-Π°-Π°, Ρ‡Ρ‚ΠΎ Π΄Π΅Π»Π°Ρ‚ΡŒ?! А Π²ΠΎΡ‚ Ρ‡Ρ‚ΠΎ:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException скаТСт Airflow, Ρ‡Ρ‚ΠΎ ошибки, собствСнно Π½Π΅Ρ‚, Π° таск ΠΌΡ‹ пропускаСм. Π’ интСрфСйсС Π±ΡƒΠ΄Π΅Ρ‚ Π½Π΅ Π·Π΅Π»Π΅Π½Ρ‹ΠΉ ΠΈ Π½Π΅ красный ΠΊΠ²Π°Π΄Ρ€Π°Ρ‚ΠΈΠΊ, Π° Ρ†Π²Π΅Ρ‚Π° pink.

ΠŸΠΎΠ΄Π±Ρ€ΠΎΡΠΈΠΌ нашим Π΄Π°Π½Π½Ρ‹ΠΌ нСсколько ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

А имСнно:

  • Π‘Π”, ΠΈΠ· ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΉ ΠΌΡ‹ Π·Π°Π±Ρ€Π°Π»ΠΈ Π·Π°ΠΊΠ°Π·Ρ‹,
  • Π˜Π΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€ нашСй Π·Π°Π»ΠΈΠ²Π°ΡŽΡ‰Π΅ΠΉ сСссии (ΠΎΠ½Π° Π±ΡƒΠ΄Π΅Ρ‚ Ρ€Π°Π·Π½ΠΎΠΉ Π½Π° ΠΊΠ°ΠΆΠ΄Ρ‹ΠΉ таск),
  • Π₯эш ΠΎΡ‚ источника ΠΈ ΠΈΠ΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€Π° Π·Π°ΠΊΠ°Π·Π° β€” Ρ‡Ρ‚ΠΎΠ±Ρ‹ Π² ΠΊΠΎΠ½Π΅Ρ‡Π½ΠΎΠΉ Π±Π°Π·Π΅ (Π³Π΄Π΅ всё ссыпСтся Π² ΠΎΠ΄Π½Ρƒ Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ) Ρƒ нас Π±Ρ‹Π» ΡƒΠ½ΠΈΠΊΠ°Π»ΡŒΠ½Ρ‹ΠΉ ΠΈΠ΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€ Π·Π°ΠΊΠ°Π·Π°.

ΠžΡΡ‚Π°Π»ΡΡ прСдпослСдний шаг: Π·Π°Π»ΠΈΡ‚ΡŒ всё Π² Vertica. А, ΠΊΠ°ΠΊ Π½ΠΈ странно, ΠΎΠ΄ΠΈΠ½ ΠΈΠ· самых эффСктных эффСктивных способов ΡΠ΄Π΅Π»Π°Ρ‚ΡŒ это β€” Ρ‡Π΅Ρ€Π΅Π· CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. ΠœΡ‹ Π΄Π΅Π»Π°Π΅ΠΌ спСцприёмник StringIO.
  2. pandas любСзно слоТит Π² Π½Π΅Π³ΠΎ наш DataFrame Π² Π²ΠΈΠ΄Π΅ CSV-строк.
  3. ΠžΡ‚ΠΊΡ€ΠΎΠ΅ΠΌ соСдинСниС ΠΊ нашСй любимой Vertica Ρ…ΡƒΠΊΠΎΠΌ.
  4. А Ρ‚Π΅ΠΏΠ΅Ρ€ΡŒ с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ copy() ΠΎΡ‚ΠΏΡ€Π°Π²ΠΈΠΌ наши Π΄Π°Π½Π½Ρ‹Π΅ прямо Π² Π’Π΅Ρ€Ρ‚ΠΈΠΊΡƒ!

Из Π΄Ρ€Π°ΠΉΠ²Π΅Ρ€Π° Π·Π°Π±Π΅Ρ€Π΅ΠΌ, сколько строчСк Π·Π°ΡΡ‹ΠΏΠ°Π»ΠΎΡΡŒ, ΠΈ скаТСм ΠΌΠ΅Π½Π΅Π΄ΠΆΠ΅Ρ€Ρƒ сСссии, Ρ‡Ρ‚ΠΎ всё ОК:

session.loaded_rows = cursor.rowcount
session.successful = True

Π’ΠΎΡ‚ ΠΈ всё.

На ΠΏΡ€ΠΎΠ΄Π΅ ΠΌΡ‹ создаСм Ρ†Π΅Π»Π΅Π²ΡƒΡŽ Ρ‚Π°Π±Π»ΠΈΡ‡ΠΊΡƒ Π²Ρ€ΡƒΡ‡Π½ΡƒΡŽ. Π—Π΄Π΅ΡΡŒ ΠΆΠ΅ я ΠΏΠΎΠ·Π²ΠΎΠ»ΠΈΠ» сСбС нСбольшой Π°Π²Ρ‚ΠΎΠΌΠ°Ρ‚:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Π― с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ VerticaOperator() создаю схСму Π‘Π” ΠΈ Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ (Ссли ΠΈΡ… Π΅Ρ‰Π΅ Π½Π΅Ρ‚, СстСствСнно). Π“Π»Π°Π²Π½ΠΎΠ΅, ΠΏΡ€Π°Π²ΠΈΠ»ΡŒΠ½ΠΎ Ρ€Π°ΡΡΡ‚Π°Π²ΠΈΡ‚ΡŒ зависимости:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Подводим ΠΈΡ‚ΠΎΠ³ΠΈ

β€” Ну Π²ΠΎΡ‚, β€” сказал ΠΌΡ‹ΡˆΠΎΠ½ΠΎΠΊ, β€” Π½Π΅ ΠΏΡ€Π°Π²Π΄Π° Π»ΠΈ, Ρ‚Π΅ΠΏΠ΅Ρ€ΡŒ
Π’Ρ‹ убСдился, Ρ‡Ρ‚ΠΎ Π² лСсу я самый ΡΡ‚Ρ€Π°ΡˆΠ½Ρ‹ΠΉ Π·Π²Π΅Ρ€ΡŒ?

ДТулия Π”ΠΎΠ½Π°Π»ΡŒΠ΄ΡΠΎΠ½, Β«Π“Ρ€ΡƒΡ„Ρ„Π°Π»ΠΎΒ»

Π”ΡƒΠΌΠ°ΡŽ, Ссли Π±Ρ‹ ΠΌΡ‹ с ΠΌΠΎΠΈΠΌΠΈ ΠΊΠΎΠ»Π»Π΅Π³Π°ΠΌΠΈ устроили сорСвнованиС: ΠΊΡ‚ΠΎ быстрСС составит ΠΈ запустит с нуля ETL-процСсс: ΠΎΠ½ΠΈ со своими SSIS ΠΈ ΠΌΡ‹ΡˆΠΊΠΎΠΉ ΠΈ я с Airflow… А ΠΏΠΎΡ‚ΠΎΠΌ Π±Ρ‹ ΠΌΡ‹ Π΅Ρ‰Π΅ сравнили удобство сопровоТдСния… Π£Ρ…, Π΄ΡƒΠΌΠ°ΡŽ, Π²Ρ‹ ΡΠΎΠ³Π»Π°ΡΠΈΡ‚Π΅ΡΡŒ, Ρ‡Ρ‚ΠΎ я ΠΎΠ±ΠΎΠΉΠ΄Ρƒ ΠΈΡ… ΠΏΠΎ всСм Ρ„Ρ€ΠΎΠ½Ρ‚Π°ΠΌ!

Если ΠΆΠ΅ Ρ‡ΡƒΡ‚ΡŒ-Ρ‡ΡƒΡ‚ΡŒ ΠΏΠΎΡΠ΅Ρ€ΡŒΠ΅Π·Π½Π΅Π΅, Ρ‚ΠΎ Apache Airflow β€” Π·Π° счСт описания процСссов Π² Π²ΠΈΠ΄Π΅ ΠΏΡ€ΠΎΠ³Ρ€Π°ΠΌΠΌΠ½ΠΎΠ³ΠΎ ΠΊΠΎΠ΄Π° β€” сдСлал мою Ρ€Π°Π±ΠΎΡ‚Ρƒ Π³ΠΎΡ€Π°Π·Π΄ΠΎ ΡƒΠ΄ΠΎΠ±Π½Π΅Π΅ ΠΈ приятнСС.

Π•Π³ΠΎ ΠΆΠ΅ нСограничСнная Ρ€Π°ΡΡˆΠΈΡ€ΡΠ΅ΠΌΠΎΡΡ‚ΡŒ: ΠΊΠ°ΠΊ Π² ΠΏΠ»Π°Π½Π΅ ΠΏΠ»Π°Π³ΠΈΠ½ΠΎΠ², Ρ‚Π°ΠΊ ΠΈ ΠΏΡ€Π΅Π΄Ρ€Π°ΡΠΏΠΎΠ»ΠΎΠΆΠ΅Π½Π½ΠΎΡΡ‚ΡŒ ΠΊ ΠΌΠ°ΡΡˆΡ‚Π°Π±ΠΈΡ€ΡƒΠ΅ΠΌΠΎΡΡ‚ΠΈ β€” Π΄Π°Ρ‘Ρ‚ Π²Π°ΠΌ Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡ‚ΡŒ ΠΏΡ€ΠΈΠΌΠ΅Π½ΡΡ‚ΡŒ Airflow практичСски Π² любой области: Ρ…ΠΎΡ‚ΡŒ Π² ΠΏΠΎΠ»Π½ΠΎΠΌ Ρ†ΠΈΠΊΠ»Π΅ сбора, ΠΏΠΎΠ΄Π³ΠΎΡ‚ΠΎΠ²ΠΊΠΈ ΠΈ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠΈ Π΄Π°Π½Π½Ρ‹Ρ…, Ρ…ΠΎΡ‚ΡŒ Π² запускС Ρ€Π°ΠΊΠ΅Ρ‚ (Π½Π° ΠœΠ°Ρ€Ρ, ΠΊΠΎΠ½Π΅Ρ‡Π½ΠΎ ΠΆΠ΅).

Π§Π°ΡΡ‚ΡŒ Π·Π°ΠΊΠ»ΡŽΡ‡ΠΈΡ‚Π΅Π»ΡŒΠ½Π°Ρ, справочно-информационная

Π“Ρ€Π°Π±Π»ΠΈ, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ ΠΌΡ‹ собрали Π·Π° вас

  • start_date. Π”Π°, это ΡƒΠΆΠ΅ Π»ΠΎΠΊΠ°Π»ΡŒΠ½Ρ‹ΠΉ мСмасик. Π§Π΅Ρ€Π΅Π· Π³Π»Π°Π²Π½Ρ‹ΠΉ Π°Ρ€Π³ΡƒΠΌΠ΅Π½Ρ‚ Π΄Π°Π³Π° start_date проходят всС. ΠšΡ€Π°Ρ‚ΠΊΠΎ, Ссли ΡƒΠΊΠ°Π·Π°Ρ‚ΡŒ Π² start_date Ρ‚Π΅ΠΊΡƒΡ‰ΡƒΡŽ Π΄Π°Ρ‚Ρƒ, Π° Π² schedule_interval β€” ΠΎΠ΄ΠΈΠ½ дСнь, Ρ‚ΠΎ DAG запустится Π·Π°Π²Ρ‚Ρ€Π° Π½Π΅ Ρ€Π°Π½ΡŒΡˆΠ΅.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    И большС Π½ΠΈΠΊΠ°ΠΊΠΈΡ… ΠΏΡ€ΠΎΠ±Π»Π΅ΠΌ.

    Π‘ Π½ΠΈΠΌ ΠΆΠ΅ связана ΠΈ Π΅Ρ‰Π΅ ΠΎΠ΄Π½Π° ошибка выполнСния: Task is missing the start_date parameter, которая Ρ‡Π°Ρ‰Π΅ всСго Π³ΠΎΠ²ΠΎΡ€ΠΈΡ‚ ΠΎ Ρ‚ΠΎΠΌ, Ρ‡Ρ‚ΠΎ Π²Ρ‹ Π·Π°Π±Ρ‹Π»ΠΈ ΠΏΡ€ΠΈΠ²ΡΠ·Π°Ρ‚ΡŒ ΠΊ ΠΎΠΏΠ΅Ρ€Π°Ρ‚ΠΎΡ€Ρƒ Π΄Π°Π³.

  • Всё Π½Π° ΠΎΠ΄Π½ΠΎΠΉ машинС. Π”Π°, ΠΈ Π±Π°Π·Ρ‹ (самого Airflow ΠΈ нашСй ΠΎΠ±ΠΌΠ°Π·ΠΊΠΈ), ΠΈ Π²Π΅Π±-сСрвСр, ΠΈ ΠΏΠ»Π°Π½ΠΈΡ€ΠΎΠ²Ρ‰ΠΈΠΊ, ΠΈ Π²ΠΎΡ€ΠΊΠ΅Ρ€Ρ‹. И ΠΎΠ½ΠΎ Π΄Π°ΠΆΠ΅ Ρ€Π°Π±ΠΎΡ‚Π°Π»ΠΎ. Но со Π²Ρ€Π΅ΠΌΠ΅Π½Π΅ΠΌ количСство Π·Π°Π΄Π°Ρ‡ Ρƒ сСрвисов росло, ΠΈ ΠΊΠΎΠ³Π΄Π° PostgreSQL стал ΠΎΡ‚Π΄Π°Π²Π°Ρ‚ΡŒ ΠΎΡ‚Π²Π΅Ρ‚ ΠΏΠΎ индСксу Π·Π° 20 с вмСсто 5 мс, ΠΌΡ‹ Π΅Π³ΠΎ взяли ΠΈ унСсли.
  • LocalExecutor. Π”Π°, ΠΌΡ‹ сидим Π½Π° Π½Ρ‘ΠΌ Π΄ΠΎ сих ΠΏΠΎΡ€, ΠΈ ΠΌΡ‹ ΡƒΠΆΠ΅ подошли ΠΊ ΠΊΡ€Π°ΡŽ пропасти. LocalExecutor’а Π½Π°ΠΌ Π΄ΠΎ сих ΠΏΠΎΡ€ Ρ…Π²Π°Ρ‚Π°Π»ΠΎ, Π½ΠΎ сСйчас ΠΏΡ€ΠΈΡˆΠ»Π° ΠΏΠΎΡ€Π° Ρ€Π°ΡΡˆΠΈΡ€ΠΈΡ‚ΡŒΡΡ ΠΌΠΈΠ½ΠΈΠΌΡƒΠΌ ΠΎΠ΄Π½ΠΈΠΌ Π²ΠΎΡ€ΠΊΠ΅Ρ€ΠΎΠΌ, ΠΈ придСтся ΠΏΠΎΠ΄Π½Π°ΠΏΡ€ΡΡ‡ΡŒΡΡ, Ρ‡Ρ‚ΠΎΠ±Ρ‹ ΠΏΠ΅Ρ€Π΅Π΅Ρ…Π°Ρ‚ΡŒ Π½Π° CeleryExecutor. А Π²Π²ΠΈΠ΄Ρƒ Ρ‚ΠΎΠ³ΠΎ, Ρ‡Ρ‚ΠΎ с Π½ΠΈΠΌ ΠΌΠΎΠΆΠ½ΠΎ Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ ΠΈ Π½Π° ΠΎΠ΄Π½ΠΎΠΉ машиной, Ρ‚ΠΎ Π½ΠΈΡ‡Π΅Π³ΠΎ Π½Π΅ останавливаСт ΠΎΡ‚ использования Celery Π΄Π°ΠΆΠ΅ Π½Π΅ сСрвСрС, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ «СстСствСнно, Π½ΠΈΠΊΠΎΠ³Π΄Π° Π½Π΅ ΠΏΠΎΠΉΠ΄Π΅Ρ‚ Π² ΠΏΡ€ΠΎΠ΄, чСсслово!Β»
  • НСиспользованиС встроСнных срСдств:
    • Connections для хранСния ΡƒΡ‡Π΅Ρ‚Π½Ρ‹Ρ… Π΄Π°Π½Π½Ρ‹Ρ… сСрвисов,
    • SLA Misses для рСагирования Π½Π° таски, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Π½Π΅ ΠΎΡ‚Ρ€Π°Π±ΠΎΡ‚Π°Π»ΠΈ воврСмя,
    • XCom для ΠΎΠ±ΠΌΠ΅Π½Π° ΠΌΠ΅Ρ‚Π°Π΄Π°Π½Π½Ρ‹ΠΌΠΈ (я сказал ΠΌΠ΅Ρ‚Π°Π΄Π°Π½Π½Ρ‹ΠΌΠΈ!) ΠΌΠ΅ΠΆΠ΄Ρƒ тасками Π΄Π°Π³Π°.
  • Π—Π»ΠΎΡƒΠΏΠΎΡ‚Ρ€Π΅Π±Π»Π΅Π½ΠΈΠ΅ ΠΏΠΎΡ‡Ρ‚ΠΎΠΉ. Ну Ρ‡Ρ‚ΠΎ Ρ‚ΡƒΡ‚ ΡΠΊΠ°Π·Π°Ρ‚ΡŒ? Π‘Ρ‹Π»ΠΈ настроСны оповСщСния Π½Π° всС ΠΏΠΎΠ²Ρ‚ΠΎΡ€Ρ‹ ΡƒΠΏΠ°Π²ΡˆΠΈΡ… тасков. Π’Π΅ΠΏΠ΅Ρ€ΡŒ Π² ΠΌΠΎΡ‘ΠΌ Ρ€Π°Π±ΠΎΡ‡Π΅ΠΌ Gmail >90k писСм ΠΎΡ‚ Airflow, ΠΈ Π²Π΅Π±-ΠΌΠΎΡ€Π΄Π° ΠΏΠΎΡ‡Ρ‚Ρ‹ отказываСтся Π±Ρ€Π°Ρ‚ΡŒ ΠΈ ΡƒΠ΄Π°Π»ΡΡ‚ΡŒ большС Ρ‡Π΅ΠΌ ΠΏΠΎ 100 ΡˆΡ‚ΡƒΠΊ Π·Π° Ρ€Π°Π·.

Π‘ΠΎΠ»ΡŒΡˆΠ΅ ΠΏΠΎΠ΄Π²ΠΎΠ΄Π½Ρ‹Ρ… ΠΊΠ°ΠΌΠ½Π΅ΠΉ: Apache Airflow Pitfails

БрСдства Π΅Ρ‰Ρ‘ большСй Π°Π²Ρ‚ΠΎΠΌΠ°Ρ‚ΠΈΠ·Π°Ρ†ΠΈΠΈ

Для Ρ‚ΠΎΠ³ΠΎ Ρ‡Ρ‚ΠΎΠ±Ρ‹ Π½Π°ΠΌ Π΅Ρ‰Π΅ большС Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ Π³ΠΎΠ»ΠΎΠ²ΠΎΠΉ, Π° Π½Π΅ Ρ€ΡƒΠΊΠ°ΠΌΠΈ, Airflow Π·Π°Π³ΠΎΡ‚ΠΎΠ²ΠΈΠ»Π° для нас Π²ΠΎΡ‚ Ρ‡Ρ‚ΠΎ:

  • REST API β€” ΠΎΠ½ Π΄ΠΎ сих ΠΏΠΎΡ€ ΠΈΠΌΠ΅Π΅Ρ‚ статус Experimental, Ρ‡Ρ‚ΠΎ Π½Π΅ ΠΌΠ΅ΡˆΠ°Π΅Ρ‚ Π΅ΠΌΡƒ Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ. Π‘ Π΅Π³ΠΎ ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ ΠΌΠΎΠΆΠ½ΠΎ Π½Π΅ Ρ‚ΠΎΠ»ΡŒΠΊΠΎ ΠΏΠΎΠ»ΡƒΡ‡Π°Ρ‚ΡŒ ΠΈΠ½Ρ„ΠΎΡ€ΠΌΠ°Ρ†ΠΈΡŽ ΠΎ Π΄Π°Π³Π°Ρ… ΠΈ тасках, Π½ΠΎ ΠΎΡΡ‚Π°Π½ΠΎΠ²ΠΈΡ‚ΡŒ/Π·Π°ΠΏΡƒΡΡ‚ΠΈΡ‚ΡŒ Π΄Π°Π³, ΡΠΎΠ·Π΄Π°Ρ‚ΡŒ DAG Run ΠΈΠ»ΠΈ ΠΏΡƒΠ».
  • CLI β€” Ρ‡Π΅Ρ€Π΅Π· ΠΊΠΎΠΌΠ°Π½Π΄Π½ΡƒΡŽ строку доступны ΠΌΠ½ΠΎΠ³ΠΈΠ΅ срСдства, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Π½Π΅ просто Π½Π΅ΡƒΠ΄ΠΎΠ±Π½Ρ‹ Π² ΠΎΠ±Ρ€Π°Ρ‰Π΅Π½ΠΈΠΈ Ρ‡Π΅Ρ€Π΅Π· WebUI, Π° Π²ΠΎΠΎΠ±Ρ‰Π΅ ΠΎΡ‚ΡΡƒΡ‚ΡΡ‚Π²ΡƒΡŽΡ‚. НапримСр:
    • backfill Π½ΡƒΠΆΠ΅Π½ для ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½ΠΎΠ³ΠΎ запуска инстансов тасков.
      НапримСр, ΠΏΡ€ΠΈΡˆΠ»ΠΈ Π°Π½Π°Π»ΠΈΡ‚ΠΈΠΊΠΈ, говорят: «А Ρƒ вас, Ρ‚ΠΎΠ²Π°Ρ€ΠΈΡ‰, Π΅Ρ€ΡƒΠ½Π΄Π° Π² Π΄Π°Π½Π½Ρ‹Ρ… с 1 ΠΏΠΎ 13 января! Π§ΠΈΠ½ΠΈ-Ρ‡ΠΈΠ½ΠΈ-Ρ‡ΠΈΠ½ΠΈ-Ρ‡ΠΈΠ½ΠΈ!Β». А Ρ‚Ρ‹ Ρ‚Π°ΠΊΠΎΠΉ Ρ…ΠΎΠ±Π°:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • ΠžΠ±ΡΠ»ΡƒΠΆΠΈΠ²Π°Π½ΠΈΠ΅ Π±Π°Π·Ρ‹: initdb, resetdb, upgradedb, checkdb.
    • run, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ позволяСт Π·Π°ΠΏΡƒΡΡ‚ΠΈΡ‚ΡŒ ΠΎΠ΄ΠΈΠ½ инстанс таска, Π΄Π° Π΅Ρ‰Π΅ ΠΈ Π·Π°Π±ΠΈΡ‚ΡŒ Π½Π° всё зависимости. Π‘ΠΎΠ»Π΅Π΅ Ρ‚ΠΎΠ³ΠΎ, ΠΌΠΎΠΆΠ½ΠΎ Π·Π°ΠΏΡƒΡΡ‚ΠΈΡ‚ΡŒ Π΅Π³ΠΎ Ρ‡Π΅Ρ€Π΅Π· LocalExecutor, Π΄Π°ΠΆΠ΅ Ссли Ρƒ вас Celery-кластСр.
    • ΠŸΡ€ΠΈΠΌΠ΅Ρ€Π½ΠΎ Ρ‚ΠΎ ΠΆΠ΅ самоС Π΄Π΅Π»Π°Π΅Ρ‚ test, Ρ‚ΠΎΠ»ΡŒΠΊΠΎ Π΅Ρ‰Π΅ ΠΈ Π² Π±Π°Π· Π½ΠΈΡ‡Π΅Π³ΠΎ Π½Π΅ ΠΏΠΈΡˆΠ΅Ρ‚.
    • connections позволяСт массово ΡΠΎΠ·Π΄Π°Π²Π°Ρ‚ΡŒ ΠΏΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΡ ΠΈΠ· шСлла.
  • Python API β€” довольно Ρ…Π°Ρ€Π΄ΠΊΠΎΡ€Π½Ρ‹ΠΉ способ взаимодСйствия, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ ΠΏΡ€Π΅Π΄Π½Π°Π·Π½Π°Ρ‡Π΅Π½ для ΠΏΠ»Π°Π³ΠΈΠ½ΠΎΠ², Π° Π½Π΅ копошСния Π² Π½Ρ‘ΠΌ Ρ€ΡƒΡ‡Ρ‘Π½ΠΊΠ°ΠΌΠΈ. Но ΠΊΡ‚ΠΎ ΠΆ Π½Π°ΠΌ ΠΏΠΎΠΌΠ΅ΡˆΠ°Π΅Ρ‚ ΠΏΠΎΠΉΡ‚ΠΈ Π² /home/airflow/dags, Π·Π°ΠΏΡƒΡΡ‚ΠΈΡ‚ΡŒ ipython ΠΈ Π½Π°Ρ‡Π°Ρ‚ΡŒ Π±Π΅ΡΠΏΡ€Π΅Π΄Π΅Π»ΡŒΠ½ΠΈΡ‡Π°Ρ‚ΡŒ? МоТно, Π½Π°ΠΏΡ€ΠΈΠΌΠ΅Ρ€, ΡΠΊΡΠΏΠΎΡ€Ρ‚ΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒ всС ΠΏΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΡ Ρ‚Π°ΠΊΠΎΠΌ ΠΊΠΎΠ΄ΠΎΠΌ:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • ΠŸΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ ΠΊ Π±Π°Π·Π΅ ΠΌΠ΅Ρ‚Π°Π΄Π°Π½Π½Ρ‹Ρ… Airflow. ΠŸΠΈΡΠ°Ρ‚ΡŒ Π² Π½Π΅Ρ‘ я Π½Π΅ Ρ€Π΅ΠΊΠΎΠΌΠ΅Π½Π΄ΡƒΡŽ, Π° Π²ΠΎΡ‚ Π΄ΠΎΡΡ‚Π°Π²Π°Ρ‚ΡŒ состояния тасков для Ρ€Π°Π·Π»ΠΈΡ‡Π½Ρ‹Ρ… спСцифичСских ΠΌΠ΅Ρ‚Ρ€ΠΈΠΊ ΠΌΠΎΠΆΠ½ΠΎ Π·Π½Π°Ρ‡ΠΈΡ‚Π΅Π»ΡŒΠ½ΠΎ быстрСС ΠΈ ΠΏΡ€ΠΎΡ‰Π΅, Ρ‡Π΅ΠΌ Ρ‡Π΅Ρ€Π΅Π· любой ΠΈΠ· API.

    Π‘ΠΊΠ°ΠΆΠ΅ΠΌ, Π΄Π°Π»Π΅ΠΊΠΎ Π½Π΅ всС наши таски ΠΈΠ΄Π΅ΠΌΠΏΠΎΡ‚Π΅Π½Ρ‚Π½Ρ‹, Π° ΠΌΠΎΠ³ΡƒΡ‚ ΠΈΠ½ΠΎΠ³Π΄Π° ΠΏΠ°Π΄Π°Ρ‚ΡŒ ΠΈ это Π½ΠΎΡ€ΠΌΠ°Π»ΡŒΠ½ΠΎ. Но нСсколько Π·Π°Π²Π°Π»ΠΎΠ² β€” это ΡƒΠΆΠ΅ ΠΏΠΎΠ΄ΠΎΠ·Ρ€ΠΈΡ‚Π΅Π»ΡŒΠ½ΠΎ, ΠΈ Π½Π°Π΄ΠΎ Π±Ρ‹ ΠΏΡ€ΠΎΠ²Π΅Ρ€ΠΈΡ‚ΡŒ.

    ΠžΡΡ‚ΠΎΡ€ΠΎΠΆΠ½ΠΎ, SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

Бсылки

Ну ΠΈ СстСствСнно ΠΏΠ΅Ρ€Π²Ρ‹Π΅ Π΄Π΅ΡΡΡ‚ΡŒ ссылок ΠΈΠ· Π²Ρ‹Π΄Π°Ρ‡ΠΈ Π³ΡƒΠ³Π»Π° содСрТимоС ΠΏΠ°ΠΏΠΊΠΈ Airflow ΠΈΠ· ΠΌΠΎΠΈΡ… Π·Π°ΠΊΠ»Π°Π΄ΠΎΠΊ.

  • Apache Airflow Documentation β€” ΠΊΠΎΠ½Π΅Ρ‡Π½ΠΎ, Π½Π°Π΄ΠΎ Π½Π°Ρ‡Π°Ρ‚ΡŒ с ΠΎΡ„. Π΄ΠΎΠΊΡƒΠΌΠ΅Π½Ρ‚Π°Ρ†ΠΈΠΈ, Π½ΠΎ ΠΊΡ‚ΠΎ ΠΆΠ΅ Ρ‡ΠΈΡ‚Π°Π΅Ρ‚ инструкции?
  • Best Practices β€” Π½Ρƒ хотя Π±Ρ‹ Ρ€Π΅ΠΊΠΎΠΌΠ΅Π½Π΄Π°Ρ†ΠΈΠΈ ΠΎΡ‚ создатСлСй ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°ΠΉΡ‚Π΅.
  • The Airflow UI β€” самоС Π½Π°Ρ‡Π°Π»ΠΎ: ΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚Π΅Π»ΡŒΡΠΊΠΈΠΉ интСрфСйс Π² ΠΊΠ°Ρ€Ρ‚ΠΈΠ½ΠΊΠ°Ρ…
  • Understanding Apache Airflow’s key concepts β€” Ρ…ΠΎΡ€ΠΎΡˆΠΎ расписаны Π±Π°Π·ΠΎΠ²Ρ‹Π΅ понятия, Ссли (Π²Π΄Ρ€ΡƒΠ³!) Π²Ρ‹ Ρ‡Ρ‚ΠΎ-Ρ‚ΠΎ Π½Π΅ поняли Ρƒ мСня.
  • Tianlong’s Blog β€” A Guide On How To Build An Airflow Server/Cluster β€” ΠΊΡ€Π°Ρ‚ΠΊΠΈΠΉ Π³Π°ΠΉΠ΄ ΠΏΠΎ настройкС Airflow-кластСра.
  • Running Apache Airflow At Lyft β€” ΠΏΠΎΡ‡Ρ‚ΠΈ такая ΠΆΠ΅ интСрСсная ΡΡ‚Π°Ρ‚ΡŒΡ, Ρ€Π°Π·Π²Π΅ Ρ‡Ρ‚ΠΎ Ρ„ΠΎΡ€ΠΌΠ°Π»ΠΈΠ·ΠΌΠ° побольшС, Π° ΠΏΡ€ΠΈΠΌΠ΅Ρ€ΠΎΠ² помСньшС.
  • How Apache Airflow Distributes Jobs on Celery workers β€” ΠΎ Ρ€Π°Π±ΠΎΡ‚Π΅ Π² связкС с Celery.
  • DAG Writing Best Practices in Apache Airflow β€” ΠΏΡ€ΠΎ ΠΈΠ΄Π΅ΠΌΠΏΠΎΡ‚Π΅Π½Ρ‚Π½ΠΎΡΡ‚ΡŒ тасков, Π·Π°Π³Ρ€ΡƒΠ·ΠΊΡƒ ΠΏΠΎ ID вмСсто Π΄Π°Ρ‚Ρ‹, трансформации, структуру Ρ„Π°ΠΉΠ»ΠΎΠ² ΠΈ ΠΏΡ€ΠΎΡ‡ΠΈΠ΅ интСрСсныС Π²Π΅Ρ‰ΠΈ.
  • Managing Dependencies in Apache Airflow β€” зависимости тасков ΠΈ Trigger Rule, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ я упомянул лишь вскользь.
  • Airflow: When Your DAG is Far Behind The Schedule β€” ΠΊΠ°ΠΊ ΠΏΡ€Π΅ΠΎΠ΄ΠΎΠ»Π΅Π²Π°Ρ‚ΡŒ Π½Π΅ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Β«Ρ€Π°Π±ΠΎΡ‚Π°Π΅Ρ‚, ΠΊΠ°ΠΊ Π·Π°Π΄ΡƒΠΌΠ°Π½ΠΎΒ» Ρƒ ΠΏΠ»Π°Π½ΠΈΡ€ΠΎΠ²Ρ‰ΠΈΠΊΠ°, Π·Π°Π³Ρ€ΡƒΠΆΠ°Ρ‚ΡŒ потСрянныС Π΄Π°Π½Π½Ρ‹Π΅ ΠΈ Ρ€Π°ΡΡΡ‚Π°Π²Π»ΡΡ‚ΡŒ ΠΏΡ€ΠΈΠΎΡ€ΠΈΡ‚Π΅Ρ‚Ρ‹ тасков.
  • Useful SQL queries for Apache Airflow β€” ΠΏΠΎΠ»Π΅Π·Π½Ρ‹Π΅ SQL-запросы ΠΊ ΠΌΠ΅Ρ‚Π°Π΄Π°Π½Π½Ρ‹ΠΌ Airflow.
  • Get started developing workflows with Apache Airflow β€” Π΅ΡΡ‚ΡŒ ΠΏΠΎΠ»Π΅Π·Π½Ρ‹ΠΉ Ρ€Π°Π·Π΄Π΅Π» ΠΏΡ€ΠΎ созданиС кастомного сСнсора.
  • Building the Fetchr Data Science Infra on AWS with Presto and Airflow β€” интСрСсная короткая Π·Π°ΠΌΠ΅Ρ‚ΠΊΠ° ΠΎ построСнии инфраструктуры Π½Π° AWS для Data Science.
  • 7 Common Errors to Check when Debugging Airflow DAGs β€” распространСнныС ошибки (ΠΊΠΎΠ³Π΄Π° ΠΊΠΎΠ΅-ΠΊΡ‚ΠΎ всё-Ρ‚Π°ΠΊΠΈ Π½Π΅ Ρ‡ΠΈΡ‚Π°Π΅Ρ‚ инструкций).
  • Store and access password using Apache Airflow β€” ΡƒΠ»Ρ‹Π±Π½ΠΈΡ‚Π΅ΡΡŒ, ΠΊΠ°ΠΊ люди костылят Ρ…Ρ€Π°Π½Π΅Π½ΠΈΠ΅ ΠΏΠ°Ρ€ΠΎΠ»Π΅ΠΉ, хотя ΠΌΠΎΠΆΠ½ΠΎ просто ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ Connections.
  • The Zen of Python and Apache Airflow β€” нСявный проброс DAG, заброс контСкста Π² Ρ„ΡƒΠ½ΠΊΡ†ΠΈΠΈ, снова ΠΏΡ€ΠΎ зависимости, Π° Π΅Ρ‰Π΅ ΠΏΡ€ΠΎ пропуск запусков тасков.
  • Airflow: Lesser Known Tips, Tricks, and Best Practises β€” ΠΎΠ± использовании default arguments ΠΈ params Π² ΡˆΠ°Π±Π»ΠΎΠ½Π°Ρ…, Π° Ρ‚Π°ΠΊΠΆΠ΅ ΠΎ Variables ΠΈ Connections.
  • Profiling the Airflow Scheduler β€” рассказ ΠΎ Ρ‚ΠΎΠΌ, ΠΊΠ°ΠΊ ΠΏΠ»Π°Π½ΠΈΡ€ΠΎΠ²Ρ‰ΠΈΠΊ готовят ΠΊ Airflow 2.0.
  • Apache Airflow with 3 Celery workers in docker-compose β€” Π½Π΅ΠΌΠ½ΠΎΠΆΠΊΠΎ ΡƒΡΡ‚Π°Ρ€Π΅Π²ΡˆΠ°Ρ ΡΡ‚Π°Ρ‚ΡŒΡ ΠΏΡ€ΠΎ Π΄Π΅ΠΏΠ»ΠΎΠΉ нашСго кластСра Π² docker-compose.
  • 4 Templating Tasks Using the Airflow Context β€” динамичСскиС таск с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ шаблонов ΠΈ проброса контСкста.
  • Error Notifications in Airflow β€” стандартныС ΠΈ кастомныС оповСщСния ΠΏΠΎΡ‡Ρ‚ΠΎΠΉ ΠΈ Slack.
  • Airflow Workshop: слоТныС DAG’и Π±Π΅Π· костылСй β€” ВСтвлСния тасков, макросы ΠΈ XCom.

И ссылки, задСйствованныС Π² ΡΡ‚Π°Ρ‚ΡŒΠ΅:

Π˜ΡΡ‚ΠΎΡ‡Π½ΠΈΠΊ: habr.com