ETL процСс Π·Π° ΠΏΠΎΠ»ΡƒΡ‡Π°Π²Π°Π½Π΅ Π½Π° Π΄Π°Π½Π½ΠΈ ΠΎΡ‚ ΠΈΠΌΠ΅ΠΉΠ» Π² Apache Airflow

ETL процСс Π·Π° ΠΏΠΎΠ»ΡƒΡ‡Π°Π²Π°Π½Π΅ Π½Π° Π΄Π°Π½Π½ΠΈ ΠΎΡ‚ ΠΈΠΌΠ΅ΠΉΠ» Π² Apache Airflow

ΠšΠΎΠ»ΠΊΠΎΡ‚ΠΎ ΠΈ Π΄Π° сС Ρ€Π°Π·Π²ΠΈΠ²Π°Ρ‚ Ρ‚Π΅Ρ…Π½ΠΎΠ»ΠΎΠ³ΠΈΠΈΡ‚Π΅, ΠΏΠΎΡ€Π΅Π΄ΠΈΡ†Π° ΠΎΡ‚ остарСли ΠΏΠΎΠ΄Ρ…ΠΎΠ΄ΠΈ Π²ΠΈΠ½Π°Π³ΠΈ изостават ΠΎΡ‚ Ρ€Π°Π·Π²ΠΈΡ‚ΠΈΠ΅Ρ‚ΠΎ. Π’ΠΎΠ²Π° ΠΌΠΎΠΆΠ΅ Π΄Π° сС дълТи Π½Π° ΠΏΠ»Π°Π²Π΅Π½ ΠΏΡ€Π΅Ρ…ΠΎΠ΄, Ρ‡ΠΎΠ²Π΅ΡˆΠΊΠΈ Ρ„Π°ΠΊΡ‚ΠΎΡ€, Ρ‚Π΅Ρ…Π½ΠΎΠ»ΠΎΠ³ΠΈΡ‡Π½ΠΈ Π½ΡƒΠΆΠ΄ΠΈ ΠΈΠ»ΠΈ Π½Π΅Ρ‰ΠΎ Π΄Ρ€ΡƒΠ³ΠΎ. Π’ областта Π½Π° ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ°Ρ‚Π° Π½Π° Π΄Π°Π½Π½ΠΈ ΠΈΠ·Ρ‚ΠΎΡ‡Π½ΠΈΡ†ΠΈΡ‚Π΅ Π½Π° Π΄Π°Π½Π½ΠΈ са Π½Π°ΠΉ-ΠΏΠΎΠΊΠ°Π·Π°Ρ‚Π΅Π»Π½ΠΈ Π² Ρ‚Π°Π·ΠΈ част. ΠšΠΎΠ»ΠΊΠΎΡ‚ΠΎ ΠΈ Π΄Π° ΠΌΠ΅Ρ‡Ρ‚Π°Π΅ΠΌ Π΄Π° сС ΠΎΡ‚ΡŠΡ€Π²Π΅ΠΌ ΠΎΡ‚ Ρ‚ΠΎΠ²Π°, Π½ΠΎ засСга част ΠΎΡ‚ Π΄Π°Π½Π½ΠΈΡ‚Π΅ сС ΠΈΠ·ΠΏΡ€Π°Ρ‰Π°Ρ‚ Π² ΠΌΠ΅ΡΠΈΠ½Π΄ΠΆΡŠΡ€ΠΈ ΠΈ ΠΈΠΌΠ΅ΠΉΠ»ΠΈ, Π΄Π° Π½Π΅ Π³ΠΎΠ²ΠΎΡ€ΠΈΠΌ Π·Π° ΠΏΠΎ-Π°Ρ€Ρ…Π°ΠΈΡ‡Π½ΠΈ Ρ„ΠΎΡ€ΠΌΠ°Ρ‚ΠΈ. Каня Π²ΠΈ Π΄Π° Ρ€Π°Π·Π³Π»ΠΎΠ±ΠΈΡ‚Π΅ Π΅Π΄Π½Π° ΠΎΡ‚ ΠΎΠΏΡ†ΠΈΠΈΡ‚Π΅ Π·Π° Apache Airflow, ΠΈΠ»ΡŽΡΡ‚Ρ€ΠΈΡ€Π°ΠΉΠΊΠΈ ΠΊΠ°ΠΊ ΠΌΠΎΠΆΠ΅Ρ‚Π΅ Π΄Π° Π²Π·Π΅ΠΌΠ΅Ρ‚Π΅ Π΄Π°Π½Π½ΠΈ ΠΎΡ‚ ΠΈΠΌΠ΅ΠΉΠ»ΠΈ.

праистория

