ΠΠ°ΠΊ Π±Ρ ΡΠΈΠ»ΡΠ½ΠΎ Π½Π΅ ΡΠ°Π·Π²ΠΈΠ²Π°Π»ΠΈΡΡ ΡΠ΅Ρ Π½ΠΎΠ»ΠΎΠ³ΠΈΠΈ, Π·Π° ΡΠ°Π·Π²ΠΈΡΠΈΠ΅ΠΌ Π²ΡΠ΅Π³Π΄Π° ΡΡΠ½Π΅ΡΡΡ Π²Π΅ΡΠ΅Π½ΠΈΡΠ° ΡΡΡΠ°ΡΠ΅Π²ΡΠΈΡ ΠΏΠΎΠ΄Ρ ΠΎΠ΄ΠΎΠ². ΠΡΠΎ ΠΌΠΎΠΆΠ΅Ρ Π±ΡΡΡ ΠΎΠ±ΡΡΠ»ΠΎΠ²Π»Π΅Π½ΠΎ ΠΏΠ»Π°Π²Π½ΡΠΌ ΠΏΠ΅ΡΠ΅Ρ ΠΎΠ΄ΠΎΠΌ, ΡΠ΅Π»ΠΎΠ²Π΅ΡΠ΅ΡΠΊΠΈΠΌ ΡΠ°ΠΊΡΠΎΡΠΎΠΌ, ΡΠ΅Ρ Π½ΠΎΠ»ΠΎΠ³ΠΈΡΠ΅ΡΠΊΠΈΠΌΠΈ Π½Π΅ΠΎΠ±Ρ ΠΎΠ΄ΠΈΠΌΠΎΡΡΡΠΌΠΈ ΠΈΠ»ΠΈ ΡΠ΅ΠΌ-ΡΠΎ Π΄ΡΡΠ³ΠΈΠΌ. Π ΠΎΠ±Π»Π°ΡΡΠΈ ΠΎΠ±ΡΠ°Π±ΠΎΡΠΊΠΈ Π΄Π°Π½Π½ΡΡ Π½Π°ΠΈΠ±ΠΎΠ»Π΅Π΅ ΠΏΠΎΠΊΠ°Π·Π°ΡΠ΅Π»ΡΠ½ΡΠΌΠΈ Π² ΡΡΠΎΠΉ ΡΠ°ΡΡΠΈ ΡΠ²Π»ΡΡΡΡΡ ΠΈΡΡΠΎΡΠ½ΠΈΠΊΠΈ Π΄Π°Π½Π½ΡΡ . ΠΠ°ΠΊ Π±Ρ ΠΌΡ Π½Π΅ ΠΌΠ΅ΡΡΠ°Π»ΠΈ ΠΎΡ ΡΡΠΎΠ³ΠΎ ΠΈΠ·Π±Π°Π²ΠΈΡΡΡΡ, Π½ΠΎ ΠΏΠΎΠΊΠ° ΡΠ°ΡΡΡ Π΄Π°Π½Π½ΡΡ ΠΏΠ΅ΡΠ΅ΡΡΠ»Π°Π΅ΡΡΡ Π² ΠΌΠ΅ΡΡΠ΅Π½Π΄ΠΆΠ΅ΡΠ°Ρ ΠΈ ΡΠ»Π΅ΠΊΡΡΠΎΠ½Π½ΡΡ ΠΏΠΈΡΡΠΌΠ°Ρ , Π½Π΅ Π³ΠΎΠ²ΠΎΡΡ ΠΈ ΠΏΡΠΎ Π±ΠΎΠ»Π΅Π΅ Π°ΡΡ Π°ΠΈΡΠ½ΡΠ΅ ΡΠΎΡΠΌΠ°ΡΡ. ΠΡΠΈΠ³Π»Π°ΡΠ°Ρ ΠΏΠΎΠ΄ ΠΊΠ°Ρ ΡΠ°Π·ΠΎΠ±ΡΠ°ΡΡ ΠΎΠ΄ΠΈΠ½ ΠΈΠ· Π²Π°ΡΠΈΠ°Π½ΡΠΎΠ² Π΄Π»Ρ Apache Airflow, ΠΈΠ»Π»ΡΡΡΡΠΈΡΡΡΡΠΈΠΉ, ΠΊΠ°ΠΊ ΠΌΠΎΠΆΠ½ΠΎ Π·Π°Π±ΠΈΡΠ°ΡΡ Π΄Π°Π½Π½ΡΠ΅ ΠΈΠ· ΡΠ»Π΅ΠΊΡΡΠΎΠ½Π½ΡΡ ΠΏΠΈΡΠ΅ΠΌ.
ΠΡΠ΅Π΄ΡΡΡΠΎΡΠΈΡ
ΠΠ½ΠΎΠ³ΠΈΠ΅ Π΄Π°Π½Π½ΡΠ΅ Π΄ΠΎ ΡΠΈΡ ΠΏΠΎΡ ΠΏΠ΅ΡΠ΅Π΄Π°ΡΡΡΡ ΡΠ΅ΡΠ΅Π· ΡΠ»Π΅ΠΊΡΡΠΎΠ½Π½ΡΡ ΠΏΠΎΡΡΡ, Π½Π°ΡΠΈΠ½Π°Ρ Ρ ΠΌΠ΅ΠΆΠ»ΠΈΡΠ½ΠΎΡΡΠ½ΡΡ ΠΊΠΎΠΌΠΌΡΠ½ΠΈΠΊΠ°ΡΠΈΠΉ ΠΈ Π·Π°ΠΊΠ°Π½ΡΠΈΠ²Π°Ρ ΡΡΠ°Π½Π΄Π°ΡΡΠ°ΠΌΠΈ Π²Π·Π°ΠΈΠΌΠΎΠ΄Π΅ΠΉΡΡΠ²ΠΈΡ ΠΌΠ΅ΠΆΠ΄Ρ ΠΊΠΎΠΌΠΏΠ°Π½ΠΈΡΠΌΠΈ. Π₯ΠΎΡΠΎΡΠΎ, Π΅ΡΠ»ΠΈ ΡΠ΄Π°Π΅ΡΡΡ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ Π½Π°ΠΏΠΈΡΠ°ΡΡ ΠΈΠ½ΡΠ΅ΡΡΠ΅ΠΉΡ ΠΈΠ»ΠΈ ΠΏΠΎΡΠ°Π΄ΠΈΡΡ Π»ΡΠ΄Π΅ΠΉ Π² ΠΎΡΠΈΡΠ΅, ΠΊΠΎΡΠΎΡΡΠ΅ ΡΡΡ ΠΈΠ½ΡΠΎΡΠΌΠ°ΡΠΈΡ Π±ΡΠ΄ΡΡ Π²Π½ΠΎΡΠΈΡΡ Π² Π±ΠΎΠ»Π΅Π΅ ΡΠ΄ΠΎΠ±Π½ΡΠ΅ ΠΈΡΡΠΎΡΠ½ΠΈΠΊΠΈ, Π½ΠΎ Π·Π°ΡΠ°ΡΡΡΡ ΡΠ°ΠΊΠΎΠΉ Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡΠΈ ΠΌΠΎΠΆΠ΅Ρ ΠΏΡΠΎΡΡΠΎ Π½Π΅ Π±ΡΡΡ. ΠΠΎΠ½ΠΊΡΠ΅ΡΠ½Π°Ρ Π·Π°Π΄Π°ΡΠ°, Ρ ΠΊΠΎΡΠΎΡΠΎΠΉ ΡΡΠΎΠ»ΠΊΠ½ΡΠ»ΡΡ Ρ, β ΡΡΠΎ ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠ΅Π½ΠΈΠ΅ Π½Π΅Π±Π΅Π·ΡΠ·Π²Π΅ΡΡΠ½ΠΎΠΉ CRM ΡΠΈΡΡΠ΅ΠΌΡ ΠΊ Ρ ΡΠ°Π½ΠΈΠ»ΠΈΡΡ Π΄Π°Π½Π½ΡΡ , Π° Π΄Π°Π»Π΅Π΅ β ΠΊ ΡΠΈΡΡΠ΅ΠΌΠ΅ OLAP. Π’Π°ΠΊ ΠΈΡΡΠΎΡΠΈΡΠ΅ΡΠΊΠΈ ΡΠ»ΠΎΠΆΠΈΠ»ΠΎΡΡ, ΡΡΠΎ Π΄Π»Ρ Π½Π°ΡΠ΅ΠΉ ΠΊΠΎΠΌΠΏΠ°Π½ΠΈΠΈ ΠΈΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π½ΠΈΠ΅ ΡΡΠΎΠΉ ΡΠΈΡΡΠ΅ΠΌΡ Π±ΡΠ»ΠΎ ΡΠ΄ΠΎΠ±Π½ΠΎ Π² ΠΎΡΠ΄Π΅Π»ΡΠ½ΠΎ Π²Π·ΡΡΠΎΠΉ ΠΎΠ±Π»Π°ΡΡΠΈ Π±ΠΈΠ·Π½Π΅ΡΠ°. ΠΠΎΡΡΠΎΠΌΡ Π²ΡΠ΅ΠΌ ΠΎΡΠ΅Π½Ρ Ρ ΠΎΡΠ΅Π»ΠΎΡΡ ΠΈΠΌΠ΅ΡΡ Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡΡ ΠΎΠΏΠ΅ΡΠΈΡΠΎΠ²Π°ΡΡ Π΄Π°Π½Π½ΡΠΌΠΈ ΠΈ ΠΈΠ· ΡΡΠΎΠΉ ΡΡΠΎΡΠΎΠ½Π½Π΅ΠΉ ΡΠΈΡΡΠ΅ΠΌΡ Π² ΡΠΎΠΌ ΡΠΈΡΠ»Π΅. Π ΠΏΠ΅ΡΠ²ΡΡ ΠΎΡΠ΅ΡΠ΅Π΄Ρ, ΠΊΠΎΠ½Π΅ΡΠ½ΠΎ, Π±ΡΠ»Π° ΠΈΠ·ΡΡΠ΅Π½Π° Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡΡ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ ΠΈΠ· ΠΎΡΠΊΡΡΡΠΎΠ³ΠΎ API. Π ΡΠΎΠΆΠ°Π»Π΅Π½ΠΈΡ, API Π½Π΅ ΠΏΠΎΠΊΡΡΠ²Π°Π»ΠΎ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΠ΅ Π²ΡΠ΅Ρ Π½Π΅ΠΎΠ±Ρ ΠΎΠ΄ΠΈΠΌΡΡ Π΄Π°Π½Π½ΡΡ , Π΄Π° ΠΈ, Π²ΡΡΠ°ΠΆΠ°ΡΡΡ ΠΏΡΠΎΡΡΡΠΌ ΡΠ·ΡΠΊΠΎΠΌ, Π±ΡΠ»ΠΎ Π²ΠΎ ΠΌΠ½ΠΎΠ³ΠΎΠΌ ΠΊΡΠΈΠ²ΠΎΠ²Π°ΡΠΎ, Π° ΡΠ΅Ρ Π½ΠΈΡΠ΅ΡΠΊΠ°Ρ ΠΏΠΎΠ΄Π΄Π΅ΡΠΆΠΊΠ° Π½Π΅ Π·Π°Ρ ΠΎΡΠ΅Π»Π° ΠΈΠ»ΠΈ Π½Π΅ ΡΠΌΠΎΠ³Π»Π° ΠΏΠΎΠΉΡΠΈ Π½Π°Π²ΡΡΡΠ΅ΡΡ Π΄Π»Ρ ΠΏΡΠ΅Π΄ΠΎΡΡΠ°Π²Π»Π΅Π½ΠΈΡ Π±ΠΎΠ»Π΅Π΅ ΠΈΡΡΠ΅ΡΠΏΡΠ²Π°ΡΡΠ΅Π³ΠΎ ΡΡΠ½ΠΊΡΠΈΠΎΠ½Π°Π»Π°. ΠΠ°ΡΠΎ Π΄Π°Π½Π½Π°Ρ ΡΠΈΡΡΠ΅ΠΌΠ° ΠΏΡΠ΅Π΄ΠΎΡΡΠ°Π²Π»ΡΠ»Π° Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡΡ ΠΏΠ΅ΡΠΈΠΎΠ΄ΠΈΡΠ΅ΡΠΊΠΎΠ³ΠΎ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π½Π΅Π΄ΠΎΡΡΠ°ΡΡΠΈΡ Π΄Π°Π½Π½ΡΡ Π½Π° ΠΏΠΎΡΡΡ Π² Π²ΠΈΠ΄Π΅ ΡΡΡΠ»ΠΊΠΈ Π΄Π»Ρ Π²ΡΠ³ΡΡΠ·ΠΊΠΈ Π°ΡΡ ΠΈΠ²Π°.
ΠΡΠΆΠ½ΠΎ ΠΎΡΠΌΠ΅ΡΠΈΡΡ, ΡΡΠΎ ΡΡΠΎ Π±ΡΠ» Π½Π΅ Π΅Π΄ΠΈΠ½ΡΡΠ²Π΅Π½Π½ΡΠΉ ΠΊΠ΅ΠΉΡ, ΠΏΠΎ ΠΊΠΎΡΠΎΡΠΎΠΌΡ Π±ΠΈΠ·Π½Π΅Ρ Ρ ΠΎΡΠ΅Π» ΡΠΎΠ±ΠΈΡΠ°ΡΡ Π΄Π°Π½Π½ΡΠ΅ ΠΈΠ· ΠΏΠΎΡΡΠΎΠ²ΡΡ ΠΏΠΈΡΠ΅ΠΌ ΠΈΠ»ΠΈ ΠΌΠ΅ΡΡΠ΅Π½Π΄ΠΆΠ΅ΡΠΎΠ². ΠΠ΄Π½Π°ΠΊΠΎ, Π² Π΄Π°Π½Π½ΠΎΠΌ ΡΠ»ΡΡΠ°Π΅ ΠΌΡ Π½Π΅ ΠΌΠΎΠ³Π»ΠΈ ΠΏΠΎΠ²Π»ΠΈΡΡΡ Π½Π° ΡΡΠΎΡΠΎΠ½Π½ΡΡ ΠΊΠΎΠΌΠΏΠ°Π½ΠΈΡ, ΠΊΠΎΡΠΎΡΠ°Ρ ΠΏΡΠ΅Π΄ΠΎΡΡΠ°Π²Π»ΡΠ΅Ρ ΡΠ°ΡΡΡ Π΄Π°Π½Π½ΡΡ ΡΠΎΠ»ΡΠΊΠΎ ΡΠ°ΠΊΠΈΠΌ ΡΠΏΠΎΡΠΎΠ±ΠΎΠΌ.
Apache Airflow
ΠΠ»Ρ ΠΏΠΎΡΡΡΠΎΠ΅Π½ΠΈΡ ETL ΠΏΡΠΎΡΠ΅ΡΡΠΎΠ² ΠΌΡ ΡΠ°ΡΠ΅ Π²ΡΠ΅Π³ΠΎ ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΠ΅ΠΌ Apache Airflow. ΠΠ»Ρ ΡΠΎΠ³ΠΎ ΡΡΠΎΠ±Ρ ΡΠΈΡΠ°ΡΠ΅Π»Ρ, Π½Π΅Π·Π½Π°ΠΊΠΎΠΌΡΠΉ Ρ ΡΡΠΎΠΉ ΡΠ΅Ρ Π½ΠΎΠ»ΠΎΠ³ΠΈΠ΅ΠΉ, Π»ΡΡΡΠ΅ ΠΏΠΎΠ½ΡΠ», ΠΊΠ°ΠΊ ΡΡΠΎ Π²ΡΠ³Π»ΡΠ΄ΠΈΡ Π² ΠΊΠΎΠ½ΡΠ΅ΠΊΡΡΠ΅ ΠΈ Π² ΡΠ΅Π»ΠΎΠΌ, ΠΎΠΏΠΈΡΡ ΠΏΠ°ΡΡ Π²Π²ΠΎΠ΄Π½ΡΡ .
Apache Airflow β ΡΠ²ΠΎΠ±ΠΎΠ΄Π½Π°Ρ ΠΏΠ»Π°ΡΡΠΎΡΠΌΠ°, ΠΊΠΎΡΠΎΡΠ°Ρ ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΠ΅ΡΡΡ Π΄Π»Ρ ΠΏΠΎΡΡΡΠΎΠ΅Π½ΠΈΡ, Π²ΡΠΏΠΎΠ»Π½Π΅Π½ΠΈΡ ΠΈ ΠΌΠΎΠ½ΠΈΡΠΎΡΠΈΠ½Π³Π° ETL (Extract-Transform-Loading) ΠΏΡΠΎΡΠ΅ΡΡΠΎΠ² Π½Π° ΡΠ·ΡΠΊΠ΅ Python. ΠΡΠ½ΠΎΠ²Π½ΡΠΌ ΠΏΠΎΠ½ΡΡΠΈΠ΅ΠΌ Π² Airflow ΡΠ²Π»ΡΠ΅ΡΡΡ ΠΎΡΠΈΠ΅Π½ΡΠΈΡΠΎΠ²Π°Π½Π½ΡΠΉ Π°ΡΠΈΠΊΠ»ΠΈΡΠ½ΡΠΉ Π³ΡΠ°Ρ, Π³Π΄Π΅ Π²Π΅ΡΡΠΈΠ½Ρ Π³ΡΠ°ΡΠ° β ΠΊΠΎΠ½ΠΊΡΠ΅ΡΠ½ΡΠ΅ ΠΏΡΠΎΡΠ΅ΡΡΡ, Π° ΡΠ΅Π±ΡΠ° Π³ΡΠ°ΡΠ° β ΠΏΠΎΡΠΎΠΊ ΡΠΏΡΠ°Π²Π»Π΅Π½ΠΈΡ ΠΈΠ»ΠΈ ΠΈΠ½ΡΠΎΡΠΌΠ°ΡΠΈΠΈ. ΠΡΠΎΡΠ΅ΡΡ ΠΌΠΎΠΆΠ΅Ρ ΠΏΡΠΎΡΡΠΎ Π²ΡΠ·ΡΠ²Π°ΡΡ Π»ΡΠ±ΡΡ Python ΡΡΠ½ΠΊΡΠΈΡ, Π° ΠΌΠΎΠΆΠ΅Ρ ΠΈΠΌΠ΅ΡΡ Π±ΠΎΠ»Π΅Π΅ ΡΠ»ΠΎΠΆΠ½ΡΡ Π»ΠΎΠ³ΠΈΠΊΡ ΠΈΠ· ΠΏΠΎΡΠ»Π΅Π΄ΠΎΠ²Π°ΡΠ΅Π»ΡΠ½ΠΎΠ³ΠΎ Π²ΡΠ·ΠΎΠ²Π° Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΈΡ ΡΡΠ½ΠΊΡΠΈΠΉ Π² ΠΊΠΎΠ½ΡΠ΅ΠΊΡΡΠ΅ ΠΊΠ»Π°ΡΡΠ°. ΠΠ»Ρ Π½Π°ΠΈΠ±ΠΎΠ»Π΅Π΅ ΡΠ°ΡΡΡΡ ΠΎΠΏΠ΅ΡΠ°ΡΠΈΠΉ ΡΠΆΠ΅ Π΅ΡΡΡ ΠΌΠ½ΠΎΠΆΠ΅ΡΡΠ²ΠΎ Π³ΠΎΡΠΎΠ²ΡΡ Π½Π°ΡΠ°Π±ΠΎΡΠΎΠΊ, ΠΊΠΎΡΠΎΡΡΠ΅ ΠΌΠΎΠΆΠ½ΠΎ ΠΈΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°ΡΡ Π² ΠΊΠ°ΡΠ΅ΡΡΠ²Π΅ ΠΏΡΠΎΡΠ΅ΡΡΠΎΠ². Π ΡΠ°ΠΊΠΈΠΌ Π½Π°ΡΠ°Π±ΠΎΡΠΊΠ°ΠΌ ΠΎΡΠ½ΠΎΡΡΡΡΡ:
- ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡΡ β Π΄Π»Ρ ΠΏΠ΅ΡΠ΅Π³ΠΎΠ½Π° Π΄Π°Π½Π½ΡΡ ΠΈΠ· ΠΎΠ΄Π½ΠΎΠ³ΠΎ ΠΌΠ΅ΡΡΠ° Π² Π΄ΡΡΠ³ΠΎΠ΅, Π½Π°ΠΏΡΠΈΠΌΠ΅Ρ ΠΈΠ· ΡΠ°Π±Π»ΠΈΡΡ ΠΠ Π² Ρ ΡΠ°Π½ΠΈΠ»ΠΈΡΠ΅ Π΄Π°Π½Π½ΡΡ ;
- ΡΠ΅Π½ΡΠΎΡΡ β Π΄Π»Ρ ΠΎΠΆΠΈΠ΄Π°Π½ΠΈΡ Π½Π°ΡΡΡΠΏΠ»Π΅Π½ΠΈΡ ΠΎΠΏΡΠ΅Π΄Π΅Π»Π΅Π½Π½ΠΎΠ³ΠΎ ΡΠΎΠ±ΡΡΠΈΡ ΠΈ Π½Π°ΠΏΡΠ°Π²Π»Π΅Π½ΠΈΡ ΠΏΠΎΡΠΎΠΊΠ° ΡΠΏΡΠ°Π²Π»Π΅Π½ΠΈΡ Π² ΠΏΠΎΡΠ»Π΅Π΄ΡΡΡΠΈΠ΅ Π²Π΅ΡΡΠΈΠ½Ρ Π³ΡΠ°ΡΠ°;
- Ρ ΡΠΊΠΈ β Π΄Π»Ρ Π±ΠΎΠ»Π΅Π΅ Π½ΠΈΠ·ΠΊΠΎΡΡΠΎΠ²Π½Π΅Π²ΡΡ ΠΎΠΏΠ΅ΡΠ°ΡΠΈΠΉ, Π½Π°ΠΏΡΠΈΠΌΠ΅Ρ, Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ ΠΈΠ· ΡΠ°Π±Π»ΠΈΡΡ ΠΠ (ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΡΡΡΡ Π² ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡΠ°Ρ );
- ΠΈ Ρ.Π΄.
ΠΠΏΠΈΡΡΠ²Π°ΡΡ Apache Airflow ΠΏΠΎΠ΄ΡΠΎΠ±Π½ΠΎ Π² ΡΡΠΎΠΉ ΡΡΠ°ΡΡΠ΅ Π±ΡΠ΄Π΅Ρ Π½Π΅ΡΠ΅Π»Π΅ΡΠΎΠΎΠ±ΡΠ°Π·Π½ΠΎ. ΠΡΠ°ΡΠΊΠΈΠ΅ Π²Π²Π΅Π΄Π΅Π½ΠΈΡ ΠΌΠΎΠΆΠ½ΠΎ ΠΏΠΎΡΠΌΠΎΡΡΠ΅ΡΡ
Π₯ΡΠΊ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
Π ΠΏΠ΅ΡΠ²ΡΡ ΠΎΡΠ΅ΡΠ΅Π΄Ρ, Π΄Π»Ρ ΡΠ΅ΡΠ΅Π½ΠΈΡ Π·Π°Π΄Π°ΡΠΈ Π½ΡΠΆΠ½ΠΎ Π½Π°ΠΏΠΈΡΠ°ΡΡ Ρ ΡΠΊ, Ρ ΠΏΠΎΠΌΠΎΡΡΡ ΠΊΠΎΡΠΎΡΠΎΠ³ΠΎ ΠΌΡ ΠΌΠΎΠ³Π»ΠΈ Π±Ρ:
- ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠ°ΡΡΡΡ ΠΊ ΡΠ»Π΅ΠΊΡΡΠΎΠ½Π½ΠΎΠΉ ΠΏΠΎΡΡΠ΅;
- Π½Π°Ρ ΠΎΠ΄ΠΈΡΡ Π½ΡΠΆΠ½ΠΎΠ΅ ΠΏΠΈΡΡΠΌΠΎ;
- ΠΏΠΎΠ»ΡΡΠ°ΡΡ Π΄Π°Π½Π½ΡΠ΅ ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°.
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging
class IMAPHook(BaseHook):
def __init__(self, imap_conn_id):
"""
IMAP hook Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
Ρ ΡΠ»Π΅ΠΊΡΡΠΎΠ½Π½ΠΎΠΉ ΠΏΠΎΡΡΡ
:param imap_conn_id: ΠΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠ΅Π½ΠΈΡ ΠΊ ΠΏΠΎΡΡΠ΅
:type imap_conn_id: string
"""
self.connection = self.get_connection(imap_conn_id)
self.mail = None
def authenticate(self):
"""
ΠΠΎΠ΄ΠΊΠ»ΡΡΠ°Π΅ΠΌΡΡ ΠΊ ΠΏΠΎΡΡΠ΅
"""
mail = imaplib.IMAP4_SSL(self.connection.host)
response, detail = mail.login(user=self.connection.login, password=self.connection.password)
if response != "OK":
raise AirflowException("Sign in failed")
else:
self.mail = mail
def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ ΠΈΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡΠ° ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π³ΠΎ ΠΏΠΈΡΡΠΌΠ°,
ΡΠ΄ΠΎΠ²Π»Π΅ΡΠ²ΠΎΡΠ°ΡΡΡΠ΅Π³ΠΎ ΡΡΠ»ΠΎΠ²ΠΈΡΠΌ ΠΏΠΎΠΈΡΠΊΠ°
:param check_seen: ΠΡΠΌΠ΅ΡΠ°ΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅ ΠΏΠΈΡΡΠΌΠΎ ΠΊΠ°ΠΊ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΠΎΠ΅
:type check_seen: bool
:param box: ΠΠ°ΠΈΠΌΠ΅Π½ΠΎΠ²Π°Π½ΠΈΡ ΡΡΠΈΠΊΠ°
:type box: string
:param condition: Π£ΡΠ»ΠΎΠ²ΠΈΡ ΠΏΠΎΠΈΡΠΊΠ° ΠΏΠΈΡΠ΅ΠΌ
:type condition: string
"""
self.authenticate()
self.mail.select(mailbox=box)
response, data = self.mail.search(None, condition)
mail_ids = data[0].split()
logging.info("Π ΡΡΠΈΠΊΠ΅ Π½Π°ΠΉΠ΄Π΅Π½Ρ ΡΠ»Π΅Π΄ΡΡΡΠΈΠ΅ ΠΏΠΈΡΡΠΌΠ°: " + str(mail_ids))
if not mail_ids:
logging.info("ΠΠ΅ Π½Π°ΠΉΠ΄Π΅Π½ΠΎ Π½ΠΎΠ²ΡΡ
ΠΏΠΈΡΠ΅ΠΌ")
return None
mail_id = mail_ids[0]
# Π΅ΡΠ»ΠΈ ΡΠ°ΠΊΠΈΡ
ΠΏΠΈΡΠ΅ΠΌ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ
if len(mail_ids) > 1:
# ΠΎΡΠΌΠ΅ΡΠ°Π΅ΠΌ ΠΎΡΡΠ°Π»ΡΠ½ΡΠ΅ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΡΠΌΠΈ
for id in mail_ids:
self.mail.store(id, "+FLAGS", "\Seen")
# Π²ΠΎΠ·Π²ΡΠ°ΡΠ°Π΅ΠΌ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅
mail_id = mail_ids[-1]
# Π½ΡΠΆΠ½ΠΎ Π»ΠΈ ΠΎΡΠΌΠ΅ΡΠΈΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΡΠΌ
if not check_seen:
self.mail.store(mail_id, "-FLAGS", "\Seen")
return mail_id
ΠΠΎΠ³ΠΈΠΊΠ° ΡΠ°ΠΊΠ°Ρ: ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠ°Π΅ΠΌΡΡ, Π½Π°Ρ ΠΎΠ΄ΠΈΠΌ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅ ΡΠ°ΠΌΠΎΠ΅ Π°ΠΊΡΡΠ°Π»ΡΠ½ΠΎΠ΅ ΠΏΠΈΡΡΠΌΠΎ, Π΅ΡΠ»ΠΈ Π΅ΡΡΡ Π΄ΡΡΠ³ΠΈΠ΅ β ΠΈΠ³Π½ΠΎΡΠΈΡΡΠ΅ΠΌ ΠΈΡ . ΠΡΠΏΠΎΠ»ΡΠ·ΡΠ΅ΡΡΡ ΠΈΠΌΠ΅Π½Π½ΠΎ ΡΠ°ΠΊΠ°Ρ ΡΡΠ½ΠΊΡΠΈΡ, ΠΏΠΎΡΠΎΠΌΡ ΡΡΠΎ Π±ΠΎΠ»Π΅Π΅ ΠΏΠΎΠ·Π΄Π½ΠΈΠ΅ ΠΏΠΈΡΡΠΌΠ° ΡΠΎΠ΄Π΅ΡΠΆΠ°Ρ Π²ΡΠ΅ Π΄Π°Π½Π½ΡΠ΅ ΡΠ°Π½Π½ΠΈΡ . ΠΡΠ»ΠΈ ΡΡΠΎ Π½Π΅ ΡΠ°ΠΊ, ΡΠΎ ΠΌΠΎΠΆΠ½ΠΎ Π²ΠΎΠ·Π²ΡΠ°ΡΠ°ΡΡ ΠΌΠ°ΡΡΠΈΠ² Π²ΡΠ΅Ρ ΠΏΠΈΡΠ΅ΠΌ ΠΈΠ»ΠΈ ΠΎΠ±ΡΠ°Π±Π°ΡΡΠ²Π°ΡΡ ΠΏΠ΅ΡΠ²ΠΎΠ΅, Π° ΠΎΡΡΠ°Π»ΡΠ½ΡΠ΅ β ΠΏΡΠΈ ΡΠ»Π΅Π΄ΡΡΡΠ΅ΠΌ ΠΏΡΠΎΡ ΠΎΠ΄Π΅. Π ΠΎΠ±ΡΠ΅ΠΌ, Π²ΡΠ΅ ΠΊΠ°ΠΊ Π²ΡΠ΅Π³Π΄Π° Π·Π°Π²ΠΈΡΠΈΡ ΠΎΡ Π·Π°Π΄Π°ΡΠΈ.
ΠΠΎΠ±Π°Π²Π»ΡΠ΅ΠΌ ΠΊ Ρ ΡΠΊΡ Π΄Π²Π΅ Π²ΡΠΏΠΎΠΌΠΎΠ³Π°ΡΠ΅Π»ΡΠ½ΡΠ΅ ΡΡΠ½ΠΊΡΠΈΠΈ: Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π° ΠΈ Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π° ΠΏΠΎ ΡΡΡΠ»ΠΊΠ΅ ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°. Π ΡΠ»ΠΎΠ²Ρ, ΠΈΡ ΠΌΠΎΠΆΠ½ΠΎ Π²ΡΠ½Π΅ΡΡΠΈ Π² ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡ, ΡΡΠΎ Π·Π°Π²ΠΈΡΠΈΡ ΠΎΡ ΡΠ°ΡΡΠΎΡΡ ΠΈΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π½ΠΈΡ Π΄Π°Π½Π½ΠΎΠ³ΠΎ ΡΡΠ½ΠΊΡΠΈΠΎΠ½Π°Π»Π°. Π§ΡΠΎ Π΅ΡΠ΅ Π΄ΠΎΠΏΠΈΡΡΠ²Π°ΡΡ Π² Ρ ΡΠΊ, ΠΎΠΏΡΡΡ ΠΆΠ΅, Π·Π°Π²ΠΈΡΠΈΡ ΠΎΡ Π·Π°Π΄Π°ΡΠΈ: Π΅ΡΠ»ΠΈ Π² ΠΏΠΈΡΡΠΌΠ΅ ΠΏΡΠΈΡ ΠΎΠ΄ΡΡ ΡΡΠ°Π·Ρ ΡΠ°ΠΉΠ»Ρ, ΡΠΎ ΠΌΠΎΠΆΠ½ΠΎ ΡΠΊΠ°ΡΠΈΠ²Π°ΡΡ ΠΏΡΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΡ ΠΊ ΠΏΠΈΡΡΠΌΡ, Π΅ΡΠ»ΠΈ Π΄Π°Π½Π½ΡΠ΅ ΠΏΡΠΈΡ ΠΎΠ΄ΡΡ Π² ΠΏΠΈΡΡΠΌΠ΅, ΡΠΎ Π½ΡΠΆΠ½ΠΎ ΠΏΠ°ΡΡΠΈΡΡ ΠΏΠΈΡΡΠΌΠΎ ΠΈ Ρ.Π΄. Π ΠΌΠΎΠ΅ΠΌ ΡΠ»ΡΡΠ°Π΅, ΠΏΠΈΡΡΠΌΠΎ ΠΏΡΠΈΡ ΠΎΠ΄ΠΈΡ Ρ ΠΎΠ΄Π½ΠΎΠΉ ΡΡΡΠ»ΠΊΠΎΠΉ Π½Π° Π°ΡΡ ΠΈΠ², ΠΊΠΎΡΠΎΡΡΠΉ ΠΌΠ½Π΅ Π½ΡΠΆΠ½ΠΎ ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ Π² ΠΎΠΏΡΠ΅Π΄Π΅Π»Π΅Π½Π½ΠΎΠ΅ ΠΌΠ΅ΡΡΠΎ ΠΈ Π·Π°ΠΏΡΡΡΠΈΡΡ Π΄Π°Π»ΡΠ½Π΅ΠΉΡΠΈΠΉ ΠΏΡΠΎΡΠ΅ΡΡ ΠΎΠ±ΡΠ°Π±ΠΎΡΠΊΠΈ.
def download_from_url(self, url, path, chunk_size=128):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π°
:param url: ΠΠ΄ΡΠ΅Ρ Π·Π°Π³ΡΡΠ·ΠΊΠΈ
:type url: string
:param path: ΠΡΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ ΡΠ°ΠΉΠ»
:type path: string
:param chunk_size: ΠΠΎ ΡΠΊΠΎΠ»ΡΠΊΠΎ Π±Π°ΠΉΡΠΎΠ² ΠΏΠΈΡΠ°ΡΡ
:type chunk_size: int
"""
r = requests.get(url, stream=True)
with open(path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def download_mail_href_attachment(self, mail_id, path):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π° ΠΏΠΎ ΡΡΡΠ»ΠΊΠ΅ ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°
:param mail_id: ΠΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ ΠΏΠΈΡΡΠΌΠ°
:type mail_id: string
:param path: ΠΡΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ ΡΠ°ΠΉΠ»
:type path: string
"""
response, data = self.mail.fetch(mail_id, "(RFC822)")
raw_email = data[0][1]
raw_soup = raw_email.decode().replace("r", "").replace("n", "")
parse_soup = BeautifulSoup(raw_soup, "html.parser")
link_text = ""
for a in parse_soup.find_all("a", href=True, text=True):
link_text = a["href"]
self.download_from_url(link_text, path)
ΠΠΎΠ΄ ΠΏΡΠΎΡΡΠΎΠΉ, ΠΏΠΎΡΡΠΎΠΌΡ Π²ΡΡΠ΄ Π»ΠΈ Π½ΡΠΆΠ΄Π°Π΅ΡΡΡ Π² Π΄ΠΎΠΏΠΎΠ»Π½ΠΈΡΠ΅Π»ΡΠ½ΡΡ ΠΏΠΎΡΡΠ½Π΅Π½ΠΈΡΡ . Π Π°ΡΡΠΊΠ°ΠΆΡ Π»ΠΈΡΡ ΠΏΡΠΎ ΠΌΠ°Π³ΠΈΡΠ΅ΡΠΊΡΡ ΡΡΡΠΎΡΠΊΡ imap_conn_id. Apache Airflow Ρ ΡΠ°Π½ΠΈΡ ΠΏΠ°ΡΠ°ΠΌΠ΅ΡΡΡ ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠ΅Π½ΠΈΠΉ (Π»ΠΎΠ³ΠΈΠ½, ΠΏΠ°ΡΠΎΠ»Ρ, Π°Π΄ΡΠ΅Ρ ΠΈ Π΄ΡΡΠ³ΠΈΠ΅ ΠΏΠ°ΡΠ°ΠΌΠ΅ΡΡΡ), ΠΊ ΠΊΠΎΡΠΎΡΡΠΌ ΠΌΠΎΠΆΠ½ΠΎ ΠΎΠ±ΡΠ°ΡΠ°ΡΡΡΡ ΠΏΠΎ ΡΡΡΠΎΠΊΠΎΠ²ΠΎΠΌΡ ΠΈΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡΡ. ΠΠΈΠ·ΡΠ°Π»ΡΠ½ΠΎ ΡΠΏΡΠ°Π²Π»Π΅Π½ΠΈΠ΅ ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠ΅Π½ΠΈΡΠΌΠΈ Π²ΡΠ³Π»ΡΠ΄ΠΈΡ Π²ΠΎΡ ΡΠ°ΠΊ
Π‘Π΅Π½ΡΠΎΡ Π΄Π»Ρ ΠΎΠΆΠΈΠ΄Π°Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
ΠΠΎΡΠΊΠΎΠ»ΡΠΊΡ ΠΌΡ ΡΠΆΠ΅ ΡΠΌΠ΅Π΅ΠΌ ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠ°ΡΡΡΡ ΠΈ ΠΏΠΎΠ»ΡΡΠ°ΡΡ Π΄Π°Π½Π½ΡΠ΅ ΠΈΠ· ΠΏΠΎΡΡΡ, ΡΠ΅ΠΏΠ΅ΡΡ ΠΌΠΎΠΆΠ΅ΠΌ Π½Π°ΠΏΠΈΡΠ°ΡΡ ΡΠ΅Π½ΡΠΎΡ Π΄Π»Ρ ΠΈΡ ΠΎΠΆΠΈΠ΄Π°Π½ΠΈΡ. ΠΠ°ΠΏΠΈΡΠ°ΡΡ ΡΡΠ°Π·Ρ ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡ, ΠΊΠΎΡΠΎΡΡΠΉ Π±ΡΠ΄Π΅Ρ ΠΎΠ±ΡΠ°Π±Π°ΡΡΠ²Π°ΡΡ Π΄Π°Π½Π½ΡΠ΅, Π΅ΡΠ»ΠΈ ΠΎΠ½ΠΈ ΠΈΠΌΠ΅ΡΡΡΡ, Π² ΠΌΠΎΠ΅ΠΌ ΡΠ»ΡΡΠ°Π΅ Π½Π΅ ΠΏΠΎΠ»ΡΡΠΈΠ»ΠΎΡΡ, ΡΠ°ΠΊ ΠΊΠ°ΠΊ Π½Π° ΠΎΡΠ½ΠΎΠ²Π°Π½ΠΈΠΈ ΠΏΠΎΠ»ΡΡΠ΅Π½Π½ΡΡ Π΄Π°Π½Π½ΡΡ ΠΈΠ· ΠΏΠΎΡΡΡ ΡΠ°Π±ΠΎΡΠ°ΡΡ ΠΈ Π΄ΡΡΠ³ΠΈΠ΅ ΠΏΡΠΎΡΠ΅ΡΡΡ, Π² ΡΠΎΠΌ ΡΠΈΡΠ»Π΅, Π±Π΅ΡΡΡΠΈΠ΅ ΡΠ²ΡΠ·Π°Π½Π½ΡΠ΅ Π΄Π°Π½Π½ΡΠ΅ ΠΈΠ· Π΄ΡΡΠ³ΠΈΡ ΠΈΡΡΠΎΡΠ½ΠΈΠΊΠΎΠ² (API, ΡΠ΅Π»Π΅ΡΠΎΠ½ΠΈΡ, Π²Π΅Π± ΠΌΠ΅ΡΡΠΈΠΊΠΈ ΠΈ Ρ.Π΄.). ΠΡΠΈΠ²Π΅Π΄Ρ ΠΏΡΠΈΠΌΠ΅Ρ. Π CRM ΡΠΈΡΡΠ΅ΠΌΠ΅ ΠΏΠΎΡΠ²ΠΈΠ»ΡΡ Π½ΠΎΠ²ΡΠΉ ΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°ΡΠ΅Π»Ρ, ΠΈ ΠΌΡ Π΅ΡΠ΅ Π½Π΅ Π·Π½Π°Π΅ΠΌ ΠΏΡΠΎ Π΅Π³ΠΎ UUID. Π’ΠΎΠ³Π΄Π° ΠΏΡΠΈ ΠΏΠΎΠΏΡΡΠΊΠ΅ ΠΏΠΎΠ»ΡΡΠΈΡΡ Π΄Π°Π½Π½ΡΠ΅ Ρ SIP-ΡΠ΅Π»Π΅ΡΠΎΠ½ΠΈΠΈ ΠΌΡ ΠΏΠΎΠ»ΡΡΠΈΠΌ Π·Π²ΠΎΠ½ΠΊΠΈ, ΠΏΡΠΈΠ²ΡΠ·Π°Π½Π½ΡΠ΅ ΠΊ Π΅Π³ΠΎ UUID, Π½ΠΎ ΠΊΠΎΡΡΠ΅ΠΊΡΠ½ΠΎ ΡΠΎΡ ΡΠ°Π½ΠΈΡΡ ΠΈ ΠΈΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°ΡΡ ΠΈΡ Π½Π΅ ΡΠΌΠΎΠΆΠ΅ΠΌ. Π ΡΠ°ΠΊΠΈΡ Π²ΠΎΠΏΡΠΎΡΠ°Ρ Π²Π°ΠΆΠ½ΠΎ ΠΈΠΌΠ΅ΡΡ Π² Π²ΠΈΠ΄Ρ Π·Π°Π²ΠΈΡΠΈΠΌΠΎΡΡΡ Π΄Π°Π½Π½ΡΡ , ΠΎΡΠΎΠ±Π΅Π½Π½ΠΎ, Π΅ΡΠ»ΠΈ ΠΎΠ½ΠΈ ΠΈΠ· ΡΠ°Π·Π½ΡΡ ΠΈΡΡΠΎΡΠ½ΠΈΠΊΠΎΠ². ΠΡΠΎ, ΠΊΠΎΠ½Π΅ΡΠ½ΠΎ, Π½Π΅Π΄ΠΎΡΡΠ°ΡΠΎΡΠ½ΡΠ΅ ΠΌΠ΅ΡΡ ΡΠΎΡ ΡΠ°Π½Π΅Π½ΠΈΡ ΡΠ΅Π»ΠΎΡΡΠ½ΠΎΡΡΠΈ Π΄Π°Π½Π½ΡΡ , Π½ΠΎ Π² Π½Π΅ΠΊΠΎΡΠΎΡΡΡ ΡΠ»ΡΡΠ°ΡΡ Π½Π΅ΠΎΠ±Ρ ΠΎΠ΄ΠΈΠΌΡΠ΅. ΠΠ° ΠΈ Π² Ρ ΠΎΠ»ΠΎΡΡΡΡ Π·Π°Π½ΠΈΠΌΠ°ΡΡ ΡΠ΅ΡΡΡΡΡ ΡΠΎΠΆΠ΅ Π½Π΅ΡΠ°ΡΠΈΠΎΠ½Π°Π»ΡΠ½ΠΎ.
Π’Π°ΠΊΠΈΠΌ ΠΎΠ±ΡΠ°Π·ΠΎΠΌ, Π½Π°Ρ ΡΠ΅Π½ΡΠΎΡ Π±ΡΠ΄Π΅Ρ Π·Π°ΠΏΡΡΠΊΠ°ΡΡ ΠΏΠΎΡΠ»Π΅Π΄ΡΡΡΠΈΠ΅ Π²Π΅ΡΡΠΈΠ½Ρ Π³ΡΠ°ΡΠ°, Π΅ΡΠ»ΠΈ Π΅ΡΡΡ ΡΠ²Π΅ΠΆΠ°Ρ ΠΈΠ½ΡΠΎΡΠΌΠ°ΡΠΈΡ Π½Π° ΠΏΠΎΡΡΠ΅, Π° ΡΠ°ΠΊΠΆΠ΅ ΠΏΠΎΠΌΠ΅ΡΠ°ΡΡ Π½Π΅Π°ΠΊΡΡΠ°Π»ΡΠ½ΠΎΠΉ ΠΏΡΠ΅Π΄ΡΠ΄ΡΡΡΡ ΠΈΠ½ΡΠΎΡΠΌΠ°ΡΠΈΡ.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook
class MailSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.check_seen = check_seen
self.box = box
self.condition = condition
def poke(self, context):
conn = IMAPHook(self.conn_id)
mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
if mail_id is None:
return False
else:
return True
ΠΠΎΠ»ΡΡΠ°Π΅ΠΌ ΠΈ ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΠ΅ΠΌ Π΄Π°Π½Π½ΡΠ΅
ΠΠ»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ ΠΈ ΠΎΠ±ΡΠ°Π±ΠΎΡΠΊΠΈ Π΄Π°Π½Π½ΡΡ ΠΌΠΎΠΆΠ½ΠΎ Π½Π°ΠΏΠΈΡΠ°ΡΡ ΠΎΡΠ΄Π΅Π»ΡΠ½ΡΠΉ ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡ, ΠΌΠΎΠΆΠ½ΠΎ ΠΈΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°ΡΡ Π³ΠΎΡΠΎΠ²ΡΠ΅. ΠΠΎΡΠΊΠΎΠ»ΡΠΊΡ ΠΏΠΎΠΊΠ° Π»ΠΎΠ³ΠΈΠΊΠ° ΡΡΠΈΠ²ΠΈΠ°Π»ΡΠ½Π°Ρ β Π·Π°Π±ΡΠ°ΡΡ Π΄Π°Π½Π½ΡΠ΅ ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°, ΡΠΎ Π΄Π»Ρ ΠΏΡΠΈΠΌΠ΅ΡΠ° ΠΏΡΠ΅Π΄Π»Π°Π³Π°Ρ ΡΡΠ°Π½Π΄Π°ΡΡΠ½ΡΠΉ PythonOperator
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook
start_date = datetime(2020, 4, 4)
# Π‘ΡΠ°Π½Π΄Π°ΡΡΠ½ΠΎΠ΅ ΠΊΠΎΠ½ΡΠΈΠ³ΡΡΠΈΡΠΎΠ²Π°Π½ΠΈΠ΅ Π³ΡΠ°ΡΠ°
args = {
"owner": "example",
"start_date": start_date,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retry_delay": timedelta(minutes=15),
"provide_context": False,
}
dag = DAG(
dag_id="test_etl",
default_args=args,
schedule_interval="@hourly",
)
# ΠΠΏΡΠ΅Π΄Π΅Π»ΡΠ΅ΠΌ ΡΠ΅Π½ΡΠΎΡ
mail_check_sensor = MailSensor(
task_id="check_new_emails",
poke_interval=10,
conn_id="mail_conn_id",
timeout=10,
soft_fail=True,
box="my_box",
dag=dag,
mode="poke",
)
# Π€ΡΠ½ΠΊΡΠΈΡ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°
def prepare_mail():
imap_hook = IMAPHook("mail_conn_id")
mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
if mail_id is None:
raise AirflowException("Empty mailbox")
conn.download_mail_href_attachment(mail_id, "./path.zip")
prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
# ΠΠΏΠΈΡΠ°Π½ΠΈΠ΅ ΠΎΡΡΠ°Π»ΡΠ½ΡΡ
Π²Π΅ΡΡΠΈΠ½ Π³ΡΠ°ΡΠ°
...
# ΠΠ°Π΄Π°Π΅ΠΌ ΡΠ²ΡΠ·Ρ Π½Π° Π³ΡΠ°ΡΠ΅
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ΠΠΏΠΈΡΠ°Π½ΠΈΠ΅ ΠΎΡΡΠ°Π»ΡΠ½ΡΡ
ΠΏΠΎΡΠΎΠΊΠΎΠ² ΡΠΏΡΠ°Π²Π»Π΅Π½ΠΈΡ
ΠΡΡΠ°ΡΠΈ, Π΅ΡΠ»ΠΈ Π²Π°ΡΠ° ΠΊΠΎΡΠΏΠΎΡΠ°ΡΠΈΠ²Π½Π°Ρ ΠΏΠΎΡΡΠ° ΡΠΎΠΆΠ΅ Π½Π° mail.ru, ΡΠΎ Π²Π°ΠΌ Π±ΡΠ΄Π΅Ρ Π½Π΅Π΄ΠΎΡΡΡΠΏΠ΅Π½ ΠΏΠΎΠΈΡΠΊ ΠΏΠΈΡΠ΅ΠΌ ΠΏΠΎ ΡΠ΅ΠΌΠ΅, ΠΎΡΠΏΡΠ°Π²ΠΈΡΠ΅Π»Ρ ΠΈ Ρ.Π΄. ΠΠ½ΠΈ Π΅ΡΠ΅ Π² Π΄Π°Π»Π΅ΠΊΠΎΠΌ 2016 ΠΎΠ±Π΅ΡΠ°Π»ΠΈ Π²Π²Π΅ΡΡΠΈ, Π½ΠΎ, Π²ΠΈΠ΄ΠΈΠΌΠΎ, ΠΏΠ΅ΡΠ΅Π΄ΡΠΌΠ°Π»ΠΈ. Π― ΡΠ΅ΡΠΈΠ» ΡΡΡ ΠΏΡΠΎΠ±Π»Π΅ΠΌΡ, ΡΠΎΠ·Π΄Π°Π² ΠΏΠΎΠ΄ Π½ΡΠΆΠ½ΡΠ΅ ΠΏΠΈΡΡΠΌΠ° ΠΎΡΠ΄Π΅Π»ΡΠ½ΡΡ ΠΏΠ°ΠΏΠΊΡ ΠΈ Π½Π°ΡΡΡΠΎΠΈΠ² Π² Π²Π΅Π±-ΠΈΠ½ΡΠ΅ΡΡΠ΅ΠΉΡΠ΅ ΠΏΠΎΡΡΡ ΡΠΈΠ»ΡΡΡ Π½Π° Π½ΡΠΆΠ½ΡΠ΅ ΠΏΠΈΡΡΠΌΠ°. Π’Π°ΠΊΠΈΠΌ ΠΎΠ±ΡΠ°Π·ΠΎΠΌ, Π² ΡΡΡ ΠΏΠ°ΠΏΠΊΡ ΠΏΠΎΠΏΠ°Π΄Π°ΡΡ ΡΠΎΠ»ΡΠΊΠΎ Π½ΡΠΆΠ½ΡΠ΅ ΠΏΠΈΡΡΠΌΠ° ΠΈ ΡΡΠ»ΠΎΠ²ΠΈΡ Π΄Π»Ρ ΠΏΠΎΠΈΡΠΊΠ° Π² ΠΌΠΎΠ΅ΠΌ ΡΠ»ΡΡΠ°Π΅ ΠΏΡΠΎΡΡΠΎ (UNSEEN).
Π Π΅Π·ΡΠΌΠΈΡΡΡ, ΠΌΡ ΠΈΠΌΠ΅Π΅ΠΌ ΡΠ»Π΅Π΄ΡΡΡΡΡ ΠΏΠΎΡΠ»Π΅Π΄ΠΎΠ²Π°ΡΠ΅Π»ΡΠ½ΠΎΡΡΡ: ΠΏΡΠΎΠ²Π΅ΡΡΠ΅ΠΌ, Π΅ΡΡΡ Π»ΠΈ Π½ΠΎΠ²ΡΠ΅ ΠΏΠΈΡΡΠΌΠ°, ΡΠΎΠΎΡΠ²Π΅ΡΡΡΠ²ΡΡΡΠΈΠ΅ ΡΡΠ»ΠΎΠ²ΠΈΡΠΌ, Π΅ΡΠ»ΠΈ Π΅ΡΡΡ, ΡΠΎ ΡΠΊΠ°ΡΠΈΠ²Π°Π΅ΠΌ Π°ΡΡ
ΠΈΠ² ΠΏΠΎ ΡΡΡΠ»ΠΊΠ΅ ΠΈΠ· ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π³ΠΎ ΠΏΠΈΡΡΠΌΠ°.
ΠΠΎΠ΄ ΠΏΠΎΡΠ»Π΅Π΄Π½ΠΈΠΌΠΈ ΠΌΠ½ΠΎΠ³ΠΎΡΠΎΡΠΈΡΠΌΠΈ ΠΎΠΏΡΡΠ΅Π½ΠΎ, ΡΡΠΎ ΡΡΠΎΡ Π°ΡΡ
ΠΈΠ² Π±ΡΠ΄Π΅Ρ ΡΠ°ΡΠΏΠ°ΠΊΠΎΠ²Π°Π½, Π΄Π°Π½Π½ΡΠ΅ ΠΈΠ· Π°ΡΡ
ΠΈΠ²Π° ΠΎΡΠΈΡΠ΅Π½Ρ ΠΈ ΠΎΠ±ΡΠ°Π±ΠΎΡΠ°Π½Ρ, ΠΈ Π² ΠΈΡΠΎΠ³Π΅ Π²ΡΠ΅ ΡΡΠΎ Π΄Π΅Π»ΠΎ ΡΠΉΠ΄Π΅Ρ Π΄Π°Π»Π΅Π΅ Π½Π° ΠΊΠΎΠ½Π²Π΅ΠΉΠ΅Ρ ETL ΠΏΡΠΎΡΠ΅ΡΡΠ°, Π½ΠΎ ΡΡΠΎ ΡΠΆΠ΅ Π²ΡΡ
ΠΎΠ΄ΠΈΡ Π·Π° ΡΠ°ΠΌΠΊΠΈ ΡΠ΅ΠΌΡ ΡΡΠ°ΡΡΠΈ. ΠΡΠ»ΠΈ ΠΏΠΎΠ»ΡΡΠΈΠ»ΠΎΡΡ ΠΈΠ½ΡΠ΅ΡΠ΅ΡΠ½ΠΎ ΠΈ ΠΏΠΎΠ»Π΅Π·Π½ΠΎ, ΡΠΎ Ρ ΡΠ°Π΄ΠΎΡΡΡΡ ΠΏΡΠΎΠ΄ΠΎΠ»ΠΆΡ ΠΎΠΏΠΈΡΡΠ²Π°ΡΡ ETL ΡΠ΅ΡΠ΅Π½ΠΈΡ ΠΈ ΠΈΡ
ΡΠ°ΡΡΠΈ Π΄Π»Ρ Apache Airflow.
ΠΡΡΠΎΡΠ½ΠΈΠΊ: habr.com