ΠΠΎΠ»ΠΊΠΎΡΠΎ ΠΈ Π΄Π° ΡΠ΅ ΡΠ°Π·Π²ΠΈΠ²Π°Ρ ΡΠ΅Ρ Π½ΠΎΠ»ΠΎΠ³ΠΈΠΈΡΠ΅, ΠΏΠΎΡΠ΅Π΄ΠΈΡΠ° ΠΎΡ ΠΎΡΡΠ°ΡΠ΅Π»ΠΈ ΠΏΠΎΠ΄Ρ ΠΎΠ΄ΠΈ Π²ΠΈΠ½Π°Π³ΠΈ ΠΈΠ·ΠΎΡΡΠ°Π²Π°Ρ ΠΎΡ ΡΠ°Π·Π²ΠΈΡΠΈΠ΅ΡΠΎ. Π’ΠΎΠ²Π° ΠΌΠΎΠΆΠ΅ Π΄Π° ΡΠ΅ Π΄ΡΠ»ΠΆΠΈ Π½Π° ΠΏΠ»Π°Π²Π΅Π½ ΠΏΡΠ΅Ρ ΠΎΠ΄, ΡΠΎΠ²Π΅ΡΠΊΠΈ ΡΠ°ΠΊΡΠΎΡ, ΡΠ΅Ρ Π½ΠΎΠ»ΠΎΠ³ΠΈΡΠ½ΠΈ Π½ΡΠΆΠ΄ΠΈ ΠΈΠ»ΠΈ Π½Π΅ΡΠΎ Π΄ΡΡΠ³ΠΎ. Π ΠΎΠ±Π»Π°ΡΡΡΠ° Π½Π° ΠΎΠ±ΡΠ°Π±ΠΎΡΠΊΠ°ΡΠ° Π½Π° Π΄Π°Π½Π½ΠΈ ΠΈΠ·ΡΠΎΡΠ½ΠΈΡΠΈΡΠ΅ Π½Π° Π΄Π°Π½Π½ΠΈ ΡΠ° Π½Π°ΠΉ-ΠΏΠΎΠΊΠ°Π·Π°ΡΠ΅Π»Π½ΠΈ Π² ΡΠ°Π·ΠΈ ΡΠ°ΡΡ. ΠΠΎΠ»ΠΊΠΎΡΠΎ ΠΈ Π΄Π° ΠΌΠ΅ΡΡΠ°Π΅ΠΌ Π΄Π° ΡΠ΅ ΠΎΡΡΡΠ²Π΅ΠΌ ΠΎΡ ΡΠΎΠ²Π°, Π½ΠΎ Π·Π°ΡΠ΅Π³Π° ΡΠ°ΡΡ ΠΎΡ Π΄Π°Π½Π½ΠΈΡΠ΅ ΡΠ΅ ΠΈΠ·ΠΏΡΠ°ΡΠ°Ρ Π² ΠΌΠ΅ΡΠΈΠ½Π΄ΠΆΡΡΠΈ ΠΈ ΠΈΠΌΠ΅ΠΉΠ»ΠΈ, Π΄Π° Π½Π΅ Π³ΠΎΠ²ΠΎΡΠΈΠΌ Π·Π° ΠΏΠΎ-Π°ΡΡ Π°ΠΈΡΠ½ΠΈ ΡΠΎΡΠΌΠ°ΡΠΈ. ΠΠ°Π½Ρ Π²ΠΈ Π΄Π° ΡΠ°Π·Π³Π»ΠΎΠ±ΠΈΡΠ΅ Π΅Π΄Π½Π° ΠΎΡ ΠΎΠΏΡΠΈΠΈΡΠ΅ Π·Π° Apache Airflow, ΠΈΠ»ΡΡΡΡΠΈΡΠ°ΠΉΠΊΠΈ ΠΊΠ°ΠΊ ΠΌΠΎΠΆΠ΅ΡΠ΅ Π΄Π° Π²Π·Π΅ΠΌΠ΅ΡΠ΅ Π΄Π°Π½Π½ΠΈ ΠΎΡ ΠΈΠΌΠ΅ΠΉΠ»ΠΈ.
ΠΏΡΠ°ΠΈΡΡΠΎΡΠΈΡ
ΠΠ½ΠΎΠ³ΠΎ Π΄Π°Π½Π½ΠΈ Π²ΡΠ΅ ΠΎΡΠ΅ ΡΠ΅ ΠΏΡΠ΅Ρ Π²ΡΡΠ»ΡΡ ΡΡΠ΅Π· Π΅Π»Π΅ΠΊΡΡΠΎΠ½Π½Π° ΠΏΠΎΡΠ°, ΠΎΡ ΠΌΠ΅ΠΆΠ΄ΡΠ»ΠΈΡΠ½ΠΎΡΡΠ½ΠΈ ΠΊΠΎΠΌΡΠ½ΠΈΠΊΠ°ΡΠΈΠΈ Π΄ΠΎ ΡΡΠ°Π½Π΄Π°ΡΡΠΈ Π·Π° Π²Π·Π°ΠΈΠΌΠΎΠ΄Π΅ΠΉΡΡΠ²ΠΈΠ΅ ΠΌΠ΅ΠΆΠ΄Ρ ΠΊΠΎΠΌΠΏΠ°Π½ΠΈΠΈΡΠ΅. ΠΠΎΠ±ΡΠ΅ Π΅, Π°ΠΊΠΎ Π΅ Π²ΡΠ·ΠΌΠΎΠΆΠ½ΠΎ Π΄Π° Π½Π°ΠΏΠΈΡΠ΅ΡΠ΅ ΠΈΠ½ΡΠ΅ΡΡΠ΅ΠΉΡ Π·Π° ΠΏΠΎΠ»ΡΡΠ°Π²Π°Π½Π΅ Π½Π° Π΄Π°Π½Π½ΠΈ ΠΈΠ»ΠΈ Π΄Π° ΠΏΠΎΡΡΠ°Π²ΠΈΡΠ΅ Ρ ΠΎΡΠ° Π² ΠΎΡΠΈΡΠ°, ΠΊΠΎΠΈΡΠΎ ΡΠ΅ Π²ΡΠ²Π΅ΠΆΠ΄Π°Ρ ΡΠ°Π·ΠΈ ΠΈΠ½ΡΠΎΡΠΌΠ°ΡΠΈΡ Π² ΠΏΠΎ-ΡΠ΄ΠΎΠ±Π½ΠΈ ΠΈΠ·ΡΠΎΡΠ½ΠΈΡΠΈ, Π½ΠΎ ΡΠ΅ΡΡΠΎ ΡΠΎΠ²Π° ΠΌΠΎΠΆΠ΅ ΠΏΡΠΎΡΡΠΎ Π΄Π° Π½Π΅ Π΅ Π²ΡΠ·ΠΌΠΎΠΆΠ½ΠΎ. ΠΠΎΠ½ΠΊΡΠ΅ΡΠ½Π°ΡΠ° Π·Π°Π΄Π°ΡΠ°, Ρ ΠΊΠΎΡΡΠΎ ΡΠ΅ ΡΠ±Π»ΡΡΠΊΠ°Ρ , Π±Π΅ΡΠ΅ ΡΠ²ΡΡΠ·Π²Π°Π½Π΅ΡΠΎ Π½Π° ΠΏΡΠΎΡΠ»ΠΎΠ²ΡΡΠ°ΡΠ° CRM ΡΠΈΡΡΠ΅ΠΌΠ° ΠΊΡΠΌ Ρ ΡΠ°Π½ΠΈΠ»ΠΈΡΠ΅ΡΠΎ Π½Π° Π΄Π°Π½Π½ΠΈ, Π° ΡΠ»Π΅Π΄ ΡΠΎΠ²Π° ΠΈ ΠΊΡΠΌ OLAP ΡΠΈΡΡΠ΅ΠΌΠ°ΡΠ°. ΠΡΡΠΎΡΠΈΡΠ΅ΡΠΊΠΈ ΡΠ΅ ΡΠ»ΡΡΠΈ ΡΠ°ΠΊΠ°, ΡΠ΅ Π·Π° Π½Π°ΡΠ°ΡΠ° ΠΊΠΎΠΌΠΏΠ°Π½ΠΈΡ ΠΈΠ·ΠΏΠΎΠ»Π·Π²Π°Π½Π΅ΡΠΎ Π½Π° ΡΠ°Π·ΠΈ ΡΠΈΡΡΠ΅ΠΌΠ° Π±Π΅ΡΠ΅ ΡΠ΄ΠΎΠ±Π½ΠΎ Π² ΠΎΠΏΡΠ΅Π΄Π΅Π»Π΅Π½Π° ΠΎΠ±Π»Π°ΡΡ Π½Π° Π±ΠΈΠ·Π½Π΅ΡΠ°. Π‘Π»Π΅Π΄ΠΎΠ²Π°ΡΠ΅Π»Π½ΠΎ Π²ΡΠΈΡΠΊΠΈ Π½Π°ΠΈΡΡΠΈΠ½Π° ΠΈΡΠΊΠ°Ρ Π° Π΄Π° ΠΌΠΎΠ³Π°Ρ Π΄Π° ΡΠ°Π±ΠΎΡΡΡ ΠΈ Ρ Π΄Π°Π½Π½ΠΈ ΠΎΡ ΡΠ°Π·ΠΈ ΡΠΈΡΡΠ΅ΠΌΠ° Π½Π° ΡΡΠ΅ΡΠ° ΡΡΡΠ°Π½Π°. ΠΠ° ΠΏΡΡΠ²ΠΎ ΠΌΡΡΡΠΎ, ΡΠ°Π·Π±ΠΈΡΠ° ΡΠ΅, Π±Π΅ΡΠ΅ ΠΏΡΠΎΡΡΠ΅Π½Π° Π²ΡΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡΡΠ° Π·Π° ΠΏΠΎΠ»ΡΡΠ°Π²Π°Π½Π΅ Π½Π° Π΄Π°Π½Π½ΠΈ ΠΎΡ ΠΎΡΠ²ΠΎΡΠ΅Π½ API. ΠΠ° ΡΡΠΆΠ°Π»Π΅Π½ΠΈΠ΅ API Π½Π΅ ΠΏΠΎΠΊΡΠΈΠ²Π°ΡΠ΅ ΠΏΠΎΠ»ΡΡΠ°Π²Π°Π½Π΅ΡΠΎ Π½Π° Π²ΡΠΈΡΠΊΠΈ Π½Π΅ΠΎΠ±Ρ ΠΎΠ΄ΠΈΠΌΠΈ Π΄Π°Π½Π½ΠΈ ΠΈ, Ρ ΠΏΡΠΎΡΡΠΈ Π΄ΡΠΌΠΈ, Π±Π΅ΡΠ΅ Π² ΠΌΠ½ΠΎΠ³ΠΎ ΠΎΡΠ½ΠΎΡΠ΅Π½ΠΈΡ ΠΊΡΠΈΠ² ΠΈ ΡΠ΅Ρ Π½ΠΈΡΠ΅ΡΠΊΠ°ΡΠ° ΠΏΠΎΠ΄Π΄ΡΡΠΆΠΊΠ° Π½Π΅ ΠΈΡΠΊΠ°ΡΠ΅ ΠΈΠ»ΠΈ Π½Π΅ ΠΌΠΎΠΆΠ΅ΡΠ΅ Π΄Π° ΡΠ΅ ΡΡΠ΅ΡΠ½Π΅ Π½Π°ΠΏΠΎΠ»ΠΎΠ²ΠΈΠ½Π°, Π·Π° Π΄Π° ΠΏΡΠ΅Π΄ΠΎΡΡΠ°Π²ΠΈ ΠΏΠΎ-ΠΈΠ·ΡΠ΅ΡΠΏΠ°ΡΠ΅Π»Π½Π° ΡΡΠ½ΠΊΡΠΈΠΎΠ½Π°Π»Π½ΠΎΡΡ. ΠΠΎ ΡΠ°Π·ΠΈ ΡΠΈΡΡΠ΅ΠΌΠ° ΠΏΡΠ΅Π΄ΠΎΡΡΠ°Π²ΠΈ Π²ΡΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡ Π·Π° ΠΏΠ΅ΡΠΈΠΎΠ΄ΠΈΡΠ½ΠΎ ΠΏΠΎΠ»ΡΡΠ°Π²Π°Π½Π΅ Π½Π° Π»ΠΈΠΏΡΠ²Π°ΡΠΈΡΠ΅ Π΄Π°Π½Π½ΠΈ ΠΏΠΎ ΠΏΠΎΡΠ°ΡΠ° ΠΏΠΎΠ΄ ΡΠΎΡΠΌΠ°ΡΠ° Π½Π° Π²ΡΡΠ·ΠΊΠ° Π·Π° ΡΠ°Π·ΡΠΎΠ²Π°ΡΠ²Π°Π½Π΅ Π½Π° Π°ΡΡ ΠΈΠ²Π°.
Π’ΡΡΠ±Π²Π° Π΄Π° ΡΠ΅ ΠΎΡΠ±Π΅Π»Π΅ΠΆΠΈ, ΡΠ΅ ΡΠΎΠ²Π° Π½Π΅ Π΅ Π΅Π΄ΠΈΠ½ΡΡΠ²Π΅Π½ΠΈΡΡ ΡΠ»ΡΡΠ°ΠΉ, Π² ΠΊΠΎΠΉΡΠΎ Π±ΠΈΠ·Π½Π΅ΡΡΡ ΠΈΡΠΊΠ° Π΄Π° ΡΡΠ±ΠΈΡΠ° Π΄Π°Π½Π½ΠΈ ΠΎΡ ΠΈΠΌΠ΅ΠΉΠ»ΠΈ ΠΈΠ»ΠΈ ΠΌΠ΅ΡΠΈΠ½Π΄ΠΆΡΡΠΈ. Π ΡΠΎΠ·ΠΈ ΡΠ»ΡΡΠ°ΠΉ ΠΎΠ±Π°ΡΠ΅ Π½Π΅ Π±ΠΈΡ ΠΌΠ΅ ΠΌΠΎΠ³Π»ΠΈ Π΄Π° ΠΏΠΎΠ²Π»ΠΈΡΠ΅ΠΌ Π½Π° ΡΡΠ΅ΡΠ° ΠΊΠΎΠΌΠΏΠ°Π½ΠΈΡ, ΠΊΠΎΡΡΠΎ ΠΏΡΠ΅Π΄ΠΎΡΡΠ°Π²Ρ ΡΠ°ΡΡ ΠΎΡ Π΄Π°Π½Π½ΠΈΡΠ΅ ΡΠ°ΠΌΠΎ ΠΏΠΎ ΡΠΎΠ·ΠΈ Π½Π°ΡΠΈΠ½.
Π²ΡΠ·Π΄ΡΡΠ΅Π½ ΠΏΠΎΡΠΎΠΊ Apache
ΠΠ° ΠΈΠ·Π³ΡΠ°ΠΆΠ΄Π°Π½Π΅ Π½Π° ETL ΠΏΡΠΎΡΠ΅ΡΠΈ Π½Π°ΠΉ-ΡΠ΅ΡΡΠΎ ΠΈΠ·ΠΏΠΎΠ»Π·Π²Π°ΠΌΠ΅ Apache Airflow. ΠΠ° Π΄Π° ΠΌΠΎΠΆΠ΅ ΡΠΈΡΠ°ΡΠ΅Π»ΡΡ, ΠΊΠΎΠΉΡΠΎ Π½Π΅ Π΅ Π·Π°ΠΏΠΎΠ·Π½Π°Ρ Ρ ΡΠ°Π·ΠΈ ΡΠ΅Ρ Π½ΠΎΠ»ΠΎΠ³ΠΈΡ, Π΄Π° ΡΠ°Π·Π±Π΅ΡΠ΅ ΠΏΠΎ-Π΄ΠΎΠ±ΡΠ΅ ΠΊΠ°ΠΊ ΠΈΠ·Π³Π»Π΅ΠΆΠ΄Π° Π² ΠΊΠΎΠ½ΡΠ΅ΠΊΡΡΠ° ΠΈ ΠΊΠ°ΡΠΎ ΡΡΠ»ΠΎ, ΡΠ΅ ΠΎΠΏΠΈΡΠ° Π½ΡΠΊΠΎΠ»ΠΊΠΎ ΡΠ²ΠΎΠ΄Π½ΠΈ.
Apache Airflow Π΅ Π±Π΅Π·ΠΏΠ»Π°ΡΠ½Π° ΠΏΠ»Π°ΡΡΠΎΡΠΌΠ°, ΠΊΠΎΡΡΠΎ ΡΠ΅ ΠΈΠ·ΠΏΠΎΠ»Π·Π²Π° Π·Π° ΠΈΠ·Π³ΡΠ°ΠΆΠ΄Π°Π½Π΅, ΠΈΠ·ΠΏΡΠ»Π½Π΅Π½ΠΈΠ΅ ΠΈ Π½Π°Π±Π»ΡΠ΄Π΅Π½ΠΈΠ΅ Π½Π° ETL (Extract-Transform-Loading) ΠΏΡΠΎΡΠ΅ΡΠΈ Π² Python. ΠΡΠ½ΠΎΠ²Π½Π°ΡΠ° ΠΊΠΎΠ½ΡΠ΅ΠΏΡΠΈΡ Π² Airflow Π΅ Π½Π°ΡΠΎΡΠ΅Π½ Π°ΡΠΈΠΊΠ»ΠΈΡΠ΅Π½ Π³ΡΠ°Ρ, ΠΊΡΠ΄Π΅ΡΠΎ Π²ΡΡΡ ΠΎΠ²Π΅ΡΠ΅ Π½Π° Π³ΡΠ°ΡΠΈΠΊΠ°ΡΠ° ΡΠ° ΡΠΏΠ΅ΡΠΈΡΠΈΡΠ½ΠΈ ΠΏΡΠΎΡΠ΅ΡΠΈ, Π° ΡΡΠ±ΠΎΠ²Π΅ΡΠ΅ Π½Π° Π³ΡΠ°ΡΠΈΠΊΠ°ΡΠ° ΡΠ° ΠΏΠΎΡΠΎΠΊ ΠΎΡ ΠΊΠΎΠ½ΡΡΠΎΠ» ΠΈΠ»ΠΈ ΠΈΠ½ΡΠΎΡΠΌΠ°ΡΠΈΡ. ΠΠ΄ΠΈΠ½ ΠΏΡΠΎΡΠ΅Ρ ΠΌΠΎΠΆΠ΅ ΠΏΡΠΎΡΡΠΎ Π΄Π° ΠΈΠ·Π²ΠΈΠΊΠ° Π²ΡΡΠΊΠ° ΡΡΠ½ΠΊΡΠΈΡ Π½Π° Python ΠΈΠ»ΠΈ ΠΌΠΎΠΆΠ΅ Π΄Π° ΠΈΠΌΠ° ΠΏΠΎ-ΡΠ»ΠΎΠΆΠ½Π° Π»ΠΎΠ³ΠΈΠΊΠ° ΠΎΡ ΠΏΠΎΡΠ»Π΅Π΄ΠΎΠ²Π°ΡΠ΅Π»Π½ΠΎ ΠΈΠ·Π²ΠΈΠΊΠ²Π°Π½Π΅ Π½Π° Π½ΡΠΊΠΎΠ»ΠΊΠΎ ΡΡΠ½ΠΊΡΠΈΠΈ Π² ΠΊΠΎΠ½ΡΠ΅ΠΊΡΡΠ° Π½Π° ΠΊΠ»Π°Ρ. ΠΠ° Π½Π°ΠΉ-ΡΠ΅ΡΡΠΈΡΠ΅ ΠΎΠΏΠ΅ΡΠ°ΡΠΈΠΈ Π²Π΅ΡΠ΅ ΠΈΠΌΠ° ΠΌΠ½ΠΎΠ³ΠΎ Π³ΠΎΡΠΎΠ²ΠΈ ΡΠ°Π·ΡΠ°Π±ΠΎΡΠΊΠΈ, ΠΊΠΎΠΈΡΠΎ ΠΌΠΎΠ³Π°Ρ Π΄Π° ΡΠ΅ ΠΈΠ·ΠΏΠΎΠ»Π·Π²Π°Ρ ΠΊΠ°ΡΠΎ ΠΏΡΠΎΡΠ΅ΡΠΈ. Π’Π°ΠΊΠΈΠ²Π° ΡΠ°Π·ΡΠ°Π±ΠΎΡΠΊΠΈ Π²ΠΊΠ»ΡΡΠ²Π°Ρ:
- ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡΠΈ - Π·Π° ΠΏΡΠ΅Ρ Π²ΡΡΠ»ΡΠ½Π΅ Π½Π° Π΄Π°Π½Π½ΠΈ ΠΎΡ Π΅Π΄Π½ΠΎ ΠΌΡΡΡΠΎ Π½Π° Π΄ΡΡΠ³ΠΎ, Π½Π°ΠΏΡΠΈΠΌΠ΅Ρ ΠΎΡ ΡΠ°Π±Π»ΠΈΡΠ° Π½Π° Π±Π°Π·Π° Π΄Π°Π½Π½ΠΈ ΠΊΡΠΌ Ρ ΡΠ°Π½ΠΈΠ»ΠΈΡΠ΅ Π·Π° Π΄Π°Π½Π½ΠΈ;
- ΡΠ΅Π½Π·ΠΎΡΠΈ - Π·Π° ΠΈΠ·ΡΠ°ΠΊΠ²Π°Π½Π΅ Π½Π° Π½Π°ΡΡΡΠΏΠ²Π°Π½Π΅ΡΠΎ Π½Π° ΠΎΠΏΡΠ΅Π΄Π΅Π»Π΅Π½ΠΎ ΡΡΠ±ΠΈΡΠΈΠ΅ ΠΈ Π½Π°ΡΠΎΡΠ²Π°Π½Π΅ Π½Π° ΠΏΠΎΡΠΎΠΊΠ° Π½Π° ΡΠΏΡΠ°Π²Π»Π΅Π½ΠΈΠ΅ ΠΊΡΠΌ ΡΠ»Π΅Π΄Π²Π°ΡΠΈΡΠ΅ Π²ΡΡΡ ΠΎΠ²Π΅ Π½Π° Π³ΡΠ°ΡΠ°;
- ΠΊΡΠΊΠΈΡΠΊΠΈ - Π·Π° ΠΎΠΏΠ΅ΡΠ°ΡΠΈΠΈ ΠΎΡ ΠΏΠΎ-Π½ΠΈΡΠΊΠΎ Π½ΠΈΠ²ΠΎ, Π½Π°ΠΏΡΠΈΠΌΠ΅Ρ Π·Π° ΠΏΠΎΠ»ΡΡΠ°Π²Π°Π½Π΅ Π½Π° Π΄Π°Π½Π½ΠΈ ΠΎΡ ΡΠ°Π±Π»ΠΈΡΠ° Π½Π° Π±Π°Π·Π° Π΄Π°Π½Π½ΠΈ (ΠΈΠ·ΠΏΠΎΠ»Π·Π²Π° ΡΠ΅ Π² ΠΈΠ·ΡΠ°Π·ΠΈ);
- ΠΈ Ρ.Π½.
ΠΠΈ Π±ΠΈΠ»ΠΎ Π½Π΅ΡΠΌΠ΅ΡΡΠ½ΠΎ Π΄Π° ΠΎΠΏΠΈΡΠ²Π°ΠΌΠ΅ Apache Airflow ΠΏΠΎΠ΄ΡΠΎΠ±Π½ΠΎ Π² ΡΠ°Π·ΠΈ ΡΡΠ°ΡΠΈΡ. ΠΠΎΠΆΠ΅ΡΠ΅ Π΄Π° Π²ΠΈΠ΄ΠΈΡΠ΅ ΠΊΡΠ°ΡΠΊΠΈ Π²ΡΠ²Π΅Π΄Π΅Π½ΠΈΡ
ΠΡΠΊΠ° Π·Π° ΠΏΠΎΠ»ΡΡΠ°Π²Π°Π½Π΅ Π½Π° Π΄Π°Π½Π½ΠΈ
ΠΡΡΠ²ΠΎ, Π·Π° Π΄Π° ΡΠ΅ΡΠΈΠΌ ΠΏΡΠΎΠ±Π»Π΅ΠΌΠ°, ΡΡΡΠ±Π²Π° Π΄Π° Π½Π°ΠΏΠΈΡΠ΅ΠΌ ΠΊΡΠΊΠ°, Ρ ΠΊΠΎΡΡΠΎ ΠΌΠΎΠΆΠ΅ΠΌ:
- ΡΠ²ΡΡΠΆΠ΅ΡΠ΅ ΡΠ΅ Ρ ΠΈΠΌΠ΅ΠΉΠ»
- Π½Π°ΠΌΠ΅ΡΠΈ ΠΏΡΠ°Π²ΠΈΠ»Π½Π°ΡΠ° Π±ΡΠΊΠ²Π°
- ΠΏΠΎΠ»ΡΡΠ°Π²Π°Ρ Π΄Π°Π½Π½ΠΈ ΠΎΡ ΠΏΠΈΡΠΌΠΎΡΠΎ.
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging
class IMAPHook(BaseHook):
def __init__(self, imap_conn_id):
"""
IMAP hook Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
Ρ ΡΠ»Π΅ΠΊΡΡΠΎΠ½Π½ΠΎΠΉ ΠΏΠΎΡΡΡ
:param imap_conn_id: ΠΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠ΅Π½ΠΈΡ ΠΊ ΠΏΠΎΡΡΠ΅
:type imap_conn_id: string
"""
self.connection = self.get_connection(imap_conn_id)
self.mail = None
def authenticate(self):
"""
ΠΠΎΠ΄ΠΊΠ»ΡΡΠ°Π΅ΠΌΡΡ ΠΊ ΠΏΠΎΡΡΠ΅
"""
mail = imaplib.IMAP4_SSL(self.connection.host)
response, detail = mail.login(user=self.connection.login, password=self.connection.password)
if response != "OK":
raise AirflowException("Sign in failed")
else:
self.mail = mail
def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ ΠΈΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡΠ° ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π³ΠΎ ΠΏΠΈΡΡΠΌΠ°,
ΡΠ΄ΠΎΠ²Π»Π΅ΡΠ²ΠΎΡΠ°ΡΡΡΠ΅Π³ΠΎ ΡΡΠ»ΠΎΠ²ΠΈΡΠΌ ΠΏΠΎΠΈΡΠΊΠ°
:param check_seen: ΠΡΠΌΠ΅ΡΠ°ΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅ ΠΏΠΈΡΡΠΌΠΎ ΠΊΠ°ΠΊ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΠΎΠ΅
:type check_seen: bool
:param box: ΠΠ°ΠΈΠΌΠ΅Π½ΠΎΠ²Π°Π½ΠΈΡ ΡΡΠΈΠΊΠ°
:type box: string
:param condition: Π£ΡΠ»ΠΎΠ²ΠΈΡ ΠΏΠΎΠΈΡΠΊΠ° ΠΏΠΈΡΠ΅ΠΌ
:type condition: string
"""
self.authenticate()
self.mail.select(mailbox=box)
response, data = self.mail.search(None, condition)
mail_ids = data[0].split()
logging.info("Π ΡΡΠΈΠΊΠ΅ Π½Π°ΠΉΠ΄Π΅Π½Ρ ΡΠ»Π΅Π΄ΡΡΡΠΈΠ΅ ΠΏΠΈΡΡΠΌΠ°: " + str(mail_ids))
if not mail_ids:
logging.info("ΠΠ΅ Π½Π°ΠΉΠ΄Π΅Π½ΠΎ Π½ΠΎΠ²ΡΡ
ΠΏΠΈΡΠ΅ΠΌ")
return None
mail_id = mail_ids[0]
# Π΅ΡΠ»ΠΈ ΡΠ°ΠΊΠΈΡ
ΠΏΠΈΡΠ΅ΠΌ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ
if len(mail_ids) > 1:
# ΠΎΡΠΌΠ΅ΡΠ°Π΅ΠΌ ΠΎΡΡΠ°Π»ΡΠ½ΡΠ΅ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΡΠΌΠΈ
for id in mail_ids:
self.mail.store(id, "+FLAGS", "\Seen")
# Π²ΠΎΠ·Π²ΡΠ°ΡΠ°Π΅ΠΌ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅
mail_id = mail_ids[-1]
# Π½ΡΠΆΠ½ΠΎ Π»ΠΈ ΠΎΡΠΌΠ΅ΡΠΈΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΡΠΌ
if not check_seen:
self.mail.store(mail_id, "-FLAGS", "\Seen")
return mail_id
ΠΠΎΠ³ΠΈΠΊΠ°ΡΠ° Π΅ ΡΠ»Π΅Π΄Π½Π°ΡΠ°: ΡΠ²ΡΡΠ·Π²Π°ΠΌΠ΅ ΡΠ΅, Π½Π°ΠΌΠΈΡΠ°ΠΌΠ΅ ΠΏΠΎΡΠ»Π΅Π΄Π½Π°ΡΠ° Π½Π°ΠΉ-ΠΏΠΎΠ΄Ρ ΠΎΠ΄ΡΡΠ° Π±ΡΠΊΠ²Π°, Π°ΠΊΠΎ ΠΈΠΌΠ° Π΄ΡΡΠ³ΠΈ, Π³ΠΈ ΠΈΠ³Π½ΠΎΡΠΈΡΠ°ΠΌΠ΅. Π’Π°Π·ΠΈ ΡΡΠ½ΠΊΡΠΈΡ ΡΠ΅ ΠΈΠ·ΠΏΠΎΠ»Π·Π²Π°, Π·Π°ΡΠΎΡΠΎ ΠΏΠΎ-ΠΊΡΡΠ½ΠΈΡΠ΅ ΠΏΠΈΡΠΌΠ° ΡΡΠ΄ΡΡΠΆΠ°Ρ Π²ΡΠΈΡΠΊΠΈ Π΄Π°Π½Π½ΠΈ ΠΎΡ ΠΏΠΎ-ΡΠ°Π½Π½ΠΈΡΠ΅. ΠΠΊΠΎ ΡΠΎΠ²Π° Π½Π΅ Π΅ ΡΠ°ΠΊΠ°, ΡΠΎΠ³Π°Π²Π° ΠΌΠΎΠΆΠ΅ΡΠ΅ Π΄Π° Π²ΡΡΠ½Π΅ΡΠ΅ ΠΌΠ°ΡΠΈΠ² ΠΎΡ Π²ΡΠΈΡΠΊΠΈ Π±ΡΠΊΠ²ΠΈ ΠΈΠ»ΠΈ Π΄Π° ΠΎΠ±ΡΠ°Π±ΠΎΡΠΈΡΠ΅ ΠΏΡΡΠ²Π°ΡΠ°, Π° ΠΎΡΡΠ°Π½Π°Π»ΠΈΡΠ΅ ΠΏΡΠΈ ΡΠ»Π΅Π΄Π²Π°ΡΠΎΡΠΎ ΠΏΡΠ΅ΠΌΠΈΠ½Π°Π²Π°Π½Π΅. ΠΠ°ΡΠΎ ΡΡΠ»ΠΎ Π²ΡΠΈΡΠΊΠΎ, ΠΊΠ°ΠΊΡΠΎ Π²ΠΈΠ½Π°Π³ΠΈ, Π·Π°Π²ΠΈΡΠΈ ΠΎΡ Π·Π°Π΄Π°ΡΠ°ΡΠ°.
ΠΠΎΠ±Π°Π²ΡΠΌΠ΅ Π΄Π²Π΅ ΠΏΠΎΠΌΠΎΡΠ½ΠΈ ΡΡΠ½ΠΊΡΠΈΠΈ ΠΊΡΠΌ ΠΊΡΠΊΠ°ΡΠ°: Π·Π° ΠΈΠ·ΡΠ΅Π³Π»ΡΠ½Π΅ Π½Π° ΡΠ°ΠΉΠ» ΠΈ Π·Π° ΠΈΠ·ΡΠ΅Π³Π»ΡΠ½Π΅ Π½Π° ΡΠ°ΠΉΠ» ΡΡΠ΅Π· Π²ΡΡΠ·ΠΊΠ° ΠΎΡ ΠΈΠΌΠ΅ΠΉΠ». ΠΠ΅ΠΆΠ΄Ρ Π΄ΡΡΠ³ΠΎΡΠΎ, ΡΠ΅ ΠΌΠΎΠ³Π°Ρ Π΄Π° Π±ΡΠ΄Π°Ρ ΠΏΡΠ΅ΠΌΠ΅ΡΡΠ΅Π½ΠΈ Π½Π° ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡΠ°, Π·Π°Π²ΠΈΡΠΈ ΠΎΡ ΡΠ΅ΡΡΠΎΡΠ°ΡΠ° Π½Π° ΠΈΠ·ΠΏΠΎΠ»Π·Π²Π°Π½Π΅ Π½Π° ΡΠ°Π·ΠΈ ΡΡΠ½ΠΊΡΠΈΠΎΠ½Π°Π»Π½ΠΎΡΡ. ΠΠ°ΠΊΠ²ΠΎ Π΄ΡΡΠ³ΠΎ Π΄Π° Π΄ΠΎΠ±Π°Π²ΠΈΡΠ΅ ΠΊΡΠΌ ΠΊΡΠΊΠ°ΡΠ°, ΠΎΡΠ½ΠΎΠ²ΠΎ Π·Π°Π²ΠΈΡΠΈ ΠΎΡ Π·Π°Π΄Π°ΡΠ°ΡΠ°: Π°ΠΊΠΎ ΡΠ°ΠΉΠ»ΠΎΠ²Π΅ΡΠ΅ ΡΠ° ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈ Π²Π΅Π΄Π½Π°Π³Π° Π² ΠΏΠΈΡΠΌΠΎΡΠΎ, ΡΠΎΠ³Π°Π²Π° ΠΌΠΎΠΆΠ΅ΡΠ΅ Π΄Π° ΠΈΠ·ΡΠ΅Π³Π»ΠΈΡΠ΅ ΠΏΡΠΈΠΊΠ°ΡΠ΅Π½ΠΈ ΡΠ°ΠΉΠ»ΠΎΠ²Π΅ ΠΊΡΠΌ ΠΏΠΈΡΠΌΠΎΡΠΎ, Π°ΠΊΠΎ Π΄Π°Π½Π½ΠΈΡΠ΅ ΡΠ° ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈ Π² ΠΏΠΈΡΠΌΠΎΡΠΎ, ΡΠΎΠ³Π°Π²Π° ΡΡΡΠ±Π²Π° Π΄Π° Π°Π½Π°Π»ΠΈΠ·ΠΈΡΠ°ΡΠ΅ ΠΏΠΈΡΠΌΠΎΡΠΎ, ΠΈ Ρ.Π½. Π ΠΌΠΎΡ ΡΠ»ΡΡΠ°ΠΉ ΠΏΠΈΡΠΌΠΎΡΠΎ ΠΈΠ΄Π²Π° Ρ Π΅Π΄Π½Π° Π²ΡΡΠ·ΠΊΠ° ΠΊΡΠΌ Π°ΡΡ ΠΈΠ²Π°, ΠΊΠΎΡΡΠΎ ΡΡΡΠ±Π²Π° Π΄Π° ΠΏΠΎΡΡΠ°Π²Ρ Π½Π° ΠΎΠΏΡΠ΅Π΄Π΅Π»Π΅Π½ΠΎ ΠΌΡΡΡΠΎ ΠΈ Π΄Π° Π·Π°ΠΏΠΎΡΠ½Π° ΠΏΠΎ-Π½Π°ΡΠ°ΡΡΡΠ½Π°ΡΠ° ΠΎΠ±ΡΠ°Π±ΠΎΡΠΊΠ°.
def download_from_url(self, url, path, chunk_size=128):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π°
:param url: ΠΠ΄ΡΠ΅Ρ Π·Π°Π³ΡΡΠ·ΠΊΠΈ
:type url: string
:param path: ΠΡΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ ΡΠ°ΠΉΠ»
:type path: string
:param chunk_size: ΠΠΎ ΡΠΊΠΎΠ»ΡΠΊΠΎ Π±Π°ΠΉΡΠΎΠ² ΠΏΠΈΡΠ°ΡΡ
:type chunk_size: int
"""
r = requests.get(url, stream=True)
with open(path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def download_mail_href_attachment(self, mail_id, path):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π° ΠΏΠΎ ΡΡΡΠ»ΠΊΠ΅ ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°
:param mail_id: ΠΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ ΠΏΠΈΡΡΠΌΠ°
:type mail_id: string
:param path: ΠΡΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ ΡΠ°ΠΉΠ»
:type path: string
"""
response, data = self.mail.fetch(mail_id, "(RFC822)")
raw_email = data[0][1]
raw_soup = raw_email.decode().replace("r", "").replace("n", "")
parse_soup = BeautifulSoup(raw_soup, "html.parser")
link_text = ""
for a in parse_soup.find_all("a", href=True, text=True):
link_text = a["href"]
self.download_from_url(link_text, path)
ΠΠΎΠ΄ΡΡ Π΅ ΠΏΡΠΎΡΡ, ΡΠ°ΠΊΠ° ΡΠ΅ Π΅Π΄Π²Π° Π»ΠΈ ΡΠ΅ Π½ΡΠΆΠ΄Π°Π΅ ΠΎΡ Π΄ΠΎΠΏΡΠ»Π½ΠΈΡΠ΅Π»Π½ΠΎ ΠΎΠ±ΡΡΠ½Π΅Π½ΠΈΠ΅. ΠΡΠΎΡΡΠΎ ΡΠ΅ Π²ΠΈ ΡΠ°Π·ΠΊΠ°ΠΆΠ° Π·Π° ΠΌΠ°Π³ΠΈΡΠ΅ΡΠΊΠΈΡ ΡΠ΅Π΄ imap_conn_id. Apache Airflow ΡΡΡ ΡΠ°Π½ΡΠ²Π° ΠΏΠ°ΡΠ°ΠΌΠ΅ΡΡΠΈ Π½Π° Π²ΡΡΠ·ΠΊΠ°ΡΠ° (Π²Π»ΠΈΠ·Π°Π½Π΅, ΠΏΠ°ΡΠΎΠ»Π°, Π°Π΄ΡΠ΅Ρ ΠΈ Π΄ΡΡΠ³ΠΈ ΠΏΠ°ΡΠ°ΠΌΠ΅ΡΡΠΈ), ΠΊΠΎΠΈΡΠΎ ΠΌΠΎΠ³Π°Ρ Π΄Π° Π±ΡΠ΄Π°Ρ Π΄ΠΎΡΡΡΠΏΠ½ΠΈ ΡΡΠ΅Π· ΠΈΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ Π½Π° Π½ΠΈΠ·. ΠΠΈΠ·ΡΠ°Π»Π½ΠΎ ΡΠΏΡΠ°Π²Π»Π΅Π½ΠΈΠ΅ΡΠΎ Π½Π° Π²ΡΡΠ·ΠΊΠ°ΡΠ° ΠΈΠ·Π³Π»Π΅ΠΆΠ΄Π° ΡΠ°ΠΊΠ°
Π‘Π΅Π½Π·ΠΎΡ Π·Π° ΠΈΠ·ΡΠ°ΠΊΠ²Π°Π½Π΅ Π½Π° Π΄Π°Π½Π½ΠΈ
Π’ΡΠΉ ΠΊΠ°ΡΠΎ Π²Π΅ΡΠ΅ Π·Π½Π°Π΅ΠΌ ΠΊΠ°ΠΊ Π΄Π° ΡΠ΅ ΡΠ²ΡΡΠ·Π²Π°ΠΌΠ΅ ΠΈ Π΄Π° ΠΏΠΎΠ»ΡΡΠ°Π²Π°ΠΌΠ΅ Π΄Π°Π½Π½ΠΈ ΠΎΡ ΠΏΠΎΡΠ°ΡΠ°, ΡΠ΅Π³Π° ΠΌΠΎΠΆΠ΅ΠΌ Π΄Π° Π½Π°ΠΏΠΈΡΠ΅ΠΌ ΡΠ΅Π½Π·ΠΎΡ, ΠΊΠΎΠΉΡΠΎ Π΄Π° Π³ΠΈ ΡΠ°ΠΊΠ°. Π ΠΌΠΎΡ ΡΠ»ΡΡΠ°ΠΉ Π½Π΅ ΡΠ΅ ΠΏΠΎΠ»ΡΡΠΈ Π²Π΅Π΄Π½Π°Π³Π° Π΄Π° Π½Π°ΠΏΠΈΡΠ° ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡ, ΠΊΠΎΠΉΡΠΎ ΡΠ΅ ΠΎΠ±ΡΠ°Π±ΠΎΡΠ²Π° Π΄Π°Π½Π½ΠΈΡΠ΅, Π°ΠΊΠΎ ΠΈΠΌΠ° ΡΠ°ΠΊΠΈΠ²Π°, Π·Π°ΡΠΎΡΠΎ Π΄ΡΡΠ³ΠΈ ΠΏΡΠΎΡΠ΅ΡΠΈ ΡΠ°Π±ΠΎΡΡΡ Π²ΡΠ· ΠΎΡΠ½ΠΎΠ²Π° Π½Π° Π΄Π°Π½Π½ΠΈΡΠ΅, ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈ ΠΎΡ ΠΏΠΎΡΠ°ΡΠ°, Π²ΠΊΠ»ΡΡΠΈΡΠ΅Π»Π½ΠΎ ΡΠ΅Π·ΠΈ, ΠΊΠΎΠΈΡΠΎ Π²Π·Π΅ΠΌΠ°Ρ ΡΠ²ΡΡΠ·Π°Π½ΠΈ Π΄Π°Π½Π½ΠΈ ΠΎΡ Π΄ΡΡΠ³ΠΈ ΠΈΠ·ΡΠΎΡΠ½ΠΈΡΠΈ (API, ΡΠ΅Π»Π΅ΡΠΎΠ½ΠΈΡ , ΡΠ΅Π± ΠΏΠΎΠΊΠ°Π·Π°ΡΠ΅Π»ΠΈ ΠΈ Ρ.Π½.) ΠΈ Ρ.Π½.). Π©Π΅ Π²ΠΈ Π΄Π°ΠΌ ΠΏΡΠΈΠΌΠ΅Ρ. Π CRM ΡΠΈΡΡΠ΅ΠΌΠ°ΡΠ° ΡΠ΅ ΠΏΠΎΡΠ²ΠΈ Π½ΠΎΠ² ΠΏΠΎΡΡΠ΅Π±ΠΈΡΠ΅Π» ΠΈ Π²ΡΠ΅ ΠΎΡΠ΅ Π½Π΅ Π·Π½Π°Π΅ΠΌ Π·Π° Π½Π΅Π³ΠΎΠ²ΠΈΡ UUID. Π‘Π»Π΅Π΄ ΡΠΎΠ²Π°, ΠΊΠΎΠ³Π°ΡΠΎ ΡΠ΅ ΠΎΠΏΠΈΡΠ²Π°ΠΌΠ΅ Π΄Π° ΠΏΠΎΠ»ΡΡΠΈΠΌ Π΄Π°Π½Π½ΠΈ ΠΎΡ SIP ΡΠ΅Π»Π΅ΡΠΎΠ½ΠΈΡ, ΡΠ΅ ΠΏΠΎΠ»ΡΡΠ°Π²Π°ΠΌΠ΅ ΠΎΠ±Π°ΠΆΠ΄Π°Π½ΠΈΡ, ΡΠ²ΡΡΠ·Π°Π½ΠΈ Ρ Π½Π΅Π³ΠΎΠ²ΠΈΡ UUID, Π½ΠΎ Π½ΡΠΌΠ° Π΄Π° ΠΌΠΎΠΆΠ΅ΠΌ Π΄Π° Π³ΠΈ Π·Π°ΠΏΠ°Π·ΠΈΠΌ ΠΈ ΠΈΠ·ΠΏΠΎΠ»Π·Π²Π°ΠΌΠ΅ ΠΏΡΠ°Π²ΠΈΠ»Π½ΠΎ. ΠΡΠΈ ΡΠ°ΠΊΠΈΠ²Π° Π²ΡΠΏΡΠΎΡΠΈ Π΅ Π²Π°ΠΆΠ½ΠΎ Π΄Π° ΡΠ΅ ΠΈΠΌΠ° ΠΏΡΠ΅Π΄Π²ΠΈΠ΄ Π·Π°Π²ΠΈΡΠΈΠΌΠΎΡΡΡΠ° Π½Π° Π΄Π°Π½Π½ΠΈΡΠ΅, ΠΎΡΠΎΠ±Π΅Π½ΠΎ Π°ΠΊΠΎ ΡΠ° ΠΎΡ ΡΠ°Π·Π»ΠΈΡΠ½ΠΈ ΠΈΠ·ΡΠΎΡΠ½ΠΈΡΠΈ. Π’ΠΎΠ²Π°, ΡΠ°Π·Π±ΠΈΡΠ° ΡΠ΅, ΡΠ° Π½Π΅Π΄ΠΎΡΡΠ°ΡΡΡΠ½ΠΈ ΠΌΠ΅ΡΠΊΠΈ Π·Π° Π·Π°ΠΏΠ°Π·Π²Π°Π½Π΅ ΡΠ΅Π»ΠΎΡΡΡΠ° Π½Π° Π΄Π°Π½Π½ΠΈΡΠ΅, Π½ΠΎ Π² Π½ΡΠΊΠΎΠΈ ΡΠ»ΡΡΠ°ΠΈ ΡΠ° Π½Π΅ΠΎΠ±Ρ ΠΎΠ΄ΠΈΠΌΠΈ. ΠΠ°, ΠΈ Π±Π΅Π·Π΄Π΅ΠΉΡΡΠ²ΠΈΠ΅ΡΠΎ Π·Π° Π·Π°Π΅ΠΌΠ°Π½Π΅ Π½Π° ΡΠ΅ΡΡΡΡΠΈ ΡΡΡΠΎ Π΅ ΠΈΡΠ°ΡΠΈΠΎΠ½Π°Π»Π½ΠΎ.
ΠΠΎ ΡΠΎΠ·ΠΈ Π½Π°ΡΠΈΠ½ Π½Π°ΡΠΈΡΡ ΡΠ΅Π½Π·ΠΎΡ ΡΠ΅ ΡΡΠ°ΡΡΠΈΡΠ° ΡΠ»Π΅Π΄Π²Π°ΡΠΈΡΠ΅ Π²ΡΡΡ ΠΎΠ²Π΅ Π½Π° Π³ΡΠ°ΡΠΈΠΊΠ°ΡΠ°, Π°ΠΊΠΎ ΠΈΠΌΠ° ΡΠ²Π΅ΠΆΠ° ΠΈΠ½ΡΠΎΡΠΌΠ°ΡΠΈΡ Π² ΠΏΠΎΡΠ°ΡΠ°, Π° ΡΡΡΠΎ ΡΠ°ΠΊΠ° ΡΠ΅ ΠΌΠ°ΡΠΊΠΈΡΠ° ΠΏΡΠ΅Π΄ΠΈΡΠ½Π°ΡΠ° ΠΈΠ½ΡΠΎΡΠΌΠ°ΡΠΈΡ ΠΊΠ°ΡΠΎ Π½Π΅ΡΠΌΠ΅ΡΡΠ½Π°.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook
class MailSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.check_seen = check_seen
self.box = box
self.condition = condition
def poke(self, context):
conn = IMAPHook(self.conn_id)
mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
if mail_id is None:
return False
else:
return True
ΠΠΈΠ΅ ΠΏΠΎΠ»ΡΡΠ°Π²Π°ΠΌΠ΅ ΠΈ ΠΈΠ·ΠΏΠΎΠ»Π·Π²Π°ΠΌΠ΅ Π΄Π°Π½Π½ΠΈ
ΠΠ° Π΄Π° ΠΏΠΎΠ»ΡΡΠ°Π²Π°ΡΠ΅ ΠΈ ΠΎΠ±ΡΠ°Π±ΠΎΡΠ²Π°ΡΠ΅ Π΄Π°Π½Π½ΠΈ, ΠΌΠΎΠΆΠ΅ΡΠ΅ Π΄Π° Π½Π°ΠΏΠΈΡΠ΅ΡΠ΅ ΠΎΡΠ΄Π΅Π»Π΅Π½ ΠΎΠΏΠ΅ΡΠ°ΡΠΎΡ, ΠΌΠΎΠΆΠ΅ΡΠ΅ Π΄Π° ΠΈΠ·ΠΏΠΎΠ»Π·Π²Π°ΡΠ΅ Π³ΠΎΡΠΎΠ²ΠΈ. Π’ΡΠΉ ΠΊΠ°ΡΠΎ Π»ΠΎΠ³ΠΈΠΊΠ°ΡΠ° Π²ΡΠ΅ ΠΎΡΠ΅ Π΅ ΡΡΠΈΠ²ΠΈΠ°Π»Π½Π° - Π΄Π° Π²Π·Π΅ΠΌΠ΅ΠΌ Π΄Π°Π½Π½ΠΈ ΠΎΡ ΠΏΠΈΡΠΌΠΎΡΠΎ, Π½Π°ΠΏΡΠΈΠΌΠ΅Ρ, ΠΏΡΠ΅Π΄Π»Π°Π³Π°ΠΌ ΡΡΠ°Π½Π΄Π°ΡΡΠ½ΠΈΡ PythonOperator
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook
start_date = datetime(2020, 4, 4)
# Π‘ΡΠ°Π½Π΄Π°ΡΡΠ½ΠΎΠ΅ ΠΊΠΎΠ½ΡΠΈΠ³ΡΡΠΈΡΠΎΠ²Π°Π½ΠΈΠ΅ Π³ΡΠ°ΡΠ°
args = {
"owner": "example",
"start_date": start_date,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retry_delay": timedelta(minutes=15),
"provide_context": False,
}
dag = DAG(
dag_id="test_etl",
default_args=args,
schedule_interval="@hourly",
)
# ΠΠΏΡΠ΅Π΄Π΅Π»ΡΠ΅ΠΌ ΡΠ΅Π½ΡΠΎΡ
mail_check_sensor = MailSensor(
task_id="check_new_emails",
poke_interval=10,
conn_id="mail_conn_id",
timeout=10,
soft_fail=True,
box="my_box",
dag=dag,
mode="poke",
)
# Π€ΡΠ½ΠΊΡΠΈΡ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°
def prepare_mail():
imap_hook = IMAPHook("mail_conn_id")
mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
if mail_id is None:
raise AirflowException("Empty mailbox")
conn.download_mail_href_attachment(mail_id, "./path.zip")
prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
# ΠΠΏΠΈΡΠ°Π½ΠΈΠ΅ ΠΎΡΡΠ°Π»ΡΠ½ΡΡ
Π²Π΅ΡΡΠΈΠ½ Π³ΡΠ°ΡΠ°
...
# ΠΠ°Π΄Π°Π΅ΠΌ ΡΠ²ΡΠ·Ρ Π½Π° Π³ΡΠ°ΡΠ΅
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ΠΠΏΠΈΡΠ°Π½ΠΈΠ΅ ΠΎΡΡΠ°Π»ΡΠ½ΡΡ
ΠΏΠΎΡΠΎΠΊΠΎΠ² ΡΠΏΡΠ°Π²Π»Π΅Π½ΠΈΡ
ΠΠ΅ΠΆΠ΄Ρ Π΄ΡΡΠ³ΠΎΡΠΎ, Π°ΠΊΠΎ Π²Π°ΡΠ°ΡΠ° ΠΊΠΎΡΠΏΠΎΡΠ°ΡΠΈΠ²Π½Π° ΠΏΠΎΡΠ° ΡΡΡΠΎ Π΅ Π½Π° mail.ru, ΡΠΎΠ³Π°Π²Π° Π½ΡΠΌΠ° Π΄Π° ΠΌΠΎΠΆΠ΅ΡΠ΅ Π΄Π° ΡΡΡΡΠΈΡΠ΅ ΠΏΠΈΡΠΌΠ° ΠΏΠΎ ΡΠ΅ΠΌΠ°, ΠΏΠΎΠ΄Π°ΡΠ΅Π» ΠΈ Ρ.Π½. ΠΡΠ΅ ΠΏΡΠ΅Π· 2016 Π³. ΠΎΠ±Π΅ΡΠ°Ρ Π° Π΄Π° Π³ΠΎ Π²ΡΠ²Π΅Π΄Π°Ρ, Π½ΠΎ ΡΠ²Π½ΠΎ ΡΠ° ΡΠ°Π·ΠΌΠΈΡΠ»ΠΈΠ»ΠΈ. Π Π΅ΡΠΈΡ ΡΠΎΠ·ΠΈ ΠΏΡΠΎΠ±Π»Π΅ΠΌ, ΠΊΠ°ΡΠΎ ΡΡΠ·Π΄Π°Π΄ΠΎΡ ΠΎΡΠ΄Π΅Π»Π½Π° ΠΏΠ°ΠΏΠΊΠ° Π·Π° Π½Π΅ΠΎΠ±Ρ ΠΎΠ΄ΠΈΠΌΠΈΡΠ΅ ΠΏΠΈΡΠΌΠ° ΠΈ Π½Π°ΡΡΡΠΎΠΈΡ ΡΠΈΠ»ΡΡΡ Π·Π° Π½Π΅ΠΎΠ±Ρ ΠΎΠ΄ΠΈΠΌΠΈΡΠ΅ ΠΏΠΈΡΠΌΠ° Π² ΡΠ΅Π± ΠΈΠ½ΡΠ΅ΡΡΠ΅ΠΉΡΠ° Π½Π° ΠΏΠΎΡΠ°ΡΠ°. ΠΠΎ ΡΠΎΠ·ΠΈ Π½Π°ΡΠΈΠ½ Π² ΡΠ°Π·ΠΈ ΠΏΠ°ΠΏΠΊΠ° Π²Π»ΠΈΠ·Π°Ρ ΡΠ°ΠΌΠΎ Π½Π΅ΠΎΠ±Ρ ΠΎΠ΄ΠΈΠΌΠΈΡΠ΅ Π±ΡΠΊΠ²ΠΈ ΠΈ ΡΡΠ»ΠΎΠ²ΠΈΡ Π·Π° ΡΡΡΡΠ΅Π½Π΅, Π² ΠΌΠΎΡ ΡΠ»ΡΡΠ°ΠΉ ΠΏΡΠΎΡΡΠΎ (ΠΠΠΠΠΠΠΠ).
ΠΠ±ΠΎΠ±ΡΠ°Π²Π°ΠΉΠΊΠΈ, ΠΈΠΌΠ°ΠΌΠ΅ ΡΠ»Π΅Π΄Π½Π°ΡΠ° ΠΏΠΎΡΠ»Π΅Π΄ΠΎΠ²Π°ΡΠ΅Π»Π½ΠΎΡΡ: ΠΏΡΠΎΠ²Π΅ΡΡΠ²Π°ΠΌΠ΅ Π΄Π°Π»ΠΈ ΠΈΠΌΠ° Π½ΠΎΠ²ΠΈ ΠΏΠΈΡΠΌΠ°, ΠΊΠΎΠΈΡΠΎ ΠΎΡΠ³ΠΎΠ²Π°ΡΡΡ Π½Π° ΡΡΠ»ΠΎΠ²ΠΈΡΡΠ°, Π°ΠΊΠΎ ΠΈΠΌΠ°, ΡΠΎΠ³Π°Π²Π° ΠΈΠ·ΡΠ΅Π³Π»ΡΠΌΠ΅ Π°ΡΡ
ΠΈΠ²Π°, ΠΊΠ°ΡΠΎ ΠΈΠ·ΠΏΠΎΠ»Π·Π²Π°ΠΌΠ΅ Π²ΡΡΠ·ΠΊΠ°ΡΠ° ΠΎΡ ΠΏΠΎΡΠ»Π΅Π΄Π½ΠΎΡΠΎ ΠΏΠΈΡΠΌΠΎ.
ΠΠΎΠ΄ ΠΏΠΎΡΠ»Π΅Π΄Π½ΠΈΡΠ΅ ΡΠΎΡΠΊΠΈ Π΅ ΠΏΡΠΎΠΏΡΡΠ½Π°ΡΠΎ, ΡΠ΅ ΡΠΎΠ·ΠΈ Π°ΡΡ
ΠΈΠ² ΡΠ΅ Π±ΡΠ΄Π΅ ΡΠ°Π·ΠΎΠΏΠ°ΠΊΠΎΠ²Π°Π½, Π΄Π°Π½Π½ΠΈΡΠ΅ ΠΎΡ Π°ΡΡ
ΠΈΠ²Π° ΡΠ΅ Π±ΡΠ΄Π°Ρ ΠΈΠ·ΡΠΈΡΡΠ΅Π½ΠΈ ΠΈ ΠΎΠ±ΡΠ°Π±ΠΎΡΠ΅Π½ΠΈ ΠΈ Π² ΡΠ΅Π·ΡΠ»ΡΠ°Ρ Π½Π° ΡΠΎΠ²Π° Π²ΡΠΈΡΠΊΠΎ ΡΠ΅ ΠΎΡΠΈΠ΄Π΅ ΠΏΠΎ-Π½Π°ΡΠ°ΡΡΠΊ Π² ΠΊΠΎΠ½Π²Π΅ΠΉΠ΅ΡΠ° Π½Π° ETL ΠΏΡΠΎΡΠ΅ΡΠ°, Π½ΠΎ ΡΠΎΠ²Π° Π²Π΅ΡΠ΅ Π΅ ΠΎΡΠ²ΡΠ΄ ΠΎΠ±Ρ
Π²Π°ΡΠ° Π½Π° ΡΡΠ°ΡΠΈΡΡΠ°. ΠΠΊΠΎ ΡΠ΅ ΠΎΠΊΠ°Π·Π° ΠΈΠ½ΡΠ΅ΡΠ΅ΡΠ½ΠΎ ΠΈ ΠΏΠΎΠ»Π΅Π·Π½ΠΎ, ΡΠΎΠ³Π°Π²Π° Ρ ΡΠ΄ΠΎΠ²ΠΎΠ»ΡΡΠ²ΠΈΠ΅ ΡΠ΅ ΠΏΡΠΎΠ΄ΡΠ»ΠΆΠ° Π΄Π° ΠΎΠΏΠΈΡΠ²Π°ΠΌ ETL ΡΠ΅ΡΠ΅Π½ΠΈΡ ΠΈ ΡΠ΅Ρ
Π½ΠΈΡΠ΅ ΡΠ°ΡΡΠΈ Π·Π° Apache Airflow.
ΠΠ·ΡΠΎΡΠ½ΠΈΠΊ: www.habr.com