Много Π΄Π°Π½Π½ΠΈ всС ΠΎΡ‰Π΅ сС ΠΏΡ€Π΅Ρ…Π²ΡŠΡ€Π»ΡΡ‚ Ρ‡Ρ€Π΅Π· Π΅Π»Π΅ΠΊΡ‚Ρ€ΠΎΠ½Π½Π° ΠΏΠΎΡ‰Π°, ΠΎΡ‚ мСТдуличностни ΠΊΠΎΠΌΡƒΠ½ΠΈΠΊΠ°Ρ†ΠΈΠΈ Π΄ΠΎ стандарти Π·Π° взаимодСйствиС ΠΌΠ΅ΠΆΠ΄Ρƒ ΠΊΠΎΠΌΠΏΠ°Π½ΠΈΠΈΡ‚Π΅. Π”ΠΎΠ±Ρ€Π΅ Π΅, Π°ΠΊΠΎ Π΅ възмоТно Π΄Π° Π½Π°ΠΏΠΈΡˆΠ΅Ρ‚Π΅ интСрфСйс Π·Π° ΠΏΠΎΠ»ΡƒΡ‡Π°Π²Π°Π½Π΅ Π½Π° Π΄Π°Π½Π½ΠΈ ΠΈΠ»ΠΈ Π΄Π° поставитС Ρ…ΠΎΡ€Π° Π² офиса, ΠΊΠΎΠΈΡ‚ΠΎ Ρ‰Π΅ Π²ΡŠΠ²Π΅ΠΆΠ΄Π°Ρ‚ Ρ‚Π°Π·ΠΈ информация Π² ΠΏΠΎ-ΡƒΠ΄ΠΎΠ±Π½ΠΈ ΠΈΠ·Ρ‚ΠΎΡ‡Π½ΠΈΡ†ΠΈ, Π½ΠΎ чСсто Ρ‚ΠΎΠ²Π° ΠΌΠΎΠΆΠ΅ просто Π΄Π° Π½Π΅ Π΅ възмоТно. ΠšΠΎΠ½ΠΊΡ€Π΅Ρ‚Π½Π°Ρ‚Π° Π·Π°Π΄Π°Ρ‡Π°, с която сС ΡΠ±Π»ΡŠΡΠΊΠ°Ρ…, бСшС ΡΠ²ΡŠΡ€Π·Π²Π°Π½Π΅Ρ‚ΠΎ Π½Π° прословутата CRM систСма към Ρ…Ρ€Π°Π½ΠΈΠ»ΠΈΡ‰Π΅Ρ‚ΠΎ Π½Π° Π΄Π°Π½Π½ΠΈ, Π° слСд Ρ‚ΠΎΠ²Π° ΠΈ към OLAP систСмата. Π˜ΡΡ‚ΠΎΡ€ΠΈΡ‡Π΅ΡΠΊΠΈ сС случи Ρ‚Π°ΠΊΠ°, Ρ‡Π΅ Π·Π° Π½Π°ΡˆΠ°Ρ‚Π° компания ΠΈΠ·ΠΏΠΎΠ»Π·Π²Π°Π½Π΅Ρ‚ΠΎ Π½Π° Ρ‚Π°Π·ΠΈ систСма бСшС ΡƒΠ΄ΠΎΠ±Π½ΠΎ Π² ΠΎΠΏΡ€Π΅Π΄Π΅Π»Π΅Π½Π° област Π½Π° бизнСса. Π‘Π»Π΅Π΄ΠΎΠ²Π°Ρ‚Π΅Π»Π½ΠΎ всички наистина искаха Π΄Π° ΠΌΠΎΠ³Π°Ρ‚ Π΄Π° работят ΠΈ с Π΄Π°Π½Π½ΠΈ ΠΎΡ‚ Ρ‚Π°Π·ΠΈ систСма Π½Π° Ρ‚Ρ€Π΅Ρ‚Π° страна. На ΠΏΡŠΡ€Π²ΠΎ място, Ρ€Π°Π·Π±ΠΈΡ€Π° сС, бСшС ΠΏΡ€ΠΎΡƒΡ‡Π΅Π½Π° Π²ΡŠΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡ‚Ρ‚Π° Π·Π° ΠΏΠΎΠ»ΡƒΡ‡Π°Π²Π°Π½Π΅ Π½Π° Π΄Π°Π½Π½ΠΈ ΠΎΡ‚ ΠΎΡ‚Π²ΠΎΡ€Π΅Π½ API. Π—Π° съТалСниС API Π½Π΅ ΠΏΠΎΠΊΡ€ΠΈΠ²Π°ΡˆΠ΅ ΠΏΠΎΠ»ΡƒΡ‡Π°Π²Π°Π½Π΅Ρ‚ΠΎ Π½Π° всички Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΈ Π΄Π°Π½Π½ΠΈ ΠΈ, с прости Π΄ΡƒΠΌΠΈ, бСшС Π² ΠΌΠ½ΠΎΠ³ΠΎ ΠΎΡ‚Π½ΠΎΡˆΠ΅Π½ΠΈΡ ΠΊΡ€ΠΈΠ² ΠΈ тСхничСската ΠΏΠΎΠ΄Π΄Ρ€ΡŠΠΆΠΊΠ° Π½Π΅ искашС ΠΈΠ»ΠΈ Π½Π΅ моТСшС Π΄Π° сС срСщнС Π½Π°ΠΏΠΎΠ»ΠΎΠ²ΠΈΠ½Π°, Π·Π° Π΄Π° прСдостави ΠΏΠΎ-ΠΈΠ·Ρ‡Π΅Ρ€ΠΏΠ°Ρ‚Π΅Π»Π½Π° функционалност. Но Ρ‚Π°Π·ΠΈ систСма прСдостави Π²ΡŠΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡ‚ Π·Π° ΠΏΠ΅Ρ€ΠΈΠΎΠ΄ΠΈΡ‡Π½ΠΎ ΠΏΠΎΠ»ΡƒΡ‡Π°Π²Π°Π½Π΅ Π½Π° липсващитС Π΄Π°Π½Π½ΠΈ ΠΏΠΎ ΠΏΠΎΡ‰Π°Ρ‚Π° ΠΏΠΎΠ΄ Ρ„ΠΎΡ€ΠΌΠ°Ρ‚Π° Π½Π° Π²Ρ€ΡŠΠ·ΠΊΠ° Π·Π° Ρ€Π°Π·Ρ‚ΠΎΠ²Π°Ρ€Π²Π°Π½Π΅ Π½Π° Π°Ρ€Ρ…ΠΈΠ²Π°.

Врябва Π΄Π° сС ΠΎΡ‚Π±Π΅Π»Π΅ΠΆΠΈ, Ρ‡Π΅ Ρ‚ΠΎΠ²Π° Π½Π΅ Π΅ СдинствСният случай, Π² ΠΊΠΎΠΉΡ‚ΠΎ Π±ΠΈΠ·Π½Π΅ΡΡŠΡ‚ иска Π΄Π° ΡΡŠΠ±ΠΈΡ€Π° Π΄Π°Π½Π½ΠΈ ΠΎΡ‚ ΠΈΠΌΠ΅ΠΉΠ»ΠΈ ΠΈΠ»ΠΈ ΠΌΠ΅ΡΠΈΠ½Π΄ΠΆΡŠΡ€ΠΈ. Π’ Ρ‚ΠΎΠ·ΠΈ случай ΠΎΠ±Π°Ρ‡Π΅ Π½Π΅ Π±ΠΈΡ…ΠΌΠ΅ ΠΌΠΎΠ³Π»ΠΈ Π΄Π° повлияСм Π½Π° Ρ‚Ρ€Π΅Ρ‚Π° компания, която прСдоставя част ΠΎΡ‚ Π΄Π°Π½Π½ΠΈΡ‚Π΅ само ΠΏΠΎ Ρ‚ΠΎΠ·ΠΈ Π½Π°Ρ‡ΠΈΠ½.

Π²ΡŠΠ·Π΄ΡƒΡˆΠ΅Π½ ΠΏΠΎΡ‚ΠΎΠΊ Apache

Π—Π° ΠΈΠ·Π³Ρ€Π°ΠΆΠ΄Π°Π½Π΅ Π½Π° ETL процСси Π½Π°ΠΉ-чСсто ΠΈΠ·ΠΏΠΎΠ»Π·Π²Π°ΠΌΠ΅ Apache Airflow. Π—Π° Π΄Π° ΠΌΠΎΠΆΠ΅ читатСлят, ΠΊΠΎΠΉΡ‚ΠΎ Π½Π΅ Π΅ Π·Π°ΠΏΠΎΠ·Π½Π°Ρ‚ с Ρ‚Π°Π·ΠΈ тСхнология, Π΄Π° Ρ€Π°Π·Π±Π΅Ρ€Π΅ ΠΏΠΎ-Π΄ΠΎΠ±Ρ€Π΅ ΠΊΠ°ΠΊ ΠΈΠ·Π³Π»Π΅ΠΆΠ΄Π° Π² контСкста ΠΈ ΠΊΠ°Ρ‚ΠΎ цяло, Ρ‰Π΅ опиша няколко ΡƒΠ²ΠΎΠ΄Π½ΠΈ.

Apache Airflow Π΅ Π±Π΅Π·ΠΏΠ»Π°Ρ‚Π½Π° ΠΏΠ»Π°Ρ‚Ρ„ΠΎΡ€ΠΌΠ°, която сС ΠΈΠ·ΠΏΠΎΠ»Π·Π²Π° Π·Π° ΠΈΠ·Π³Ρ€Π°ΠΆΠ΄Π°Π½Π΅, изпълнСниС ΠΈ наблюдСниС Π½Π° ETL (Extract-Transform-Loading) процСси Π² Python. ΠžΡΠ½ΠΎΠ²Π½Π°Ρ‚Π° концСпция Π² Airflow Π΅ насочСн Π°Ρ†ΠΈΠΊΠ»ΠΈΡ‡Π΅Π½ Π³Ρ€Π°Ρ„, ΠΊΡŠΠ΄Π΅Ρ‚ΠΎ Π²ΡŠΡ€Ρ…ΠΎΠ²Π΅Ρ‚Π΅ Π½Π° Π³Ρ€Π°Ρ„ΠΈΠΊΠ°Ρ‚Π° са спСцифични процСси, Π° Ρ€ΡŠΠ±ΠΎΠ²Π΅Ρ‚Π΅ Π½Π° Π³Ρ€Π°Ρ„ΠΈΠΊΠ°Ρ‚Π° са ΠΏΠΎΡ‚ΠΎΠΊ ΠΎΡ‚ ΠΊΠΎΠ½Ρ‚Ρ€ΠΎΠ» ΠΈΠ»ΠΈ информация. Π•Π΄ΠΈΠ½ процСс ΠΌΠΎΠΆΠ΅ просто Π΄Π° ΠΈΠ·Π²ΠΈΠΊΠ° всяка функция Π½Π° Python ΠΈΠ»ΠΈ ΠΌΠΎΠΆΠ΅ Π΄Π° ΠΈΠΌΠ° ΠΏΠΎ-слоТна Π»ΠΎΠ³ΠΈΠΊΠ° ΠΎΡ‚ послСдоватСлно ΠΈΠ·Π²ΠΈΠΊΠ²Π°Π½Π΅ Π½Π° няколко Ρ„ΡƒΠ½ΠΊΡ†ΠΈΠΈ Π² контСкста Π½Π° клас. Π—Π° Π½Π°ΠΉ-чСститС ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ Π²Π΅Ρ‡Π΅ ΠΈΠΌΠ° ΠΌΠ½ΠΎΠ³ΠΎ Π³ΠΎΡ‚ΠΎΠ²ΠΈ Ρ€Π°Π·Ρ€Π°Π±ΠΎΡ‚ΠΊΠΈ, ΠΊΠΎΠΈΡ‚ΠΎ ΠΌΠΎΠ³Π°Ρ‚ Π΄Π° сС ΠΈΠ·ΠΏΠΎΠ»Π·Π²Π°Ρ‚ ΠΊΠ°Ρ‚ΠΎ процСси. Π’Π°ΠΊΠΈΠ²Π° Ρ€Π°Π·Ρ€Π°Π±ΠΎΡ‚ΠΊΠΈ Π²ΠΊΠ»ΡŽΡ‡Π²Π°Ρ‚:

  • ΠΎΠΏΠ΅Ρ€Π°Ρ‚ΠΎΡ€ΠΈ - Π·Π° ΠΏΡ€Π΅Ρ…Π²ΡŠΡ€Π»ΡΠ½Π΅ Π½Π° Π΄Π°Π½Π½ΠΈ ΠΎΡ‚ Π΅Π΄Π½ΠΎ място Π½Π° Π΄Ρ€ΡƒΠ³ΠΎ, Π½Π°ΠΏΡ€ΠΈΠΌΠ΅Ρ€ ΠΎΡ‚ Ρ‚Π°Π±Π»ΠΈΡ†Π° Π½Π° Π±Π°Π·Π° Π΄Π°Π½Π½ΠΈ към Ρ…Ρ€Π°Π½ΠΈΠ»ΠΈΡ‰Π΅ Π·Π° Π΄Π°Π½Π½ΠΈ;
  • сСнзори - Π·Π° ΠΈΠ·Ρ‡Π°ΠΊΠ²Π°Π½Π΅ Π½Π° Π½Π°ΡΡ‚ΡŠΠΏΠ²Π°Π½Π΅Ρ‚ΠΎ Π½Π° ΠΎΠΏΡ€Π΅Π΄Π΅Π»Π΅Π½ΠΎ ΡΡŠΠ±ΠΈΡ‚ΠΈΠ΅ ΠΈ насочванС Π½Π° ΠΏΠΎΡ‚ΠΎΠΊΠ° Π½Π° ΡƒΠΏΡ€Π°Π²Π»Π΅Π½ΠΈΠ΅ към слСдващитС Π²ΡŠΡ€Ρ…ΠΎΠ²Π΅ Π½Π° Π³Ρ€Π°Ρ„Π°;
  • ΠΊΡƒΠΊΠΈΡ‡ΠΊΠΈ - Π·Π° ΠΎΠΏΠ΅Ρ€Π°Ρ†ΠΈΠΈ ΠΎΡ‚ ΠΏΠΎ-ниско Π½ΠΈΠ²ΠΎ, Π½Π°ΠΏΡ€ΠΈΠΌΠ΅Ρ€ Π·Π° ΠΏΠΎΠ»ΡƒΡ‡Π°Π²Π°Π½Π΅ Π½Π° Π΄Π°Π½Π½ΠΈ ΠΎΡ‚ Ρ‚Π°Π±Π»ΠΈΡ†Π° Π½Π° Π±Π°Π·Π° Π΄Π°Π½Π½ΠΈ (ΠΈΠ·ΠΏΠΎΠ»Π·Π²Π° сС Π² ΠΈΠ·Ρ€Π°Π·ΠΈ);
  • ΠΈ Ρ‚.Π½.

Π‘ΠΈ Π±ΠΈΠ»ΠΎ нСумСстно Π΄Π° описвамС Apache Airflow ΠΏΠΎΠ΄Ρ€ΠΎΠ±Π½ΠΎ Π² Ρ‚Π°Π·ΠΈ статия. ΠœΠΎΠΆΠ΅Ρ‚Π΅ Π΄Π° Π²ΠΈΠ΄ΠΈΡ‚Π΅ ΠΊΡ€Π°Ρ‚ΠΊΠΈ въвСдСния Ρ‚ΡƒΠΊ ΠΈΠ»ΠΈ Ρ‚ΡƒΠΊ.

ΠšΡƒΠΊΠ° Π·Π° ΠΏΠΎΠ»ΡƒΡ‡Π°Π²Π°Π½Π΅ Π½Π° Π΄Π°Π½Π½ΠΈ

ΠŸΡŠΡ€Π²ΠΎ, Π·Π° Π΄Π° Ρ€Π΅ΡˆΠΈΠΌ ΠΏΡ€ΠΎΠ±Π»Π΅ΠΌΠ°, трябва Π΄Π° напишСм ΠΊΡƒΠΊΠ°, с която ΠΌΠΎΠΆΠ΅ΠΌ:

  • ΡΠ²ΡŠΡ€ΠΆΠ΅Ρ‚Π΅ сС с ΠΈΠΌΠ΅ΠΉΠ»
  • Π½Π°ΠΌΠ΅Ρ€ΠΈ ΠΏΡ€Π°Π²ΠΈΠ»Π½Π°Ρ‚Π° Π±ΡƒΠΊΠ²Π°
  • ΠΏΠΎΠ»ΡƒΡ‡Π°Π²Π°Ρ‚ Π΄Π°Π½Π½ΠΈ ΠΎΡ‚ писмото.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получСния Π΄Π°Π½Π½Ρ‹Ρ… с элСктронной ΠΏΠΎΡ‡Ρ‚Ρ‹

           :param imap_conn_id:       Π˜Π΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€ ΠΏΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΡ ΠΊ ΠΏΠΎΡ‡Ρ‚Π΅
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            ΠŸΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π°Π΅ΠΌΡΡ ΠΊ ΠΏΠΎΡ‡Ρ‚Π΅
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            ΠœΠ΅Ρ‚ΠΎΠ΄ для получСния ΠΈΠ΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€Π° послСднСго письма, 
            ΡƒΠ΄ΠΎΠ²Π»Π΅Ρ‚Π²ΠΎΡ€Π°ΡΡŽΡ‰Π΅Π³ΠΎ условиям поиска

            :param check_seen:      ΠžΡ‚ΠΌΠ΅Ρ‡Π°Ρ‚ΡŒ послСднСС письмо ΠΊΠ°ΠΊ ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Π½Π½ΠΎΠ΅
            :type check_seen:       bool
            :param box:             НаимСнования ящика
            :type box:              string
            :param condition:       Условия поиска писСм
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("Π’ ящикС Π½Π°ΠΉΠ΄Π΅Π½Ρ‹ ΡΠ»Π΅Π΄ΡƒΡŽΡ‰ΠΈΠ΅ письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("НС Π½Π°ΠΉΠ΄Π΅Π½ΠΎ Π½ΠΎΠ²Ρ‹Ρ… писСм")
            return None

        mail_id = mail_ids[0]

        # Ссли Ρ‚Π°ΠΊΠΈΡ… писСм нСсколько
        if len(mail_ids) > 1:
            # ΠΎΡ‚ΠΌΠ΅Ρ‡Π°Π΅ΠΌ ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Π΅ ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Π½Π½Ρ‹ΠΌΠΈ
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°Π΅ΠΌ послСднСС
            mail_id = mail_ids[-1]

        # Π½ΡƒΠΆΠ½ΠΎ Π»ΠΈ ΠΎΡ‚ΠΌΠ΅Ρ‚ΠΈΡ‚ΡŒ послСднСС ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Π½Π½Ρ‹ΠΌ
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Π›ΠΎΠ³ΠΈΠΊΠ°Ρ‚Π° Π΅ слСдната: ΡΠ²ΡŠΡ€Π·Π²Π°ΠΌΠ΅ сС, Π½Π°ΠΌΠΈΡ€Π°ΠΌΠ΅ послСдната Π½Π°ΠΉ-подходяща Π±ΡƒΠΊΠ²Π°, Π°ΠΊΠΎ ΠΈΠΌΠ° Π΄Ρ€ΡƒΠ³ΠΈ, Π³ΠΈ ΠΈΠ³Π½ΠΎΡ€ΠΈΡ€Π°ΠΌΠ΅. Π’Π°Π·ΠΈ функция сС ΠΈΠ·ΠΏΠΎΠ»Π·Π²Π°, Π·Π°Ρ‰ΠΎΡ‚ΠΎ ΠΏΠΎ-ΠΊΡŠΡΠ½ΠΈΡ‚Π΅ писма ΡΡŠΠ΄ΡŠΡ€ΠΆΠ°Ρ‚ всички Π΄Π°Π½Π½ΠΈ ΠΎΡ‚ ΠΏΠΎ-Ρ€Π°Π½Π½ΠΈΡ‚Π΅. Ако Ρ‚ΠΎΠ²Π° Π½Π΅ Π΅ Ρ‚Π°ΠΊΠ°, Ρ‚ΠΎΠ³Π°Π²Π° ΠΌΠΎΠΆΠ΅Ρ‚Π΅ Π΄Π° Π²ΡŠΡ€Π½Π΅Ρ‚Π΅ масив ΠΎΡ‚ всички Π±ΡƒΠΊΠ²ΠΈ ΠΈΠ»ΠΈ Π΄Π° ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΈΡ‚Π΅ ΠΏΡŠΡ€Π²Π°Ρ‚Π°, Π° останалитС ΠΏΡ€ΠΈ слСдващото ΠΏΡ€Π΅ΠΌΠΈΠ½Π°Π²Π°Π½Π΅. ΠšΠ°Ρ‚ΠΎ цяло всичко, ΠΊΠ°ΠΊΡ‚ΠΎ Π²ΠΈΠ½Π°Π³ΠΈ, зависи ΠΎΡ‚ Π·Π°Π΄Π°Ρ‡Π°Ρ‚Π°.

ДобавямС Π΄Π²Π΅ ΠΏΠΎΠΌΠΎΡ‰Π½ΠΈ Ρ„ΡƒΠ½ΠΊΡ†ΠΈΠΈ към ΠΊΡƒΠΊΠ°Ρ‚Π°: Π·Π° изтСглянС Π½Π° Ρ„Π°ΠΉΠ» ΠΈ Π·Π° изтСглянС Π½Π° Ρ„Π°ΠΉΠ» Ρ‡Ρ€Π΅Π· Π²Ρ€ΡŠΠ·ΠΊΠ° ΠΎΡ‚ ΠΈΠΌΠ΅ΠΉΠ». ΠœΠ΅ΠΆΠ΄Ρƒ Π΄Ρ€ΡƒΠ³ΠΎΡ‚ΠΎ, Ρ‚Π΅ ΠΌΠΎΠ³Π°Ρ‚ Π΄Π° Π±ΡŠΠ΄Π°Ρ‚ прСмСстСни Π½Π° ΠΎΠΏΠ΅Ρ€Π°Ρ‚ΠΎΡ€Π°, зависи ΠΎΡ‚ чСстотата Π½Π° ΠΈΠ·ΠΏΠΎΠ»Π·Π²Π°Π½Π΅ Π½Π° Ρ‚Π°Π·ΠΈ функционалност. Какво Π΄Ρ€ΡƒΠ³ΠΎ Π΄Π° Π΄ΠΎΠ±Π°Π²ΠΈΡ‚Π΅ към ΠΊΡƒΠΊΠ°Ρ‚Π°, ΠΎΡ‚Π½ΠΎΠ²ΠΎ зависи ΠΎΡ‚ Π·Π°Π΄Π°Ρ‡Π°Ρ‚Π°: Π°ΠΊΠΎ Ρ„Π°ΠΉΠ»ΠΎΠ²Π΅Ρ‚Π΅ са ΠΏΠΎΠ»ΡƒΡ‡Π΅Π½ΠΈ Π²Π΅Π΄Π½Π°Π³Π° Π² писмото, Ρ‚ΠΎΠ³Π°Π²Π° ΠΌΠΎΠΆΠ΅Ρ‚Π΅ Π΄Π° ΠΈΠ·Ρ‚Π΅Π³Π»ΠΈΡ‚Π΅ ΠΏΡ€ΠΈΠΊΠ°Ρ‡Π΅Π½ΠΈ Ρ„Π°ΠΉΠ»ΠΎΠ²Π΅ към писмото, Π°ΠΊΠΎ Π΄Π°Π½Π½ΠΈΡ‚Π΅ са ΠΏΠΎΠ»ΡƒΡ‡Π΅Π½ΠΈ Π² писмото, Ρ‚ΠΎΠ³Π°Π²Π° трябва Π΄Π° Π°Π½Π°Π»ΠΈΠ·ΠΈΡ€Π°Ρ‚Π΅ писмото, ΠΈ Ρ‚.Π½. Π’ моя случай писмото ΠΈΠ΄Π²Π° с Π΅Π΄Π½Π° Π²Ρ€ΡŠΠ·ΠΊΠ° към Π°Ρ€Ρ…ΠΈΠ²Π°, която трябва Π΄Π° поставя Π½Π° ΠΎΠΏΡ€Π΅Π΄Π΅Π»Π΅Π½ΠΎ място ΠΈ Π΄Π° Π·Π°ΠΏΠΎΡ‡Π½Π° ΠΏΠΎ-Π½Π°Ρ‚Π°Ρ‚ΡŠΡˆΠ½Π°Ρ‚Π° ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ°.

    def download_from_url(self, url, path, chunk_size=128):
        """
            ΠœΠ΅Ρ‚ΠΎΠ΄ для скачивания Ρ„Π°ΠΉΠ»Π°

            :param url:              АдрСс Π·Π°Π³Ρ€ΡƒΠ·ΠΊΠΈ
            :type url:               string
            :param path:             ΠšΡƒΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡ‚ΡŒ Ρ„Π°ΠΉΠ»
            :type path:              string
            :param chunk_size:       По сколько Π±Π°ΠΉΡ‚ΠΎΠ² ΠΏΠΈΡΠ°Ρ‚ΡŒ
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            ΠœΠ΅Ρ‚ΠΎΠ΄ для скачивания Ρ„Π°ΠΉΠ»Π° ΠΏΠΎ ссылкС ΠΈΠ· письма

            :param mail_id:         Π˜Π΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€ письма
            :type mail_id:          string
            :param path:            ΠšΡƒΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡ‚ΡŒ Ρ„Π°ΠΉΠ»
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

ΠšΠΎΠ΄ΡŠΡ‚ Π΅ прост, Ρ‚Π°ΠΊΠ° Ρ‡Π΅ Π΅Π΄Π²Π° Π»ΠΈ сС Π½ΡƒΠΆΠ΄Π°Π΅ ΠΎΡ‚ Π΄ΠΎΠΏΡŠΠ»Π½ΠΈΡ‚Π΅Π»Π½ΠΎ обяснСниС. ΠŸΡ€ΠΎΡΡ‚ΠΎ Ρ‰Π΅ Π²ΠΈ Ρ€Π°Π·ΠΊΠ°ΠΆΠ° Π·Π° магичСския Ρ€Π΅Π΄ imap_conn_id. Apache Airflow ΡΡŠΡ…Ρ€Π°Π½ΡΠ²Π° ΠΏΠ°Ρ€Π°ΠΌΠ΅Ρ‚Ρ€ΠΈ Π½Π° Π²Ρ€ΡŠΠ·ΠΊΠ°Ρ‚Π° (Π²Π»ΠΈΠ·Π°Π½Π΅, ΠΏΠ°Ρ€ΠΎΠ»Π°, адрСс ΠΈ Π΄Ρ€ΡƒΠ³ΠΈ ΠΏΠ°Ρ€Π°ΠΌΠ΅Ρ‚Ρ€ΠΈ), ΠΊΠΎΠΈΡ‚ΠΎ ΠΌΠΎΠ³Π°Ρ‚ Π΄Π° Π±ΡŠΠ΄Π°Ρ‚ Π΄ΠΎΡΡ‚ΡŠΠΏΠ½ΠΈ Ρ‡Ρ€Π΅Π· ΠΈΠ΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€ Π½Π° Π½ΠΈΠ·. Π’ΠΈΠ·ΡƒΠ°Π»Π½ΠΎ ΡƒΠΏΡ€Π°Π²Π»Π΅Π½ΠΈΠ΅Ρ‚ΠΎ Π½Π° Π²Ρ€ΡŠΠ·ΠΊΠ°Ρ‚Π° ΠΈΠ·Π³Π»Π΅ΠΆΠ΄Π° Ρ‚Π°ΠΊΠ°

ETL процСс Π·Π° ΠΏΠΎΠ»ΡƒΡ‡Π°Π²Π°Π½Π΅ Π½Π° Π΄Π°Π½Π½ΠΈ ΠΎΡ‚ ΠΈΠΌΠ΅ΠΉΠ» Π² Apache Airflow

Π‘Π΅Π½Π·ΠΎΡ€ Π·Π° ΠΈΠ·Ρ‡Π°ΠΊΠ²Π°Π½Π΅ Π½Π° Π΄Π°Π½Π½ΠΈ

Въй ΠΊΠ°Ρ‚ΠΎ Π²Π΅Ρ‡Π΅ Π·Π½Π°Π΅ΠΌ ΠΊΠ°ΠΊ Π΄Π° сС ΡΠ²ΡŠΡ€Π·Π²Π°ΠΌΠ΅ ΠΈ Π΄Π° ΠΏΠΎΠ»ΡƒΡ‡Π°Π²Π°ΠΌΠ΅ Π΄Π°Π½Π½ΠΈ ΠΎΡ‚ ΠΏΠΎΡ‰Π°Ρ‚Π°, сСга ΠΌΠΎΠΆΠ΅ΠΌ Π΄Π° напишСм сСнзор, ΠΊΠΎΠΉΡ‚ΠΎ Π΄Π° Π³ΠΈ Ρ‡Π°ΠΊΠ°. Π’ моя случай Π½Π΅ сС ΠΏΠΎΠ»ΡƒΡ‡ΠΈ Π²Π΅Π΄Π½Π°Π³Π° Π΄Π° напиша ΠΎΠΏΠ΅Ρ€Π°Ρ‚ΠΎΡ€, ΠΊΠΎΠΉΡ‚ΠΎ Ρ‰Π΅ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚Π²Π° Π΄Π°Π½Π½ΠΈΡ‚Π΅, Π°ΠΊΠΎ ΠΈΠΌΠ° Ρ‚Π°ΠΊΠΈΠ²Π°, Π·Π°Ρ‰ΠΎΡ‚ΠΎ Π΄Ρ€ΡƒΠ³ΠΈ процСси работят въз основа Π½Π° Π΄Π°Π½Π½ΠΈΡ‚Π΅, ΠΏΠΎΠ»ΡƒΡ‡Π΅Π½ΠΈ ΠΎΡ‚ ΠΏΠΎΡ‰Π°Ρ‚Π°, Π²ΠΊΠ»ΡŽΡ‡ΠΈΡ‚Π΅Π»Π½ΠΎ Ρ‚Π΅Π·ΠΈ, ΠΊΠΎΠΈΡ‚ΠΎ Π²Π·Π΅ΠΌΠ°Ρ‚ ΡΠ²ΡŠΡ€Π·Π°Π½ΠΈ Π΄Π°Π½Π½ΠΈ ΠΎΡ‚ Π΄Ρ€ΡƒΠ³ΠΈ ΠΈΠ·Ρ‚ΠΎΡ‡Π½ΠΈΡ†ΠΈ (API, тСлСфония , ΡƒΠ΅Π± ΠΏΠΎΠΊΠ°Π·Π°Ρ‚Π΅Π»ΠΈ ΠΈ Ρ‚.Π½.) ΠΈ Ρ‚.Π½.). Π©Π΅ Π²ΠΈ Π΄Π°ΠΌ ΠΏΡ€ΠΈΠΌΠ΅Ρ€. Π’ CRM систСмата сС появи Π½ΠΎΠ² ΠΏΠΎΡ‚Ρ€Π΅Π±ΠΈΡ‚Π΅Π» ΠΈ всС ΠΎΡ‰Π΅ Π½Π΅ Π·Π½Π°Π΅ΠΌ Π·Π° нСговия UUID. Π‘Π»Π΅Π΄ Ρ‚ΠΎΠ²Π°, ΠΊΠΎΠ³Π°Ρ‚ΠΎ сС ΠΎΠΏΠΈΡ‚Π²Π°ΠΌΠ΅ Π΄Π° ΠΏΠΎΠ»ΡƒΡ‡ΠΈΠΌ Π΄Π°Π½Π½ΠΈ ΠΎΡ‚ SIP тСлСфония, Ρ‰Π΅ ΠΏΠΎΠ»ΡƒΡ‡Π°Π²Π°ΠΌΠ΅ обаТдания, ΡΠ²ΡŠΡ€Π·Π°Π½ΠΈ с нСговия UUID, Π½ΠΎ няма Π΄Π° ΠΌΠΎΠΆΠ΅ΠΌ Π΄Π° Π³ΠΈ Π·Π°ΠΏΠ°Π·ΠΈΠΌ ΠΈ ΠΈΠ·ΠΏΠΎΠ»Π·Π²Π°ΠΌΠ΅ ΠΏΡ€Π°Π²ΠΈΠ»Π½ΠΎ. ΠŸΡ€ΠΈ Ρ‚Π°ΠΊΠΈΠ²Π° Π²ΡŠΠΏΡ€ΠΎΡΠΈ Π΅ Π²Π°ΠΆΠ½ΠΎ Π΄Π° сС ΠΈΠΌΠ° ΠΏΡ€Π΅Π΄Π²ΠΈΠ΄ зависимостта Π½Π° Π΄Π°Π½Π½ΠΈΡ‚Π΅, особСно Π°ΠΊΠΎ са ΠΎΡ‚ Ρ€Π°Π·Π»ΠΈΡ‡Π½ΠΈ ΠΈΠ·Ρ‚ΠΎΡ‡Π½ΠΈΡ†ΠΈ. Π’ΠΎΠ²Π°, Ρ€Π°Π·Π±ΠΈΡ€Π° сС, са Π½Π΅Π΄ΠΎΡΡ‚Π°Ρ‚ΡŠΡ‡Π½ΠΈ ΠΌΠ΅Ρ€ΠΊΠΈ Π·Π° Π·Π°ΠΏΠ°Π·Π²Π°Π½Π΅ цСлостта Π½Π° Π΄Π°Π½Π½ΠΈΡ‚Π΅, Π½ΠΎ Π² някои случаи са Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΈ. Π”Π°, ΠΈ бСздСйствиСто Π·Π° Π·Π°Π΅ΠΌΠ°Π½Π΅ Π½Π° рСсурси ΡΡŠΡ‰ΠΎ Π΅ ΠΈΡ€Π°Ρ†ΠΈΠΎΠ½Π°Π»Π½ΠΎ.

По Ρ‚ΠΎΠ·ΠΈ Π½Π°Ρ‡ΠΈΠ½ Π½Π°ΡˆΠΈΡΡ‚ сСнзор Ρ‰Π΅ стартира слСдващитС Π²ΡŠΡ€Ρ…ΠΎΠ²Π΅ Π½Π° Π³Ρ€Π°Ρ„ΠΈΠΊΠ°Ρ‚Π°, Π°ΠΊΠΎ ΠΈΠΌΠ° свСТа информация Π² ΠΏΠΎΡ‰Π°Ρ‚Π°, Π° ΡΡŠΡ‰ΠΎ Ρ‚Π°ΠΊΠ° Ρ‰Π΅ ΠΌΠ°Ρ€ΠΊΠΈΡ€Π° ΠΏΡ€Π΅Π΄ΠΈΡˆΠ½Π°Ρ‚Π° информация ΠΊΠ°Ρ‚ΠΎ нСумСстна.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

НиС ΠΏΠΎΠ»ΡƒΡ‡Π°Π²Π°ΠΌΠ΅ ΠΈ ΠΈΠ·ΠΏΠΎΠ»Π·Π²Π°ΠΌΠ΅ Π΄Π°Π½Π½ΠΈ

Π—Π° Π΄Π° ΠΏΠΎΠ»ΡƒΡ‡Π°Π²Π°Ρ‚Π΅ ΠΈ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚Π²Π°Ρ‚Π΅ Π΄Π°Π½Π½ΠΈ, ΠΌΠΎΠΆΠ΅Ρ‚Π΅ Π΄Π° Π½Π°ΠΏΠΈΡˆΠ΅Ρ‚Π΅ ΠΎΡ‚Π΄Π΅Π»Π΅Π½ ΠΎΠΏΠ΅Ρ€Π°Ρ‚ΠΎΡ€, ΠΌΠΎΠΆΠ΅Ρ‚Π΅ Π΄Π° ΠΈΠ·ΠΏΠΎΠ»Π·Π²Π°Ρ‚Π΅ Π³ΠΎΡ‚ΠΎΠ²ΠΈ. Въй ΠΊΠ°Ρ‚ΠΎ Π»ΠΎΠ³ΠΈΠΊΠ°Ρ‚Π° всС ΠΎΡ‰Π΅ Π΅ Ρ‚Ρ€ΠΈΠ²ΠΈΠ°Π»Π½Π° - Π΄Π° Π²Π·Π΅ΠΌΠ΅ΠΌ Π΄Π°Π½Π½ΠΈ ΠΎΡ‚ писмото, Π½Π°ΠΏΡ€ΠΈΠΌΠ΅Ρ€, ΠΏΡ€Π΅Π΄Π»Π°Π³Π°ΠΌ стандартния PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Π‘Ρ‚Π°Π½Π΄Π°Ρ€Ρ‚Π½ΠΎΠ΅ ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€ΠΈΡ€ΠΎΠ²Π°Π½ΠΈΠ΅ Π³Ρ€Π°Ρ„Π°
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# ΠžΠΏΡ€Π΅Π΄Π΅Π»ΡΠ΅ΠΌ сСнсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Ѐункция для получСния Π΄Π°Π½Π½Ρ‹Ρ… ΠΈΠ· письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# ОписаниС ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Ρ… Π²Π΅Ρ€ΡˆΠΈΠ½ Π³Ρ€Π°Ρ„Π°
...

# Π—Π°Π΄Π°Π΅ΠΌ связь Π½Π° Π³Ρ€Π°Ρ„Π΅
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ОписаниС ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Ρ… ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ² управлСния

ΠœΠ΅ΠΆΠ΄Ρƒ Π΄Ρ€ΡƒΠ³ΠΎΡ‚ΠΎ, Π°ΠΊΠΎ Π²Π°ΡˆΠ°Ρ‚Π° ΠΊΠΎΡ€ΠΏΠΎΡ€Π°Ρ‚ΠΈΠ²Π½Π° ΠΏΠΎΡ‰Π° ΡΡŠΡ‰ΠΎ Π΅ Π½Π° mail.ru, Ρ‚ΠΎΠ³Π°Π²Π° няма Π΄Π° ΠΌΠΎΠΆΠ΅Ρ‚Π΅ Π΄Π° Ρ‚ΡŠΡ€ΡΠΈΡ‚Π΅ писма ΠΏΠΎ Ρ‚Π΅ΠΌΠ°, ΠΏΠΎΠ΄Π°Ρ‚Π΅Π» ΠΈ Ρ‚.Π½. ΠžΡ‰Π΅ ΠΏΡ€Π΅Π· 2016 Π³. ΠΎΠ±Π΅Ρ‰Π°Ρ…Π° Π΄Π° Π³ΠΎ Π²ΡŠΠ²Π΅Π΄Π°Ρ‚, Π½ΠΎ явно са размислили. Π Π΅ΡˆΠΈΡ… Ρ‚ΠΎΠ·ΠΈ ΠΏΡ€ΠΎΠ±Π»Π΅ΠΌ, ΠΊΠ°Ρ‚ΠΎ ΡΡŠΠ·Π΄Π°Π΄ΠΎΡ… ΠΎΡ‚Π΄Π΅Π»Π½Π° ΠΏΠ°ΠΏΠΊΠ° Π·Π° Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΈΡ‚Π΅ писма ΠΈ настроих Ρ„ΠΈΠ»Ρ‚ΡŠΡ€ Π·Π° Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΈΡ‚Π΅ писма Π² ΡƒΠ΅Π± интСрфСйса Π½Π° ΠΏΠΎΡ‰Π°Ρ‚Π°. По Ρ‚ΠΎΠ·ΠΈ Π½Π°Ρ‡ΠΈΠ½ Π² Ρ‚Π°Π·ΠΈ ΠΏΠ°ΠΏΠΊΠ° Π²Π»ΠΈΠ·Π°Ρ‚ само Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΈΡ‚Π΅ Π±ΡƒΠΊΠ²ΠΈ ΠΈ условия Π·Π° Ρ‚ΡŠΡ€ΡΠ΅Π½Π΅, Π² моя случай просто (ΠΠ•Π’Π˜Π”Π˜ΠœΠž).

ΠžΠ±ΠΎΠ±Ρ‰Π°Π²Π°ΠΉΠΊΠΈ, ΠΈΠΌΠ°ΠΌΠ΅ слСдната послСдоватСлност: провСрявамС Π΄Π°Π»ΠΈ ΠΈΠΌΠ° Π½ΠΎΠ²ΠΈ писма, ΠΊΠΎΠΈΡ‚ΠΎ отговарят Π½Π° условията, Π°ΠΊΠΎ ΠΈΠΌΠ°, Ρ‚ΠΎΠ³Π°Π²Π° изтСглямС Π°Ρ€Ρ…ΠΈΠ²Π°, ΠΊΠ°Ρ‚ΠΎ ΠΈΠ·ΠΏΠΎΠ»Π·Π²Π°ΠΌΠ΅ Π²Ρ€ΡŠΠ·ΠΊΠ°Ρ‚Π° ΠΎΡ‚ послСдното писмо.
Под послСднитС Ρ‚ΠΎΡ‡ΠΊΠΈ Π΅ пропуснато, Ρ‡Π΅ Ρ‚ΠΎΠ·ΠΈ Π°Ρ€Ρ…ΠΈΠ² Ρ‰Π΅ бъдС Ρ€Π°Π·ΠΎΠΏΠ°ΠΊΠΎΠ²Π°Π½, Π΄Π°Π½Π½ΠΈΡ‚Π΅ ΠΎΡ‚ Π°Ρ€Ρ…ΠΈΠ²Π° Ρ‰Π΅ Π±ΡŠΠ΄Π°Ρ‚ изчистСни ΠΈ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚Π΅Π½ΠΈ ΠΈ Π² Ρ€Π΅Π·ΡƒΠ»Ρ‚Π°Ρ‚ Π½Π° Ρ‚ΠΎΠ²Π° всичко Ρ‰Π΅ ΠΎΡ‚ΠΈΠ΄Π΅ ΠΏΠΎ-Π½Π°Ρ‚Π°Ρ‚ΡŠΠΊ Π² ΠΊΠΎΠ½Π²Π΅ΠΉΠ΅Ρ€Π° Π½Π° ETL процСса, Π½ΠΎ Ρ‚ΠΎΠ²Π° Π²Π΅Ρ‡Π΅ Π΅ ΠΎΡ‚Π²ΡŠΠ΄ ΠΎΠ±Ρ…Π²Π°Ρ‚Π° Π½Π° статията. Ако сС ΠΎΠΊΠ°Π·Π° интСрСсно ΠΈ ΠΏΠΎΠ»Π΅Π·Π½ΠΎ, Ρ‚ΠΎΠ³Π°Π²Π° с удоволствиС Ρ‰Π΅ ΠΏΡ€ΠΎΠ΄ΡŠΠ»ΠΆΠ° Π΄Π° описвам ETL Ρ€Π΅ΡˆΠ΅Π½ΠΈΡ ΠΈ Ρ‚Π΅Ρ…Π½ΠΈΡ‚Π΅ части Π·Π° Apache Airflow.

Π˜Π·Ρ‚ΠΎΡ‡Π½ΠΈΠΊ: www.habr.com

ДобавянС Π½Π° Π½ΠΎΠ² ΠΊΠΎΠΌΠ΅Π½Ρ‚Π°Ρ€