Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›

αžŸαž½αžŸαŸ’αžαžΈ αžαŸ’αž‰αž»αŸ†αž‚αžΊ Dmitry Logvinenko - αžœαž·αžŸαŸ’αžœαž€αžšαž‘αž·αž“αŸ’αž“αž“αŸαž™αž“αŸƒαž“αžΆαž™αž€αžŠαŸ’αž‹αžΆαž“αžœαž·αž—αžΆαž‚αž“αŸƒαž€αŸ’αžšαž»αž˜αž€αŸ’αžšαž»αž˜αž αŸŠαž»αž“ Vezet αŸ”

αžαŸ’αž‰αž»αŸ†αž“αžΉαž„αž”αŸ’αžšαžΆαž”αŸ‹αž’αŸ’αž“αž€αž’αŸ†αž–αžΈαž§αž”αž€αžšαžŽαŸαžŠαŸαž’αžŸαŸ’αž…αžΆαžšαŸ’αž™αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž”αž„αŸ’αž€αžΎαžαžŠαŸ†αžŽαžΎαžšαž€αžΆαžš ETL - Apache Airflow αŸ” αž”αŸ‰αž»αž“αŸ’αžαŸ‚ Airflow αž˜αžΆαž“αž—αžΆαž–αž…αž˜αŸ’αžšαž»αŸ‡ αž“αž·αž„αž…αŸ’αžšαžΎαž“αž™αŸ‰αžΆαž„αžŠαŸ‚αž›αž’αŸ’αž“αž€αž‚αž½αžšαžαŸ‚αž–αž·αž“αž·αžαŸ’αž™αž˜αžΎαž›αžœαžΆαž±αŸ’αž™αž€αžΆαž“αŸ‹αžαŸ‚αžŠαž·αžαžŠαž›αŸ‹ αž‘αŸ„αŸ‡αž”αžΈαž‡αžΆαž’αŸ’αž“αž€αž˜αž·αž“αž–αžΆαž€αŸ‹αž–αŸαž“αŸ’αž’αž“αžΉαž„αž›αŸ†αž αžΌαžšαž‘αž·αž“αŸ’αž“αž“αŸαž™αž€αŸαžŠαŸ„αž™ αž”αŸ‰αž»αž“αŸ’αžαŸ‚αž…αžΆαŸ†αž”αžΆαž…αŸ‹αžαŸ’αžšαžΌαžœαž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαžŽαžΆαž˜αž½αž™αž‡αžΆαž‘αŸ€αž„αž‘αžΆαžαŸ‹ αž“αž·αž„αžαžΆαž˜αžŠαžΆαž“αž€αžΆαžšαž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αžšαž”αžŸαŸ‹αž–αž½αž€αž‚αŸαŸ”

αž αžΎαž™αž”αžΆαž‘, αžαŸ’αž‰αž»αŸ†αž“αžΉαž„αž˜αž·αž“αžαŸ’αžšαžΉαž˜αžαŸ‚αž”αŸ’αžšαžΆαž”αŸ‹, αž”αŸ‰αž»αž“αŸ’αžαŸ‚αž€αŸαž”αž„αŸ’αž αžΆαž‰αž•αž„αžŠαŸ‚αžš: αž€αž˜αŸ’αž˜αžœαž·αž’αžΈαž“αŸαŸ‡αž˜αžΆαž“αž€αžΌαžŠαž‡αžΆαž…αŸ’αžšαžΎαž“, αžšαžΌαž”αžαžαž’αŸαž€αŸ’αžšαž„αŸ‹αž“αž·αž„αž’αž“αž»αžŸαžΆαžŸαž“αŸαŸ”

Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›
αž’αŸ’αžœαžΈβ€‹αžŠαŸ‚αž›β€‹αž’αŸ’αž“αž€β€‹αžƒαžΎαž‰β€‹αž‡αžΆβ€‹αž’αž˜αŸ’αž˜αžαžΆβ€‹αž“αŸ…β€‹αž–αŸαž›β€‹αžŠαŸ‚αž›β€‹αž’αŸ’αž“αž€ Google αž–αžΆαž€αŸ’αž™ Airflow / Wikimedia Commons

αžαžΆαžšαžΆαž„αž˜αžΆαžαž·αž€αžΆ

αžŸαŸαž…αž€αŸ’αžαžΈαžŽαŸ‚αž“αžΆαŸ†

Apache Airflow αž‚αžΊαžŠαžΌαž…αž‡αžΆ Django αžŠαŸ‚αžšαŸ–

  • αžŸαžšαžŸαŸαžšαž‡αžΆ python
  • αž˜αžΆαž“αž•αŸ’αž‘αžΆαŸ†αž„αž‚αŸ’αžšαž”αŸ‹αž‚αŸ’αžšαž„αžŠαŸαž’αžŸαŸ’αž…αžΆαžšαŸ’αž™
  • αž’αžΆαž…αž–αž„αŸ’αžšαžΈαž€αž”αžΆαž“αžŠαŸ„αž™αž‚αŸ’αž˜αžΆαž“αž€αŸ†αžŽαžαŸ‹

- αž”αŸ’αžšαžŸαžΎαžšαž‡αžΆαž„ αž αžΎαž™αžœαžΆαžαŸ’αžšαžΌαžœαž”αžΆαž“αž”αž„αŸ’αž€αžΎαžαž‘αžΎαž„αž€αŸ’αž“αž»αž„αž‚αŸ„αž›αž”αŸ†αžŽαž„αžαž»αžŸαž‚αŸ’αž“αžΆαž‘αžΆαŸ†αž„αžŸαŸ’αžšαž»αž„ αž–αŸ„αž›αž‚αžΊ (αžŠαžΌαž…αžŠαŸ‚αž›αžœαžΆαžαŸ’αžšαžΌαžœαž”αžΆαž“αžŸαžšαžŸαŸαžšαž˜αž»αž“αž€αžΆαžαžΆ)αŸ–

  • αž€αŸ†αž–αž»αž„αžŠαŸ†αžŽαžΎαžšαž€αžΆαžš αž“αž·αž„αžαŸ’αžšαž½αžαž–αž·αž“αž·αžαŸ’αž™αž€αž·αž…αŸ’αž…αž€αžΆαžšαž“αŸ…αž›αžΎαž˜αŸ‰αžΆαžŸαŸŠαžΈαž“αž…αŸ†αž“αž½αž“αž‚αŸ’αž˜αžΆαž“αžŠαŸ‚αž“αž€αŸ†αžŽαžαŸ‹ (αžŠαžΌαž…αž‡αžΆ Celery / Kubernetes αž‡αžΆαž…αŸ’αžšαžΎαž“ αž“αž·αž„αž˜αž“αžŸαž·αž€αžΆαžšαžšαž”αžŸαŸ‹αž’αŸ’αž“αž€αž“αžΉαž„αž’αž“αž»αž‰αŸ’αž‰αžΆαžαž±αŸ’αž™αž’αŸ’αž“αž€)
  • αž‡αžΆαž˜αž½αž™αž“αžΉαž„αž€αžΆαžšαž”αž„αŸ’αž€αžΎαžαž›αŸ†αž αžΌαžšαž€αžΆαžšαž„αžΆαžšαžαžΆαž˜αžœαž“αŸ’αžαž–αžΈαž—αžΆαž–αž„αžΆαž™αžŸαŸ’αžšαž½αž›αž€αŸ’αž“αž»αž„αž€αžΆαžšαžŸαžšαžŸαŸαžš αž“αž·αž„αž™αž›αŸ‹αž€αžΌαžŠ Python
  • αž“αž·αž„αžŸαž˜αžαŸ’αžαž—αžΆαž–αž€αŸ’αž“αž»αž„αž€αžΆαžšαž—αŸ’αž‡αžΆαž”αŸ‹αž˜αžΌαž›αžŠαŸ’αž‹αžΆαž“αž‘αž·αž“αŸ’αž“αž“αŸαž™ αž“αž·αž„ APIs αž‡αžΆαž˜αž½αž™αž‚αŸ’αž“αžΆαžŠαŸ„αž™αž”αŸ’αžšαžΎαžŸαž˜αžΆαžŸαž’αžΆαžαž»αžŠαŸ‚αž›αžαŸ’αžšαŸ€αž˜αžšαž½αž…αž‡αžΆαžŸαŸ’αžšαŸαž… αž“αž·αž„αž€αž˜αŸ’αž˜αžœαž·αž’αžΈαž‡αŸ†αž“αž½αž™αžŠαŸ‚αž›αž•αž›αž·αžαž“αŸ…αž•αŸ’αž‘αŸ‡ (αžŠαŸ‚αž›αžŸαžΆαž˜αž‰αŸ’αž‰αž”αŸ†αž•αž»αž)αŸ”

αž™αžΎαž„αž”αŸ’αžšαžΎ Apache Airflow αžŠαžΌαž…αž“αŸαŸ‡αŸ–

  • αž™αžΎαž„αž”αŸ’αžšαž˜αžΌαž›αž‘αž·αž“αŸ’αž“αž“αŸαž™αž–αžΈαž”αŸ’αžšαž—αž–αž•αŸ’αžŸαŸαž„αŸ— (αž§αž‘αžΆαž αžšαžŽαŸ SQL Server αž“αž·αž„ PostgreSQL αž‡αžΆαž…αŸ’αžšαžΎαž“ APIs αž‡αžΆαž…αŸ’αžšαžΎαž“αžŠαŸ‚αž›αž˜αžΆαž“αž€αž˜αŸ’αž˜αžœαž·αž’αžΈαžœαžΆαžŸαŸ‹αžœαŸ‚αž„ αžŸαžΌαž˜αŸ’αž”αžΈαžαŸ‚ 1C) αž“αŸ…αž€αŸ’αž“αž»αž„ DWH αž“αž·αž„ ODS (αž™αžΎαž„αž˜αžΆαž“ Vertica αž“αž·αž„ Clickhouse)αŸ”
  • αž€αž˜αŸ’αžšαž·αžαžŽαžΆ cronαžŠαŸ‚αž›αž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž”αž„αŸ’αžšαž½αž”αž”αž„αŸ’αžšαž½αž˜αž‘αž·αž“αŸ’αž“αž“αŸαž™αž“αŸ…αž›αžΎ ODS αž“αž·αž„αžαŸ’αžšαž½αžαž–αž·αž“αž·αžαŸ’αž™αž€αžΆαžšαžαŸ‚αž‘αžΆαŸ†αžšαž”αžŸαŸ‹αž–αž½αž€αž‚αŸαž•αž„αžŠαŸ‚αžšαŸ”

αžšαž αžΌαžαž˜αž€αžŠαž›αŸ‹αž–αŸαž›αžαŸ’αž˜αžΈαŸ—αž“αŸαŸ‡ αžαž˜αŸ’αžšαžΌαžœαž€αžΆαžšαžšαž”αžŸαŸ‹αž™αžΎαž„αžαŸ’αžšαžΌαžœαž”αžΆαž“αž‚αŸ’αžšαž”αžŠαžŽαŸ’αžαž”αŸ‹αžŠαŸ„αž™αž˜αŸ‰αžΆαžŸαŸŠαžΈαž“αž˜αŸαžαžΌαž…αž˜αž½αž™αžŠαŸ‚αž›αž˜αžΆαž“ 32 cores αž“αž·αž„ 50 GB αž“αŸƒ RAM αŸ” αž“αŸ…αž€αŸ’αž“αž»αž„ Airflow αžœαžΆαžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαŸ–

  • Π±ΠΎΠ»Π΅Π΅ 200 ដអស (αžαžΆαž˜αž–αž·αžαž›αŸ†αž αžΌαžšαž€αžΆαžšαž„αžΆαžšαžŠαŸ‚αž›αž™αžΎαž„αž”αžΆαž“αž”αŸ†αž–αŸαž‰αž€αž·αž…αŸ’αž…αž€αžΆαžš)
  • αž‡αžΆαž˜αž’αŸ’αž™αž˜αž“αžΈαž˜αž½αž™αŸ— 70 αž€αž·αž…αŸ’αž…αž€αžΆαžš,
  • αž—αžΆαž–αž›αŸ’αž’αž“αŸαŸ‡αž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜ (αž‡αžΆαž˜αž’αŸ’αž™αž˜αž•αž„αžŠαŸ‚αžš) αž˜αŸ’αžαž„αž€αŸ’αž“αž»αž„αž˜αž½αž™αž˜αŸ‰αŸ„αž„.

αž αžΎαž™αž’αŸ†αž–αžΈαžšαž”αŸ€αž”αžŠαŸ‚αž›αž™αžΎαž„αž–αž„αŸ’αžšαžΈαž€ αžαŸ’αž‰αž»αŸ†αž“αžΉαž„αžŸαžšαžŸαŸαžšαžαžΆαž„αž€αŸ’αžšαŸ„αž˜ αž”αŸ‰αž»αž“αŸ’αžαŸ‚αž₯αž‘αžΌαžœαž“αŸαŸ‡ αž…αžΌαžšαž€αŸ†αžŽαžαŸ‹αž”αž‰αŸ’αž αžΆ ΓΌber αžŠαŸ‚αž›αž™αžΎαž„αž“αžΉαž„αžŠαŸ„αŸ‡αžŸαŸ’αžšαžΆαž™αŸ–

αž˜αžΆαž“αž”αŸ’αžšαž—αž– SQL Servers αž…αŸ†αž“αž½αž“ 50 αžŠαŸ‚αž›αž“αžΈαž˜αž½αž™αŸ—αž˜αžΆαž“ XNUMX databases - instances of one project αžšαŸ€αž„αž‚αŸ’αž“αžΆαž–αž½αž€αž‚αŸαž˜αžΆαž“αžšαž…αž“αžΆαžŸαž˜αŸ’αž–αŸαž“αŸ’αž’αžŠαžΌαž…αž‚αŸ’αž“αžΆ (αžŸαŸ’αž‘αžΎαžšαžαŸ‚αž‚αŸ’αžšαž”αŸ‹αž‘αžΈαž€αž“αŸ’αž›αŸ‚αž„ mua-ha-ha) αžŠαŸ‚αž›αž˜αžΆαž“αž“αŸαž™αžαžΆαž“αžΈαž˜αž½αž™αŸ—αž˜αžΆαž“αžαžΆαžšαžΆαž„ Orders (αž‡αžΆαžŸαŸ†αžŽαžΆαž„αž›αŸ’αž’ αžαžΆαžšαžΆαž„αžŠαŸ‚αž›αž˜αžΆαž“αž“αŸ„αŸ‡ αžˆαŸ’αž˜αŸ„αŸ‡αž’αžΆαž…αžαŸ’αžšαžΌαžœαž”αžΆαž“αž‡αŸ†αžšαž»αž‰αž‘αŸ…αž€αŸ’αž“αž»αž„αž’αžΆαž‡αžΈαžœαž€αž˜αŸ’αž˜αžŽαžΆαž˜αž½αž™) αŸ” αž™αžΎαž„αž™αž€αž‘αž·αž“αŸ’αž“αž“αŸαž™αžŠαŸ„αž™αž”αž“αŸ’αžαŸ‚αž˜αžœαžΆαž›αžŸαŸαžœαžΆαž€αž˜αŸ’αž˜ (αž˜αŸ‰αžΆαžŸαŸŠαžΈαž“αž˜αŸαž”αŸ’αžšαž—αž– αž˜αžΌαž›αžŠαŸ’αž‹αžΆαž“αž‘αž·αž“αŸ’αž“αž“αŸαž™αž”αŸ’αžšαž—αž– αž›αŸαžαžŸαž˜αŸ’αž‚αžΆαž›αŸ‹αž€αž·αž…αŸ’αž…αž€αžΆαžš ETL) αž αžΎαž™αž”αŸ„αŸ‡αžœαžΆαžŠαŸ„αž™αž₯αžαž›αžΆαž€αŸ‹αž›αŸ€αž˜ αž“αž·αž™αžΆαž™αžαžΆ Vertica αŸ”

αžαŸ„αŸ‡αž‘αŸ…!

αž•αŸ’αž“αŸ‚αž€αžŸαŸ†αžαžΆαž“αŸ‹ αž‡αžΆαž€αŸ‹αžŸαŸ’αžαŸ‚αž„ (αž“αž·αž„αž‘αŸ’αžšαžΉαžŸαŸ’αžαžΈαž”αž“αŸ’αžαž·αž…)

αž αŸαžαž»αž’αŸ’αžœαžΈαž”αžΆαž“αž‡αžΆαž™αžΎαž„ (αž“αž·αž„αž’αŸ’αž“αž€)

αž“αŸ…αž–αŸαž›αžŠαŸ‚αž›αžŠαžΎαž˜αžˆαžΎαž’αŸ†αž αžΎαž™αžαŸ’αž‰αž»αŸ†αžŸαžΆαž˜αž‰αŸ’αž‰ SQL-schik αž“αŸ…αž€αŸ’αž“αž»αž„αž€αžΆαžšαž›αž€αŸ‹αžšαžΆαž™αžšαž”αžŸαŸ‹αžšαž»αžŸαŸ’αžŸαŸŠαžΈαž˜αž½αž™ αž™αžΎαž„αž”αžΆαž“αž”αŸ„αž€αž”αŸ’αžšαžΆαžŸαŸ‹αžŠαŸ†αžŽαžΎαžšαž€αžΆαžš ETL aka αž›αŸ†αž αžΌαžšαž‘αž·αž“αŸ’αž“αž“αŸαž™αžŠαŸ„αž™αž”αŸ’αžšαžΎαž§αž”αž€αžšαžŽαŸαž–αžΈαžšαžŠαŸ‚αž›αž˜αžΆαž“αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž–αž½αž€αž™αžΎαž„αŸ–

  • αž˜αž‡αŸ’αžˆαž˜αžŽαŸ’αžŒαž›αžαžΆαž˜αž–αž› Informatica - αž”αŸ’αžšαž–αŸαž“αŸ’αž’αžšαžΈαž€αžšαžΆαž›αžŠαžΆαž›αžαŸ’αž›αžΆαŸ†αž„ αž•αž›αž·αžαž—αžΆαž–αžαŸ’αž›αžΆαŸ†αž„ αž‡αžΆαž˜αž½αž™αž“αžΉαž„αž•αŸ’αž“αŸ‚αž€αžšαžΉαž„αž•αŸ’αž‘αžΆαž›αŸ‹αžαŸ’αž›αž½αž“ αž€αŸ†αžŽαŸ‚αž•αŸ’αž‘αžΆαž›αŸ‹αžαŸ’αž›αž½αž“αžšαž”αžŸαŸ‹αžœαžΆαŸ” αžαŸ’αž‰αž»αŸ†αž”αžΆαž“αž”αŸ’αžšαžΎαž–αŸ’αžšαŸ‡αž αžΆαž˜αžƒαžΆαžαŸ‹ 1% αž“αŸƒαžŸαž˜αžαŸ’αžαž—αžΆαž–αžšαž”αžŸαŸ‹αžœαžΆαŸ” αž αŸαžαž»αž’αŸ’αžœαžΈ? αž‡αžΆαžŠαŸ†αž”αžΌαž„ αž…αŸ†αžŽαž»αž…αž”αŸ’αžšαž‘αžΆαž€αŸ‹αž“αŸαŸ‡ αž€αž“αŸ’αž›αŸ‚αž„αžŽαžΆαž˜αž½αž™αž–αžΈαž‘αžŸαžœαžαŸ’αžŸαžšαŸαž†αŸ’αž“αžΆαŸ† 380 αž”αžΆαž“αžŠαžΆαž€αŸ‹αžŸαž˜αŸ’αž–αžΆαž’αž•αŸ’αž›αžΌαžœαž…αž·αžαŸ’αžαž›αžΎαž™αžΎαž„αŸ” αž‘αžΈαž–αžΈαžš αž€αžΆαžšαž”αž„αŸ’αžαžΆαŸ†αž„αž“αŸαŸ‡αžαŸ’αžšαžΌαžœαž”αžΆαž“αžšαž…αž“αžΆαž‘αžΎαž„αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαžŠαŸαž”αŸ’αžšαžŽαž·αžαž”αŸ†αž•αž»αž αž€αžΆαžšαž”αŸ’αžšαžΎαž”αŸ’αžšαžΆαžŸαŸ‹αž‘αžΎαž„αžœαž·αž‰αž“αžΌαžœαžŸαž˜αžΆαžŸαž’αžΆαžαž»αžŠαŸαžαžΉαž„αžŸαž˜αŸ’αž”αžΆαžš αž“αž·αž„αž›αŸ’αž”αž·αž…αžŸαž αž‚αŸ’αžšαžΆαžŸαžŸαŸ†αžαžΆαž“αŸ‹αŸ—αžŠαž‘αŸƒαž‘αŸ€αžαŸ” αž’αŸ†αž–αžΈαž’αŸ’αžœαžΈαžŠαŸ‚αž›αžœαžΆαž˜αžΆαž“αžαž˜αŸ’αž›αŸƒαžŠαžΌαž…αž‡αžΆαžŸαŸ’αž›αžΆαž”αžšαž”αžŸαŸ‹ Airbus AXNUMX / αž†αŸ’αž“αžΆαŸ†αž™αžΎαž„αž“αžΉαž„αž˜αž·αž“αž“αž·αž™αžΆαž™αž’αŸ’αžœαžΈαž‘αžΆαŸ†αž„αž’αžŸαŸ‹αŸ”

    αžŸαžΌαž˜αž”αŸ’αžšαž™αŸαžαŸ’αž“ αžšαžΌαž”αžαžαž’αŸαž€αŸ’αžšαž„αŸ‹αž’αžΆαž…αž”αŸ‰αŸ‡αž–αžΆαž›αŸ‹αžŠαž›αŸ‹αž˜αž“αž»αžŸαŸ’αžŸαž’αžΆαž™αž»αž€αŸ’αžšαŸ„αž˜ 30 αž†αŸ’αž“αžΆαŸ†αž”αž“αŸ’αžαž·αž…

    Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›

  • SQL Server Integration Server - αž™αžΎαž„αž”αžΆαž“αž”αŸ’αžšαžΎαžŸαž˜αž˜αž·αžαŸ’αžαž“αŸαŸ‡αž“αŸ…αž€αŸ’αž“αž»αž„αž›αŸ†αž αžΌαžšαžαžΆαž„αž€αŸ’αž“αž»αž„αž‚αž˜αŸ’αžšαŸ„αž„αžšαž”αžŸαŸ‹αž™αžΎαž„αŸ” αžαžΆαž˜αž–αž·αžαŸ– αž™αžΎαž„αž”αŸ’αžšαžΎ SQL Server αžšαž½αž…αž αžΎαž™ αž αžΎαž™αžœαžΆαž“αžΉαž„αž˜αž·αž“αžŸαž˜αž αŸαžαž»αž•αž›αž‘αŸαž€αŸ’αž“αž»αž„αž€αžΆαžšαž˜αž·αž“αž”αŸ’αžšαžΎαž§αž”αž€αžšαžŽαŸ ETL αžšαž”αžŸαŸ‹αžœαžΆαŸ” αž’αŸ’αžœαžΈαž‚αŸ’αžšαž”αŸ‹αž™αŸ‰αžΆαž„αž“αŸ…αž€αŸ’αž“αž»αž„αžœαžΆαž‚αžΊαž›αŸ’αž’: αž‘αžΆαŸ†αž„αž…αŸ†αžŽαž»αž…αž”αŸ’αžšαž‘αžΆαž€αŸ‹αž‚αžΊαžŸαŸ’αžšαžŸαŸ‹αžŸαŸ’αž’αžΆαž, αž“αž·αž„αžšαž”αžΆαž™αž€αžΆαžšαžŽαŸαžœαžŒαŸ’αžαž“αž—αžΆαž– ... αž”αŸ‰αž»αž“αŸ’αžαŸ‚αž“αŸαŸ‡αž˜αž·αž“αž˜αŸ‚αž“αž‡αžΆαž˜αžΌαž›αž αŸαžαž»αžŠαŸ‚αž›αž™αžΎαž„αžŸαŸ’αžšαž‘αžΆαž‰αŸ‹αž•αž›αž·αžαž•αž›αž€αž˜αŸ’αž˜αžœαž·αž’αžΈ, αž’αžΌαž˜αž·αž“αž˜αŸ‚αž“αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž“αŸαŸ‡αŸ” αž€αŸ†αžŽαŸ‚αžœαžΆαŸ” dtsx (αžŠαŸ‚αž›αž‡αžΆ XML αžŠαŸ‚αž›αž˜αžΆαž“αžαŸ’αž“αžΆαŸ†αž„αžŸαžΆαž”αŸ‹αž“αŸ…αž›αžΎαžšαž€αŸ’αžŸαžΆαž‘αž»αž€) αž™αžΎαž„αž’αžΆαž…αž’αŸ’αžœαžΎαž”αžΆαž“ αž”αŸ‰αž»αž“αŸ’αžαŸ‚αžαžΎαž’αŸ’αžœαžΈαž‘αŸ…αž‡αžΆαž…αŸ†αžŽαž»αž…? αžαžΎαž’αŸ’αžœαžΎαžŠαžΌαž…αž˜αŸ’αžαŸαž…αž’αŸ†αž–αžΈαž€αžΆαžšαž”αž„αŸ’αž€αžΎαžαž€αž‰αŸ’αž…αž”αŸ‹αž—αžΆαžšαž€αž·αž…αŸ’αž…αžŠαŸ‚αž›αž“αžΉαž„αž’αžΌαžŸαžαžΆαžšαžΆαž„αžšαžΆαž”αŸ‹αžšαž™αž–αžΈαž˜αŸ‰αžΆαžŸαŸŠαžΈαž“αž˜αŸαž˜αž½αž™αž‘αŸ…αž˜αž½αž™αž‘αŸ€αž? αž”αžΆαž‘, αž’αŸ’αžœαžΈαžŠαŸ‚αž›αž˜αž½αž™αžšαž™, αž˜αŸ’αžšαžΆαž˜αžŠαŸƒαžŸαž“αŸ’αž‘αžŸαŸ’αžŸαž“αŸαžšαž”αžŸαŸ‹αž’αŸ’αž“αž€αž“αžΉαž„αž’αŸ’αž›αžΆαž€αŸ‹αž…αž»αŸ‡αž–αžΈαž˜αŸ’αž—αŸƒαž”αŸ†αžŽαŸ‚αž€, αž…αž»αž…αž›αžΎαž”αŸŠαžΌαžαž»αž„αž€αžŽαŸ’αžŠαž»αžšαŸ” αž”αŸ‰αž»αž“αŸ’αžαŸ‚αžœαžΆαž–αž·αžαž‡αžΆαž˜αžΎαž›αž‘αŸ…αž‘αžΆαž“αŸ‹αžŸαž˜αŸαž™αž‡αžΆαž„αž“αŸαŸ‡αŸ–

    Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›

αž™αžΎαž„αž–αž·αžαž‡αžΆαž”αžΆαž“αžŸαŸ’αžœαŸ‚αž„αžšαž€αž•αŸ’αž›αžΌαžœαž…αŸαž‰αŸ” αž€αžšαžŽαžΈαžŸαžΌαž˜αŸ’αž”αžΈαžαŸ‚ αžŸαŸ’αž‘αžΎαžšαžαŸ‚ αž”αžΆαž“αž˜αž€αžŠαž›αŸ‹αž˜αŸ‰αžΆαžŸαŸŠαžΈαž“αž”αž„αŸ’αž€αžΎαžαž€αž‰αŸ’αž…αž”αŸ‹ SSIS αžŠαŸ‚αž›αžŸαžšαžŸαŸαžšαžŠαŸ„αž™αžαŸ’αž›αž½αž“αž―αž„...

αž αžΎαž™αž”αž“αŸ’αž‘αžΆαž”αŸ‹αž˜αž€αž€αžΆαžšαž„αžΆαžšαžαŸ’αž˜αžΈαž”αžΆαž“αžšαž€αžƒαžΎαž‰αžαŸ’αž‰αž»αŸ†αŸ” αž αžΎαž™ Apache Airflow αž”αžΆαž“αž™αž€αžˆαŸ’αž“αŸ‡αžαŸ’αž‰αž»αŸ†αŸ”

αž“αŸ…αž–αŸαž›αžŠαŸ‚αž›αžαŸ’αž‰αž»αŸ†αž”αžΆαž“αžšαž€αžƒαžΎαž‰αžαžΆαž€αžΆαžšαž–αž·αž–αžŽαŸŒαž“αžΆαžŠαŸ†αžŽαžΎαžšαž€αžΆαžš ETL αž‚αžΊαž‡αžΆαž€αžΌαžŠ Python αžŸαžΆαž˜αž‰αŸ’αž‰ αžαŸ’αž‰αž»αŸ†αž‚αŸ’αžšαžΆαž“αŸ‹αžαŸ‚αž˜αž·αž“αž”αžΆαž“αžšαžΆαŸ†αžŠαžΎαž˜αŸ’αž”αžΈαž—αžΆαž–αžšαžΈαž€αžšαžΆαž™αŸ” αž“αŸαŸ‡αž‡αžΆαžšαž”αŸ€αž”αžŠαŸ‚αž›αž€αžΆαžšαžŸαŸ’αž‘αŸ’αžšαžΈαž˜αž‘αž·αž“αŸ’αž“αž“αŸαž™αžαŸ’αžšαžΌαžœαž”αžΆαž“αž€αŸ†αžŽαŸ‚ αž“αž·αž„αžαž»αžŸαž‚αŸ’αž“αžΆ αž αžΎαž™αž€αžΆαžšαž”αž‰αŸ’αž…αžΌαž›αžαžΆαžšαžΆαž„αžŠαŸ‚αž›αž˜αžΆαž“αžšαž…αž“αžΆαžŸαž˜αŸ’αž–αŸαž“αŸ’αž’αžαŸ‚αž˜αž½αž™αž–αžΈαž˜αžΌαž›αžŠαŸ’αž‹αžΆαž“αž‘αž·αž“αŸ’αž“αž“αŸαž™αžšαžΆαž”αŸ‹αžšαž™αž‘αŸ…αž€αŸ’αž“αž»αž„αž‚αŸ„αž›αžŠαŸ…αžαŸ‚αž˜αž½αž™αž”αžΆαž“αž€αŸ’αž›αžΆαž™αž‡αžΆαž”αž‰αŸ’αž αžΆαž“αŸƒαž€αžΌαžŠ Python αž“αŸ…αž€αŸ’αž“αž»αž„αž’αŸαž€αŸ’αžšαž„αŸ‹αž˜αž½αž™αž€αž“αŸ’αž›αŸ‡αž¬αž–αžΈαžš 13 "αŸ”

αž€αžΆαžšαž”αŸ’αžšαž˜αžΌαž›αž•αŸ’αžαž»αŸ†αž…αž„αŸ’αž€αŸ„αž˜

αž…αžΌαžšαž™αžΎαž„αž€αž»αŸ†αžšαŸ€αž”αž…αŸ†αžŸαžΆαž›αžΆαž˜αžαŸ’αžαŸαž™αŸ’αž™αž‘αžΆαŸ†αž„αžŸαŸ’αžšαž»αž„ αž αžΎαž™αž€αž»αŸ†αž“αž·αž™αžΆαž™αž’αŸ†αž–αžΈαž’αŸ’αžœαžΈαžŠαŸ‚αž›αž‡αžΆαž€αŸ‹αžŸαŸ’αžαŸ‚αž„αž‘αžΆαŸ†αž„αžŸαŸ’αžšαž»αž„αž“αŸ…αž‘αžΈαž“αŸαŸ‡ αžŠαžΌαž…αž‡αžΆαž€αžΆαžšαžŠαŸ†αž‘αžΎαž„ Airflow αž˜αžΌαž›αžŠαŸ’αž‹αžΆαž“αž‘αž·αž“αŸ’αž“αž“αŸαž™αžŠαŸ‚αž›αž’αŸ’αž“αž€αž”αžΆαž“αž‡αŸ’αžšαžΎαžŸαžšαžΎαžŸ Celery αž“αž·αž„αž€αžšαžŽαžΈαž•αŸ’αžŸαŸαž„αž‘αŸ€αžαžŠαŸ‚αž›αž”αžΆαž“αž–αž·αž–αžŽαŸŒαž“αžΆαž“αŸ…αž€αŸ’αž“αž»αž„αž…αžαŸ”

αžŠαžΎαž˜αŸ’αž”αžΈαž²αŸ’αž™αž™αžΎαž„αž’αžΆαž…αž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜αž€αžΆαžšαž–αž·αžŸαŸ„αž’αž“αŸαž—αŸ’αž›αžΆαž˜αŸ— αžαŸ’αž‰αž»αŸ†αž”αžΆαž“αž‚αžΌαžŸαžœαžΆαžŸ docker-compose.yml αžŠαŸ‚αž›αž€αŸ’αž“αž»αž„αž“αŸ„αŸ‡αŸ–

  • αž…αžΌαžšαž™αžΎαž„αž›αžΎαž€αž™αŸ‰αžΆαž„αž–αž·αžαž”αŸ’αžšαžΆαž€αžŠ αž›αŸ†αž αžΌαžšβ€‹αžαŸ’αž™αž›αŸ‹αŸ– αž€αž˜αŸ’αž˜αžœαž·αž’αžΈαž€αŸ†αžŽαžαŸ‹αž–αŸαž›, Webserver αŸ” αž•αŸ’αž€αžΆαž€αŸαž“αžΉαž„αžœαž·αž›αž‘αŸ…αž‘αžΈαž“αŸ„αŸ‡ αžŠαžΎαž˜αŸ’αž”αžΈαžαžΆαž˜αžŠαžΆαž“αž€αž·αž…αŸ’αž…αž€αžΆαžš Celery (αž–αŸ’αžšαŸ„αŸ‡αžœαžΆαžαŸ’αžšαžΌαžœαž”αžΆαž“αžšαž»αž‰αž…αžΌαž›αžšαž½αž…αž αžΎαž™ apache/airflow:1.10.10-python3.7αž”αŸ‰αž»αž“αŸ’αžαŸ‚αž™αžΎαž„αž˜αž·αž“αž”αŸ’αžšαž€αžΆαž“αŸ‹αž‘αŸ)
  • PostgreSQLαžŠαŸ‚αž›αž€αŸ’αž“αž»αž„αž“αŸ„αŸ‡ Airflow αž“αžΉαž„αžŸαžšαžŸαŸαžšαž–αŸαžαŸŒαž˜αžΆαž“αžŸαŸαžœαžΆαž€αž˜αŸ’αž˜αžšαž”αžŸαŸ‹αžœαžΆ (αž‘αž·αž“αŸ’αž“αž“αŸαž™αž€αž˜αŸ’αž˜αžœαž·αž’αžΈαž€αŸ†αžŽαžαŸ‹αž–αŸαž› αžŸαŸ’αžαž·αžαž·αž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αŸ”αž›αŸ”) αž αžΎαž™ Celery αž“αžΉαž„αžŸαž˜αŸ’αž‚αžΆαž›αŸ‹αž€αž·αž…αŸ’αž…αž€αžΆαžšαžŠαŸ‚αž›αž”αžΆαž“αž”αž‰αŸ’αž…αž”αŸ‹αŸ”
  • RedisαžŠαŸ‚αž›αž“αžΉαž„αžŠαžΎαžšαžαž½αž‡αžΆαžˆαŸ’αž˜αž½αž‰αž€αžŽαŸ’αžαžΆαž›αž€αž·αž…αŸ’αž…αž€αžΆαžšαžŸαž˜αŸ’αžšαžΆαž”αŸ‹ Celery;
  • αž€αž˜αŸ’αž˜αž€αžš CeleryαžŠαŸ‚αž›αž“αžΉαž„αž…αžΌαž›αžšαž½αž˜αž€αŸ’αž“αž»αž„αž€αžΆαžšαž’αž“αž»αžœαžαŸ’αžαž—αžΆαžšαž€αž·αž…αŸ’αž…αžŠαŸ„αž™αž•αŸ’αž‘αžΆαž›αŸ‹αŸ”
  • αž‘αŸ…αžαžαž―αž€αžŸαžΆαžš ./dags αž™αžΎαž„αž“αžΉαž„αž”αž“αŸ’αžαŸ‚αž˜αž―αž€αžŸαžΆαžšαžšαž”αžŸαŸ‹αž™αžΎαž„αž‡αžΆαž˜αž½αž™αž“αžΉαž„αž€αžΆαžšαž–αž·αž–αžŽαŸŒαž“αžΆαž’αŸ†αž–αžΈ dags αŸ” αž–αž½αž€αž‚αŸαž“αžΉαž„αžαŸ’αžšαžΌαžœαž”αžΆαž“αžšαžΎαžŸαž—αŸ’αž›αžΆαž˜αŸ— αžŠαžΌαž…αŸ’αž“αŸαŸ‡αž˜αž·αž“αž…αžΆαŸ†αž”αžΆαž…αŸ‹αž›αŸαž„αž‡αž„αŸ‹αž‘αžΆαŸ†αž„αž˜αžΌαž›αž‘αŸ αž”αž“αŸ’αž‘αžΆαž”αŸ‹αž–αžΈαž€αžŽαŸ’αžαžΆαžŸαŸ‹αž˜αŸ’αžαž„αŸ—αŸ”

αž“αŸ…αž€αž“αŸ’αž›αŸ‚αž„αžαŸ’αž›αŸ‡ αž€αžΌαžŠαž€αŸ’αž“αž»αž„αž§αž‘αžΆαž αžšαžŽαŸαž˜αž·αž“αžαŸ’αžšαžΌαžœαž”αžΆαž“αž”αž„αŸ’αž αžΆαž‰αž‘αžΆαŸ†αž„αžŸαŸ’αžšαž»αž„αž‘αŸ (αžŠαžΎαž˜αŸ’αž”αžΈαž€αž»αŸ†αž±αŸ’αž™αž–αž„αŸ’αžšαžΆαž™αž’αžαŸ’αžαž”αž‘) αž”αŸ‰αž»αž“αŸ’αžαŸ‚αž€αž“αŸ’αž›αŸ‚αž„αžŽαžΆαž˜αž½αž™αžœαžΆαžαŸ’αžšαžΌαžœαž”αžΆαž“αž€αŸ‚αž”αŸ’αžšαŸ‚αž€αŸ’αž“αž»αž„αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαŸ” αž§αž‘αžΆαž αžšαžŽαŸαž€αžΌαžŠαž€αžΆαžšαž„αžΆαžšαž–αŸαž‰αž›αŸαž‰αž’αžΆαž…αžšαž€αž”αžΆαž“αž“αŸ…αž€αŸ’αž“αž»αž„αžƒαŸ’αž›αžΆαŸ†αž„ https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

αž…αŸ†αžŽαžΆαŸ†:

  • αž“αŸ…αž€αŸ’αž“αž»αž„αž€αžΆαžšαž‡αž½αž”αž”αŸ’αžšαž‡αž»αŸ†αž‚αŸ’αž“αžΆαž“αŸƒαžŸαž˜αžΆαžŸαž—αžΆαž–αžαŸ’αž‰αž»αŸ†αž—αžΆαž‚αž…αŸ’αžšαžΎαž“αž–αžΉαž„αž•αŸ’αž’αŸ‚αž€αž›αžΎαžšαžΌαž”αž—αžΆαž–αž›αŸ’αž”αžΈ puckel/docker-αž›αŸ†αž αžΌαžšαžαŸ’αž™αž›αŸ‹ - αžαŸ’αžšαžΌαžœαž”αŸ’αžšαžΆαž€αžŠαžαžΆαž–αž·αž“αž·αžαŸ’αž™αž˜αžΎαž›αžœαžΆαŸ” αž”αŸ’αžšαž αŸ‚αž›αž‡αžΆαž’αŸ’αž“αž€αž˜αž·αž“αžαŸ’αžšαžΌαžœαž€αžΆαžšαž’αŸ’αžœαžΈαž•αŸ’αžŸαŸαž„αž‘αŸ€αžαž“αŸ…αž€αŸ’αž“αž»αž„αž‡αžΈαžœαž·αžαžšαž”αžŸαŸ‹αž’αŸ’αž“αž€αž‘αŸαŸ”
  • αž€αžΆαžšαž€αŸ†αžŽαžαŸ‹αž›αŸ†αž αžΌαžšαžαŸ’αž™αž›αŸ‹αž‘αžΆαŸ†αž„αž’αžŸαŸ‹αž’αžΆαž…αž”αŸ’αžšαžΎαž”αžΆαž“αž˜αž·αž“αžαŸ’αžšαžΉαž˜αžαŸ‚αžαžΆαž˜αžšαž™αŸˆ airflow.cfgαž”αŸ‰αž»αž“αŸ’αžαŸ‚αžαžΆαž˜αžšαž™αŸˆαž’αžαŸαžšαž”αžšαž·αžŸαŸ’αžαžΆαž“ (αž’αžšαž‚αž»αžŽαž…αŸ†αž–αŸ„αŸ‡αž’αŸ’αž“αž€αž’αž—αž·αžœαžŒαŸ’αžαž“αŸ) αžŠαŸ‚αž›αžαŸ’αž‰αž»αŸ†αž”αžΆαž“αž‘αžΆαž‰αž™αž€αž”αŸ’αžšαž™αŸ„αž‡αž“αŸαžŠαŸ„αž™αž–αŸ’αž™αžΆαž”αžΆαž‘αŸ”
  • αžαžΆαž˜αž’αž˜αŸ’αž˜αž‡αžΆαžαž· αžœαžΆαž˜αž·αž“αž˜αŸ‚αž“αž‡αžΆαž•αž›αž·αžαž€αž˜αŸ’αž˜αžŠαŸ‚αž›αžαŸ’αžšαŸ€αž˜αžšαž½αž…αž‡αžΆαžŸαŸ’αžšαŸαž…αž‘αŸαŸ– αžαŸ’αž‰αž»αŸ†αž˜αž·αž“αž˜αžΆαž“αž…αŸαžαž“αžΆαž˜αž·αž“αžŠαžΆαž€αŸ‹αž”αŸαŸ‡αžŠαžΌαž„αž“αŸ…αž›αžΎαž’αž»αž„αž‘αŸ αžαŸ’αž‰αž»αŸ†αž˜αž·αž“αžšαŸ†αžαžΆαž“αžŸαž“αŸ’αžαž·αžŸαž»αžαž‘αŸαŸ” αž”αŸ‰αž»αž“αŸ’αžαŸ‚αžαŸ’αž‰αž»αŸ†αž”αžΆαž“αž’αŸ’αžœαžΎαž’αž”αŸ’αž”αž”αžšαž˜αžΆαžŸαž˜αžšαž˜αŸ’αž™αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž’αŸ’αž“αž€αž–αž·αžŸαŸ„αž’αž“αŸαžšαž”αžŸαŸ‹αž™αžΎαž„αŸ”
  • αž…αŸ†αžŽαžΆαŸ†β€‹αžαžΆ:
    • ថត Dag αžαŸ’αžšαžΌαžœαžαŸ‚αž’αžΆαž…αž…αžΌαž›αž”αŸ’αžšαžΎαž”αžΆαž“αž‘αžΆαŸ†αž„αž’αŸ’αž“αž€αž€αŸ†αžŽαžαŸ‹αž–αŸαž› αž“αž·αž„αž€αž˜αŸ’αž˜αž€αžšαŸ”
    • αžŠαžΌαž…αž‚αŸ’αž“αžΆαž“αŸαŸ‡αžŠαŸ‚αžšαž’αž“αž»αžœαžαŸ’αžαž…αŸ†αž–αŸ„αŸ‡αž”αžŽαŸ’αžŽαžΆαž›αŸαž™αž—αžΆαž‚αžΈαž‘αžΈαž”αžΈαž‘αžΆαŸ†αž„αž’αžŸαŸ‹ - αž–αž½αž€αž‚αŸαž‘αžΆαŸ†αž„αž’αžŸαŸ‹αžαŸ’αžšαžΌαžœαžαŸ‚αžαŸ’αžšαžΌαžœαž”αžΆαž“αžŠαŸ†αž‘αžΎαž„αž“αŸ…αž›αžΎαž˜αŸ‰αžΆαžŸαŸŠαžΈαž“αžŠαŸ‚αž›αž˜αžΆαž“αž€αž˜αŸ’αž˜αžœαž·αž’αžΈαž€αŸ†αžŽαžαŸ‹αž–αŸαž› αž“αž·αž„αž€αž˜αŸ’αž˜αž€αžšαŸ”

αž₯αž‘αžΌαžœαž“αŸαŸ‡αžœαžΆαžŸαžΆαž˜αž‰αŸ’αž‰αž αžΎαž™αŸ–

$ docker-compose up --scale worker=3

αž”αž“αŸ’αž‘αžΆαž”αŸ‹αž–αžΈαž’αŸ’αžœαžΈαŸ—αž€αžΎαž“αž‘αžΎαž„ αž’αŸ’αž“αž€αž’αžΆαž…αž˜αžΎαž›αž…αŸ†αžŽαž»αž…αž”αŸ’αžšαž‘αžΆαž€αŸ‹αž‚αŸαž αž‘αŸ†αž–αŸαžšαŸ–

αž‚αŸ†αž“αž·αžαž‡αžΆαž˜αžΌαž›αžŠαŸ’αž‹αžΆαž“

αž”αŸ’αžšαžŸαž·αž“αž”αžΎαž’αŸ’αž“αž€αž˜αž·αž“αž™αž›αŸ‹αž’αŸ’αžœαžΈαž‘αžΆαŸ†αž„αž’αžŸαŸ‹αž“αŸ…αž€αŸ’αž“αž»αž„ "dags" αž‘αžΆαŸ†αž„αž“αŸαŸ‡αž‘αŸαž“αŸ„αŸ‡ αž“αŸαŸ‡αž‚αžΊαž‡αžΆαžœαž…αž“αžΆαž“αž»αž€αŸ’αžšαž˜αžαŸ’αž›αžΈαž˜αž½αž™αŸ–

  • αž€αž˜αŸ’αž˜αžœαž·αž’αžΈαž€αŸ†αžŽαžαŸ‹αž–αŸαž›αžœαŸαž›αžΆ - αž–αžΌαžŸαŸ†αžαžΆαž“αŸ‹αž”αŸ†αž•αž»αžαž“αŸ…αž€αŸ’αž“αž»αž„ Airflow αžŠαŸ‚αž›αž‚αŸ’αžšαž”αŸ‹αž‚αŸ’αžšαž„αžαžΆαž˜αž“αž»αžŸαŸ’αžŸαž™αž“αŸ’αžαž’αŸ’αžœαžΎαž€αžΆαžšαž™αŸ‰αžΆαž„αž›αŸ†αž”αžΆαž€ αž αžΎαž™αž˜αž·αž“αž˜αŸ‚αž“αž‡αžΆαž˜αž“αž»αžŸαŸ’αžŸαž‘αŸαŸ– αžαžΆαž˜αžŠαžΆαž“αž€αžΆαž›αžœαž·αž—αžΆαž‚ αž’αŸ’αžœαžΎαž”αž…αŸ’αž…αž»αž”αŸ’αž”αž“αŸ’αž“αž—αžΆαž– dags αž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜αž€αž·αž…αŸ’αž…αž€αžΆαžšαŸ”

    αž‡αžΆαž‘αžΌαž‘αŸ…αž“αŸ…αž€αŸ’αž“αž»αž„αž€αŸ†αžŽαŸ‚αž…αžΆαžŸαŸ‹ αž‚αžΆαžαŸ‹αž˜αžΆαž“αž”αž‰αŸ’αž αžΆαž‡αžΆαž˜αž½αž™αž“αžΉαž„αž€αžΆαžšαž…αž„αž…αžΆαŸ† (αž‘αŸ αž˜αž·αž“αž˜αŸ‚αž“αž€αžΆαžšαž—αŸ’αž›αŸαž…αž—αŸ’αž›αžΆαŸ†αž„αž‘αŸ αž”αŸ‰αž»αž“αŸ’αžαŸ‚αž€αžΆαžšαž›αŸαž…αž’αŸ’αž›αžΆαž™) αž αžΎαž™αž”αŸ‰αžΆαžšαŸ‰αžΆαž˜αŸ‰αŸ‚αžαŸ’αžšαž€αŸαžšαŸ’αžαž·αŸαžŠαŸ†αžŽαŸ‚αž›αž“αŸ…αžαŸ‚αž˜αžΆαž“αž“αŸ…αž€αŸ’αž“αž»αž„αž€αžΆαžšαž€αŸ†αžŽαžαŸ‹αŸ” run_duration - αž…αž“αŸ’αž›αŸ„αŸ‡αž–αŸαž›αž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜αž‘αžΎαž„αžœαž·αž‰αžšαž”αžŸαŸ‹αžœαžΆαŸ” αž”αŸ‰αž»αž“αŸ’αžαŸ‚αž₯αž‘αžΌαžœαž“αŸαŸ‡αž’αŸ’αžœαžΈαŸ—αž‚αžΊαž›αŸ’αž’αŸ”

  • DAG (aka "dag") - "αž€αŸ’αžšαžΆαž αŸ’αžœαžŠαŸ‚αž›αžŠαžΉαž€αž“αžΆαŸ†αžŠαŸ„αž™ acyclic" αž”αŸ‰αž»αž“αŸ’αžαŸ‚αž“αž·αž™αž˜αž“αŸαž™αž”αŸ‚αž”αž“αŸαŸ‡αž“αžΉαž„αž”αŸ’αžšαžΆαž”αŸ‹αž˜αž“αž»αžŸαŸ’αžŸαžαž·αž…αžαž½αž… αž”αŸ‰αž»αž“αŸ’αžαŸ‚αžαžΆαž˜αž–αž·αžαžœαžΆαž‚αžΊαž‡αžΆαž€αž»αž„αžαžΊαž“αŸαžšαžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž€αž·αž…αŸ’αž…αž€αžΆαžšαžŠαŸ‚αž›αž‘αžΆαž€αŸ‹αž‘αž„αž‚αŸ’αž“αžΆαž‘αŸ…αžœαž·αž‰αž‘αŸ…αž˜αž€ (αžŸαžΌαž˜αž˜αžΎαž›αžαžΆαž„αž€αŸ’αžšαŸ„αž˜) ឬ analogue αž“αŸƒαž€αž‰αŸ’αž…αž”αŸ‹αž“αŸ…αž€αŸ’αž“αž»αž„ SSIS αž“αž·αž„αž›αŸ†αž αžΌαžšαž€αžΆαžšαž„αžΆαžšαž“αŸ…αž€αŸ’αž“αž»αž„ Informatica .

    αž”αž“αŸ’αžαŸ‚αž˜αž–αžΈαž›αžΎ dags αž’αžΆαž…αž“αŸ…αžαŸ‚αž˜αžΆαž“ subdags αž”αŸ‰αž»αž“αŸ’αžαŸ‚αž™αžΎαž„αž‘αŸ†αž“αž„αž‡αžΆαž“αžΉαž„αž˜αž·αž“αž‘αž‘αž½αž›αž”αžΆαž“αž–αž½αž€αžœαžΆαž‘αŸαŸ”

  • DAG αžšαžαŸ‹ - αžŠαžΎαž˜αžŠαžΆαž…αžŠαŸ‚αž›αžαŸ’αžšαžΌαžœαž”αžΆαž“αž…αžΆαžαŸ‹αžαžΆαŸ†αž„αžŠαŸ„αž™αžαŸ’αž›αž½αž“αžœαžΆαž•αŸ’αž‘αžΆαž›αŸ‹ execution_date. Dagrans αž“αŸƒ dag αžŠαžΌαž…αž‚αŸ’αž“αžΆαž’αžΆαž…αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαžŸαŸ’αžšαž”αž‚αŸ’αž“αžΆ (αž”αŸ’αžšαžŸαž·αž“αž”αžΎαž’αŸ’αž“αž€αž”αžΆαž“αž’αŸ’αžœαžΎαž±αŸ’αž™αž€αž·αž…αŸ’αž…αž€αžΆαžšαžšαž”αžŸαŸ‹αž’αŸ’αž“αž€αž˜αž·αž“αž˜αžΆαž“αž€αž˜αŸ’αž›αžΆαŸ†αž„ αž–αž·αžαžŽαžΆαžŸαŸ‹)αŸ”
  • αž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αž€αžš αž‚αžΊαž‡αžΆαž”αŸ†αžŽαŸ‚αž€αž“αŸƒαž€αžΌαžŠαžŠαŸ‚αž›αž‘αž‘αž½αž›αžαž»αžŸαžαŸ’αžšαžΌαžœαž€αŸ’αž“αž»αž„αž€αžΆαžšαž’αž“αž»αžœαžαŸ’αžαžŸαž€αž˜αŸ’αž˜αž—αžΆαž–αž‡αžΆαž€αŸ‹αž›αžΆαž€αŸ‹αž˜αž½αž™αŸ” αž˜αžΆαž“αž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αž€αžšαž”αžΈαž”αŸ’αžšαž—αŸαž‘αŸ–
    • αžŸαž€αž˜αŸ’αž˜αž—αžΆαž–αžŠαžΌαž…αž‡αžΆαžŸαŸ†αžŽαž–αŸ’αžœαžšαž”αžŸαŸ‹αž™αžΎαž„αŸ” PythonOperatorαžŠαŸ‚αž›αž’αžΆαž…αž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αž€αžΌαžŠ Python αžŽαžΆαž˜αž½αž™ (αžαŸ’αžšαžΉαž˜αžαŸ’αžšαžΌαžœ) αŸ”
    • αž€αžΆαžšαž•αŸ’αž‘αŸαžšαž”αŸ’αžšαžΆαž€αŸ‹αžŠαŸ‚αž›αžŠαžΉαž€αž‡αž‰αŸ’αž‡αžΌαž“αž‘αž·αž“αŸ’αž“αž“αŸαž™αž–αžΈαž€αž“αŸ’αž›αŸ‚αž„αž˜αž½αž™αž‘αŸ…αž€αž“αŸ’αž›αŸ‚αž„αž˜αž½αž™ αž“αž·αž™αžΆαž™αžαžΆ MsSqlToHiveTransfer;
    • αž’αž„αŸ’αž‚αž‰αžΆαžŽ αž˜αŸ‰αŸ’αž™αžΆαž„αžœαž·αž‰αž‘αŸ€αž αžœαžΆαž“αžΉαž„αž’αž“αž»αž‰αŸ’αž‰αžΆαžαž±αŸ’αž™αž’αŸ’αž“αž€αž˜αžΆαž“αž”αŸ’αžšαžαž·αž€αž˜αŸ’αž˜ αž¬αž”αž“αŸ’αžαž™αž›αŸ’αž”αžΏαž“αž“αŸƒαž€αžΆαžšαž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αž”αž“αŸ’αžαŸ‚αž˜αž‘αŸ€αžαžšαž αžΌαžαžŠαž›αŸ‹αž–αŸ’αžšαžΉαžαŸ’αžαž·αž€αžΆαžšαžŽαŸαž˜αž½αž™αž€αžΎαžαž‘αžΎαž„αŸ” HttpSensor αž’αžΆαž…αž‘αžΆαž‰αž…αŸ†αžŽαž»αž…αž”αž‰αŸ’αž…αž”αŸ‹αžŠαŸ‚αž›αž”αžΆαž“αž”αž‰αŸ’αž‡αžΆαž€αŸ‹ αž αžΎαž™αž“αŸ…αž–αŸαž›αžŠαŸ‚αž›αž€αžΆαžšαž†αŸ’αž›αžΎαž™αžαž”αžŠαŸ‚αž›αž…αž„αŸ‹αž”αžΆαž“αž€αŸ†αž–αž»αž„αžšαž„αŸ‹αž…αžΆαŸ† αž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜αž€αžΆαžšαž•αŸ’αž‘αŸαžš GoogleCloudStorageToS3Operator. αž…αž·αžαŸ’αžαžŠαŸ‚αž›αž…αž„αŸ‹αžŠαžΉαž„αž…αž„αŸ‹αžƒαžΎαž‰αž“αžΉαž„αžŸαž½αžšαžαžΆ β€œαž αŸαžαž»αž’αŸ’αžœαžΈ? αž™αŸ‰αžΆαž„αžŽαžΆαž˜αž·αž‰ αž’αŸ’αž“αž€αž’αžΆαž…αž’αŸ’αžœαžΎαž–αžΆαž€αŸ’αž™αžŠαžŠαŸ‚αž›αŸ—αž“αŸ…αž€αŸ’αž“αž»αž„αž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αž€αžš!” αž αžΎαž™αž”αž“αŸ’αž‘αžΆαž”αŸ‹αž˜αž€αžŠαžΎαž˜αŸ’αž”αžΈαž€αž»αŸ†αž±αŸ’αž™αžŸαŸ’αž‘αŸ‡αž’αžΆαž„αž“αŸƒαž—αžΆαžšαž€αž·αž…αŸ’αž…αž‡αžΆαž˜αž½αž™αž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αž€αžšαžŠαŸ‚αž›αž•αŸ’αž’αžΆαž€αŸ” αž§αž”αž€αžšαžŽαŸαž…αžΆαž”αŸ‹αžŸαž‰αŸ’αž‰αžΆαž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜ αž–αž·αž“αž·αžαŸ’αž™ αž“αž·αž„αž„αžΆαž”αŸ‹ αž˜αž»αž“αž–αŸαž›αž€αžΆαžšαž”αŸ‰αž»αž“αž”αŸ‰αž„αž›αžΎαž€αž€αŸ’αžšαŸ„αž™αŸ”
  • αž€αž·αž…αŸ’αž…αž€αžΆαžšαŸ” - αž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αž€αžšαžŠαŸ‚αž›αž”αžΆαž“αž”αŸ’αžšαž€αžΆαžŸ αžŠαŸ„αž™αž˜αž·αž“αž‚αž·αžαž–αžΈαž”αŸ’αžšαž—αŸαž‘ αž“αž·αž„αž—αŸ’αž‡αžΆαž”αŸ‹αž‡αžΆαž˜αž½αž™αžŠαžΆαž€ αžαŸ’αžšαžΌαžœαž”αžΆαž“αžαž˜αŸ’αž›αžΎαž„αž‹αžΆαž“αŸˆαž—αžΆαžšαž€αž·αž…αŸ’αž…αŸ”
  • αž§αž‘αžΆαž αžšαžŽαŸαž—αžΆαžšαž€αž·αž…αŸ’αž… - αž“αŸ…αž–αŸαž›αžŠαŸ‚αž›αž’αŸ’αž“αž€αžšαŸ€αž”αž…αŸ†αž•αŸ‚αž“αž€αžΆαžšαž‘αžΌαž‘αŸ…αž”αžΆαž“αžŸαž˜αŸ’αžšαŸαž…αž…αž·αžαŸ’αžαžαžΆαžœαžΆαžŠαž›αŸ‹αž–αŸαž›αžŠαŸ‚αž›αžαŸ’αžšαžΌαžœαž”αž‰αŸ’αž‡αžΌαž“αž—αžΆαžšαž€αž·αž…αŸ’αž…αž‘αŸ…αž€αŸ’αž“αž»αž„αžŸαž˜αžšαž—αžΌαž˜αž·αž›αžΎαž’αŸ’αž“αž€αžŸαŸ†αžŠαŸ‚αž„ (αž“αŸ…αž“αžΉαž„αž€αž“αŸ’αž›αŸ‚αž„ αž”αŸ’αžšαžŸαž·αž“αž”αžΎαž™αžΎαž„αž”αŸ’αžšαžΎ LocalExecutor αž¬αž‘αŸ…αžαŸ’αž“αžΆαŸ†αž„αž–αžΈαž…αž˜αŸ’αž„αžΆαž™αž€αŸ’αž“αž»αž„αž€αžšαžŽαžΈ CeleryExecutor) αžœαžΆαž•αŸ’αžαž›αŸ‹αž”αžšαž·αž”αž‘αžŠαž›αŸ‹αž–αž½αž€αž‚αŸ (ឧ. αžŸαŸ†αžŽαž»αŸ†αž“αŸƒαž’αžαŸαžš - αž”αŸ‰αžΆαžšαŸ‰αžΆαž˜αŸ‰αŸ‚αžαŸ’αžšαž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·) αž–αž„αŸ’αžšαžΈαž€αž–αžΆαž€αŸ’αž™αž”αž‰αŸ’αž‡αžΆ αž¬αž‚αŸ†αžšαžΌαžŸαŸ†αžŽαž½αžš αž αžΎαž™αž”αž‰αŸ’αž…αžΌαž›αž–αž½αž€αžœαžΆαŸ”

αž™αžΎαž„αž”αž„αŸ’αž€αžΎαžαž—αžΆαžšαž€αž·αž…αŸ’αž…

αž‡αžΆαžŠαŸ†αž”αžΌαž„ αž…αžΌαžšαž™αžΎαž„αž‚αžΌαžŸαž”αž‰αŸ’αž‡αžΆαž€αŸ‹αž’αŸ†αž–αžΈαž‚αŸ’αžšαŸ„αž„αž€αžΆαžšαžŽαŸαž‘αžΌαž‘αŸ…αž“αŸƒ doug αžšαž”αžŸαŸ‹αž™αžΎαž„ αž αžΎαž™αž”αž“αŸ’αž‘αžΆαž”αŸ‹αž˜αž€αž™αžΎαž„αž“αžΉαž„αž…αžΌαž›αž‘αŸ…αž€αŸ’αž“αž»αž„αž–αŸαžαŸŒαž˜αžΆαž“αž›αž˜αŸ’αž’αž·αžαž€αžΆαž“αŸ‹αžαŸ‚αž…αŸ’αžšαžΎαž“αž‘αžΎαž„αŸ— αž–αžΈαž–αŸ’αžšαŸ„αŸ‡αž™αžΎαž„αž’αž“αž»αžœαžαŸ’αžαžŠαŸ†αžŽαŸ„αŸ‡αžŸαŸ’αžšαžΆαž™αžŠαŸ‚αž›αž˜αž·αž“αžŸαŸ†αžαžΆαž“αŸ‹αž˜αž½αž™αž…αŸ†αž“αž½αž“αŸ”

αžŠαžΌαž…αŸ’αž“αŸαŸ‡ αž€αŸ’αž“αž»αž„αž‘αž˜αŸ’αžšαž„αŸ‹αžŠαŸαžŸαžΆαž˜αž‰αŸ’αž‰αž”αŸ†αž•αž»αžαžšαž”αžŸαŸ‹αžœαžΆ αžŠαžΆαžœαž”αŸ‚αž”αž“αŸαŸ‡αž“αžΉαž„αž˜αžΎαž›αž‘αŸ…αžŠαžΌαž…αž“αŸαŸ‡αŸ–

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

αž…αžΌαžšαž™αžΎαž„αžŠαŸ„αŸ‡αžŸαŸ’αžšαžΆαž™αžœαžΆαŸ–

  • αžŠαŸ†αž”αžΌαž„αž™αžΎαž„αž“αžΆαŸ†αž…αžΌαž› libs αž…αžΆαŸ†αž”αžΆαž…αŸ‹αž“αž·αž„ αž’αŸ’αžœαžΈβ€‹αž•αŸ’αžŸαŸαž„αž‘αŸ€αž;
  • sql_server_ds - αž“αŸαŸ‡β€‹αž‚αžΊαž‡αžΆ List[namedtuple[str, str]] αž‡αžΆαž˜αž½αž™αž“αžΉαž„αžˆαŸ’αž˜αŸ„αŸ‡αž“αŸƒαž€αžΆαžšαžαž—αŸ’αž‡αžΆαž”αŸ‹αž–αžΈ Airflow Connections αž“αž·αž„αž˜αžΌαž›αžŠαŸ’αž‹αžΆαž“αž‘αž·αž“αŸ’αž“αž“αŸαž™αžŠαŸ‚αž›αž™αžΎαž„αž“αžΉαž„αž™αž€αž…αžΆαž“αžšαž”αžŸαŸ‹αž™αžΎαž„;
  • dag - αžŸαŸαž…αž€αŸ’αžαžΈαž”αŸ’αžšαž€αžΆαžŸαž’αŸ†αž–αžΈαžŠαžΆαž€αžšαž”αžŸαŸ‹αž™αžΎαž„ αžŠαŸ‚αž›αžαŸ’αžšαžΌαžœαžαŸ‚αž…αžΆαŸ†αž”αžΆαž…αŸ‹αž“αŸ…αž€αŸ’αž“αž»αž„ globals()αž”αžΎαž˜αž·αž“αžŠαžΌαž…αŸ’αž“αŸαŸ‡αž‘αŸ Airflow αž“αžΉαž„αž˜αž·αž“αžŸαŸ’αžœαŸ‚αž„αžšαž€αžœαžΆαž‘αŸαŸ” Doug αž€αŸαžαŸ’αžšαžΌαžœαž“αž·αž™αžΆαž™αžαžΆαŸ–
    • αžαžΎβ€‹αž‚αžΆαžαŸ‹β€‹αžˆαŸ’αž˜αŸ„αŸ‡β€‹αž’αžΈ orders - αž”αž“αŸ’αž‘αžΆαž”αŸ‹αž˜αž€αžˆαŸ’αž˜αŸ„αŸ‡αž“αŸαŸ‡αž“αžΉαž„αž”αž„αŸ’αž αžΆαž‰αž“αŸ…αž€αŸ’αž“αž»αž„αž…αŸ†αžŽαž»αž…αž”αŸ’αžšαž‘αžΆαž€αŸ‹αž”αžŽαŸ’αžŠαžΆαž‰
    • αžαžΆαž‚αžΆαžαŸ‹αž“αžΉαž„αž’αŸ’αžœαžΎαž€αžΆαžšαž…αžΆαž”αŸ‹αž–αžΈαž–αžΆαž€αŸ‹αž€αžŽαŸ’αžαžΆαž›αž’αž’αŸ’αžšαžΆαžαŸ’αžšαžαŸ’αž„αŸƒαž‘αžΈαž”αŸ’αžšαžΆαŸ†αž”αžΈαžαŸ‚αž€αž€αŸ’αž€αžŠαžΆαŸ”
    • αž αžΎαž™αžœαžΆαž‚αž½αžšαžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž”αŸ’αžšαž αŸ‚αž›αžšαŸ€αž„αžšαžΆαž›αŸ‹ 6 αž˜αŸ‰αŸ„αž„αž˜αŸ’αžαž„ (αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž”αž»αžšαžŸαžšαžΉαž„αžšαžΌαžŸαž“αŸ…αž‘αžΈαž“αŸαŸ‡αž‡αŸ†αž“αž½αžŸαž±αŸ’αž™ timedelta() αž’αžΆαž…αž‘αž‘αž½αž›αž™αž€αž”αžΆαž“ cron- αž”αž“αŸ’αž‘αžΆαžαŸ‹ 0 0 0/6 ? * * *, αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αžαŸ’αžšαž‡αžΆαž€αŸ‹αžαž·αž… - αž€αž“αŸ’αžŸαŸ„αž˜αžŠαžΌαž…αž‡αžΆ @daily);
  • workflow() αž“αžΉαž„αž’αŸ’αžœαžΎαž€αžΆαžšαž„αžΆαžšαžŸαŸ†αžαžΆαž“αŸ‹ αž”αŸ‰αž»αž“αŸ’αžαŸ‚αž˜αž·αž“αž˜αŸ‚αž“αž₯αž‘αžΌαžœαž“αŸαŸ‡αž‘αŸαŸ” αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž–αŸαž›αž“αŸαŸ‡ αž™αžΎαž„αž“αžΉαž„αž‘αž˜αŸ’αž›αžΆαž€αŸ‹αž”αžšαž·αž”αž‘αžšαž”αžŸαŸ‹αž™αžΎαž„αž‘αŸ…αž€αŸ’αž“αž»αž„αž€αŸ†αžŽαžαŸ‹αž αŸαžαž»αŸ”
  • αž αžΎαž™αž₯αž‘αžΌαžœαž“αŸαŸ‡αžœαŸαž‘αž˜αž“αŸ’αžαžŸαžΆαž˜αž‰αŸ’αž‰αž“αŸƒαž€αžΆαžšαž”αž„αŸ’αž€αžΎαžαž—αžΆαžšαž€αž·αž…αŸ’αž…:
    • αž™αžΎαž„αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαžαžΆαž˜αžšαž™αŸˆαž”αŸ’αžšαž—αž–αžšαž”αžŸαŸ‹αž™αžΎαž„;
    • αž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜ PythonOperatorαžŠαŸ‚αž›αž“αžΉαž„αž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αž’αžαŸ‹αž…αŸαŸ‡αžŸαŸ„αŸ‡αžšαž”αžŸαŸ‹αž™αžΎαž„αŸ” workflow(). αž€αž»αŸ†αž—αŸ’αž›αŸαž…αž”αž‰αŸ’αž‡αžΆαž€αŸ‹αžˆαŸ’αž˜αŸ„αŸ‡αžαŸ‚αž˜αž½αž™αž‚αžαŸ‹ (αž“αŸ…αž€αŸ’αž“αž»αž„αžŠαžΆαž€) αž“αŸƒαž€αž·αž…αŸ’αž…αž€αžΆαžš αž αžΎαž™αž…αž„αžŠαžΆαž€αžŠαŸ„αž™αžαŸ’αž›αž½αž“αž―αž„αŸ” αž‘αž„αŸ‹αž‡αžΆαžαž· provide_context αž“αŸ…αž€αŸ’αž“αž»αž„αžœαŸαž“αž“αžΉαž„αž…αžΆαž€αŸ‹αž’αžΆαž‚αž»αž™αž˜αŸ‰αž„αŸ‹αž”αž“αŸ’αžαŸ‚αž˜αž‘αŸ…αž€αŸ’αž“αž»αž„αž˜αž»αžαž„αžΆαžšαžŠαŸ‚αž›αž™αžΎαž„αž“αžΉαž„αž”αŸ’αžšαž˜αžΌαž›αžŠαŸ„αž™αž”αŸ’αžšαž»αž„αž”αŸ’αžšαž™αŸαžαŸ’αž“αžŠαŸ„αž™αž”αŸ’αžšαžΎ **context.

αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž–αŸαž›αž“αŸαŸ‡ αž“αŸ„αŸ‡αž αžΎαž™αž‡αžΆαž’αŸ’αžœαžΈαž‘αžΆαŸ†αž„αž’αžŸαŸ‹αŸ” αž’αŸ’αžœαžΈαžŠαŸ‚αž›αž™αžΎαž„αž‘αž‘αž½αž›αž”αžΆαž“αŸ–

  • dag αžαŸ’αž˜αžΈαž“αŸ…αž€αŸ’αž“αž»αž„αž…αŸ†αžŽαž»αž…αž”αŸ’αžšαž‘αžΆαž€αŸ‹αž”αžŽαŸ’αžŠαžΆαž‰,
  • αž€αž·αž…αŸ’αž…αž€αžΆαžšαž˜αž½αž™αž“αž·αž„αž€αž“αŸ’αž›αŸ‡αžšαž™αžŠαŸ‚αž›αž“αžΉαž„αžαŸ’αžšαžΌαžœαž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αžŸαŸ’αžšαž”αž‚αŸ’αž“αžΆ (αž”αŸ’αžšαžŸαž·αž“αž”αžΎ Airflow, αž€αžΆαžšαž€αŸ†αžŽαžαŸ‹ Celery αž“αž·αž„αžŸαž˜αžαŸ’αžαž—αžΆαž–αž˜αŸ‰αžΆαžŸαŸŠαžΈαž“αž˜αŸαž’αž“αž»αž‰αŸ’αž‰αžΆαž)αŸ”

αž‡αž·αžαž”αžΆαž“αž αžΎαž™αŸ”

Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›
αžαžΎαž’αŸ’αž“αž€αžŽαžΆαž“αžΉαž„αžŠαŸ†αž‘αžΎαž„αž—αžΆαž–αž’αžΆαžŸαŸ’αžšαŸαž™?

αžŠαžΎαž˜αŸ’αž”αžΈβ€‹αžŸαž˜αŸ’αžšαž½αž›β€‹αžšαžΏαž„β€‹αž‘αžΆαŸ†αž„β€‹αž’αžŸαŸ‹β€‹αž“αŸαŸ‡ αžαŸ’αž‰αž»αŸ†β€‹αž”αžΆαž“β€‹αž…αžΌαž› docker-compose.yml αžŠαŸ†αžŽαžΎαžšαž€αžΆαžš requirements.txt αž“αŸ…αž›αžΎαžαŸ’αž“αžΆαŸ†αž„αž‘αžΆαŸ†αž„αž’αžŸαŸ‹αŸ”

αž₯αž‘αžΌαžœβ€‹αž”αžΆαžαŸ‹β€‹αž αžΎαž™αŸ–

Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›

αž€αžΆαžšαŸ‰αŸαž–αžŽαŸŒαž”αŸ’αžšαž•αŸαŸ‡αž‚αžΊαž‡αžΆαž€αž·αž…αŸ’αž…αž€αžΆαžšαžŠαŸ‚αž›αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαžŠαŸ„αž™αž€αž˜αŸ’αž˜αžœαž·αž’αžΈαž€αŸ†αžŽαžαŸ‹αž–αŸαž›αŸ”

αž™αžΎαž„β€‹αžšαž„αŸ‹αž…αžΆαŸ†β€‹αž”αž“αŸ’αžαž·αž… αž€αžΆαžšαž„αžΆαžšβ€‹αžαŸ’αžšαžΌαžœβ€‹αž”αžΆαž“β€‹αž…αžΆαž”αŸ‹β€‹αž™αž€β€‹αžŠαŸ„αž™β€‹αž€αž˜αŸ’αž˜αž€αžšαŸ–

Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›

αž”αŸƒαžαž„ αž–αž·αžαžŽαžΆαžŸαŸ‹αž”αžΆαž“αž”αž‰αŸ’αž…αž”αŸ‹αž€αžΆαžšαž„αžΆαžšαžšαž”αžŸαŸ‹αž–αž½αž€αž‚αŸαžŠαŸ„αž™αž‡αŸ„αž‚αž‡αŸαž™αŸ” αž αž„αŸ’αžŸαž€αŸ’αžšαž αž˜αž˜αž·αž“αžŸαžΌαžœαž‡αŸ„αž‚αž‡αŸαž™αž‘αŸαŸ”

αžŠαŸ„αž™αžœαž·αž’αžΈαž“αŸαŸ‡αž˜αž·αž“αž˜αžΆαž“αžαžαž“αŸ…αž›αžΎαž•αž›αž·αžαž•αž›αžšαž”αžŸαŸ‹αž™αžΎαž„αž‘αŸαŸ” ./dagsαž˜αž·αž“αž˜αžΆαž“αž€αžΆαžšαž’αŸ’αžœαžΎαžŸαž˜αž€αžΆαž›αž€αž˜αŸ’αž˜αžšαžœαžΆαž„αž˜αŸ‰αžΆαžŸαŸŠαžΈαž“αž‘αŸ - dags αž‘αžΆαŸ†αž„αž’αžŸαŸ‹αžŸαŸ’αžαž·αžαž“αŸ…αž€αŸ’αž“αž»αž„ git αž“αŸ…αž›αžΎ Gitlab αžšαž”αžŸαŸ‹αž™αžΎαž„ αž αžΎαž™ Gitlab CI αž…αŸ‚αž€αž…αžΆαž™αž€αžΆαžšαž’αžΆαž”αŸ‹αžŠαŸαžαž‘αŸ…αž€αžΆαž“αŸ‹αž˜αŸ‰αžΆαžŸαŸŠαžΈαž“αž“αŸ…αž–αŸαž›αž”αž‰αŸ’αž…αžΌαž›αž…αžΌαž›αž‚αŸ’αž“αžΆαŸ” master.

αž”αž“αŸ’αžαž·αž…αž’αŸ†αž–αžΈαž•αŸ’αž€αžΆ

αžαžŽαŸˆβ€‹αž–αŸαž›β€‹αžŠαŸ‚αž›β€‹αž€αž˜αŸ’αž˜αž€αžšβ€‹αž€αŸ†αž–αž»αž„β€‹αžœαžΆαž™β€‹αž€αŸ’αž”αžΆαž›β€‹αž˜αŸ‰αžΆαžŸαŸŠαžΈαž“β€‹αžαŸ’αžšαž‡αžΆαž€αŸ‹β€‹αžšαž”αžŸαŸ‹β€‹αž™αžΎαž„ αžŸαžΌαž˜β€‹αž…αž„αž…αžΆαŸ†β€‹αž§αž”αž€αžšαžŽαŸβ€‹αž˜αž½αž™β€‹αž‘αŸ€αžβ€‹αžŠαŸ‚αž›β€‹αž’αžΆαž…β€‹αž”αž„αŸ’αž αžΆαž‰β€‹αž’αŸ’αžœαžΈβ€‹αž˜αž½αž™β€‹αžŠαž›αŸ‹β€‹αž™αžΎαž„β€‹αž‚αžΊ αž•αŸ’αž€αžΆαŸ”

αž‘αŸ†αž–αŸαžšαž‘αžΈαž˜αž½αž™αžŠαŸ‚αž›αž˜αžΆαž“αž–αŸαžαŸŒαž˜αžΆαž“αžŸαž„αŸ’αžαŸαž”αž’αŸ†αž–αžΈαžαŸ’αž“αžΆαŸ†αž„αž€αž˜αŸ’αž˜αž€αžšαŸ–

Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›

αž‘αŸ†αž–αŸαžšαžŠαŸ‚αž›αžαŸ’αž›αžΆαŸ†αž„αž”αŸ†αž•αž»αžαž‡αžΆαž˜αž½αž™αž“αžΉαž„αž—αžΆαžšαž€αž·αž…αŸ’αž…αžŠαŸ‚αž›αž”αžΆαž“αž‘αŸ…αž’αŸ’αžœαžΎαž€αžΆαžšαŸ–

Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›

αž‘αŸ†αž–αŸαžšαžŠαŸ‚αž›αž‚αž½αžšαž±αŸ’αž™αž’αž»αž‰αž‘αŸ’αžšαžΆαž“αŸ‹αž”αŸ†αž•αž»αžαž‡αžΆαž˜αž½αž™αž“αžΉαž„αžŸαŸ’αžαžΆαž“αž—αžΆαž–αž“αŸƒαžˆαŸ’αž˜αž½αž‰αž€αžŽαŸ’αžαžΆαž›αžšαž”αžŸαŸ‹αž™αžΎαž„:

Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›

αž‘αŸ†αž–αŸαžšαž—αŸ’αž›αžΊαž”αŸ†αž•αž»αžαž‚αžΊαž‡αžΆαž˜αž½αž™αž“αžΉαž„αž€αŸ’αžšαžΆαž αŸ’αžœαžŸαŸ’αžαžΆαž“αž—αžΆαž–αž—αžΆαžšαž€αž·αž…αŸ’αž… αž“αž·αž„αž–αŸαž›αžœαŸαž›αžΆαž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αžšαž”αžŸαŸ‹αž–αž½αž€αž‚αŸαŸ–

Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›

αž™αžΎαž„αž•αŸ’αž‘αž»αž€αž›αžΎαžŸαž‘αž˜αŸ’αž„αž“αŸ‹

αžŠαžΌαž…αŸ’αž“αŸαŸ‡ αž€αž·αž…αŸ’αž…αž€αžΆαžšβ€‹αž‘αžΆαŸ†αž„β€‹αž’αžŸαŸ‹β€‹αž”αžΆαž“β€‹αžŸαž˜αŸ’αžšαŸαž…β€‹αž αžΎαž™ αž’αŸ’αž“αž€β€‹αž’αžΆαž…β€‹αž™αž€β€‹αž’αŸ’αž“αž€β€‹αžšαž”αž½αžŸβ€‹αž…αŸαž‰β€‹αž”αžΆαž“αŸ”

Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›

αž αžΎαž™αž˜αžΆαž“αž’αŸ’αž“αž€αžšαž„αžšαž”αž½αžŸαž‡αžΆαž…αŸ’αžšαžΎαž“ - αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž αŸαžαž»αž•αž›αž˜αž½αž™αž¬αž•αŸ’αžŸαŸαž„αž‘αŸ€αžαŸ” αž€αŸ’αž“αž»αž„αž€αžšαžŽαžΈαž“αŸƒαž€αžΆαžšαž”αŸ’αžšαžΎαž”αŸ’αžšαžΆαžŸαŸ‹αžαŸ’αžšαžΉαž˜αžαŸ’αžšαžΌαžœαž“αŸƒ Airflow αž€αžΆαžšαŸ‰αŸαž‘αžΆαŸ†αž„αž“αŸαŸ‡αž”αž„αŸ’αž αžΆαž‰αžαžΆαž‘αž·αž“αŸ’αž“αž“αŸαž™αž–αž·αžαž‡αžΆαž˜αž·αž“αž”αžΆαž“αž˜αž€αžŠαž›αŸ‹αž‘αŸαŸ”

αž’αŸ’αž“αž€β€‹αžαŸ’αžšαžΌαžœβ€‹αž˜αžΎαž›β€‹αž€αŸ†αžŽαžαŸ‹β€‹αž αŸαžαž» αž αžΎαž™β€‹αž…αžΆαž”αŸ‹β€‹αž•αŸ’αžŠαžΎαž˜β€‹αž€αž·αž…αŸ’αž…αž€αžΆαžšβ€‹αžŠαŸ‚αž›β€‹αž’αŸ’αž›αžΆαž€αŸ‹β€‹αž…αž»αŸ‡β€‹αž‘αžΎαž„β€‹αžœαž·αž‰αŸ”

αžŠαŸ„αž™αž…αž»αž…αž›αžΎαž€αžΆαžšαŸ‰αŸαžŽαžΆαž˜αž½αž™ αž™αžΎαž„αž“αžΉαž„αžƒαžΎαž‰αžŸαž€αž˜αŸ’αž˜αž—αžΆαž–αžŠαŸ‚αž›αž˜αžΆαž“αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž™αžΎαž„αŸ–

Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›

αž’αŸ’αž“αž€αž’αžΆαž…αž™αž€αž“αž·αž„αž’αŸ’αžœαžΎαž±αŸ’αž™αž‡αž˜αŸ’αžšαŸ‡αž€αžΆαžšαžŠαž½αž›αžšαž›αŸ†αŸ” αž“αŸ„αŸ‡αž‚αžΊαž™αžΎαž„αž—αŸ’αž›αŸαž…αžαžΆαž˜αžΆαž“αž’αŸ’αžœαžΈαž˜αž½αž™αž”αžΆαž“αž”αžšαžΆαž‡αŸαž™αž“αŸ…αž‘αžΈαž“αŸ„αŸ‡ αž αžΎαž™αž€αž·αž…αŸ’αž…αž€αžΆαžšαž§αž‘αžΆαž αžšαžŽαŸαžŠαžΌαž…αž‚αŸ’αž“αžΆαž“αžΉαž„αž‘αŸ…αž€αžΆαž“αŸ‹αž€αž˜αŸ’αž˜αžœαž·αž’αžΈαž€αŸ†αžŽαžαŸ‹αž–αŸαž›αŸ”

Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›

αžœαžΆαž…αŸ’αž”αžΆαžŸαŸ‹αžŽαžΆαžŸαŸ‹αžαžΆαž€αžΆαžšαž’αŸ’αžœαžΎαž”αŸ‚αž”αž“αŸαŸ‡αžŠαŸ„αž™αž”αŸ’αžšαžΎαž€αžŽαŸ’αžαž»αžšαž‡αžΆαž˜αž½αž™αž“αžΉαž„αž€αžΆαžšαŸ‰αŸαž€αŸ’αžšαž αž˜αž‘αžΆαŸ†αž„αž’αžŸαŸ‹αž‚αžΊαž˜αž·αž“αž˜αžΆαž“αž˜αž“αž»αžŸαŸ’αžŸαž’αž˜αŸŒαž‘αŸ - αž“αŸαŸ‡αž˜αž·αž“αž˜αŸ‚αž“αž‡αžΆαž’αŸ’αžœαžΈαžŠαŸ‚αž›αž™αžΎαž„αžšαŸ†αž–αžΉαž„αž–αžΈ Airflow αž‘αŸαŸ” αžαžΆαž˜αž’αž˜αŸ’αž˜αž‡αžΆαžαž· αž™αžΎαž„αž˜αžΆαž“αž’αžΆαžœαž»αž’αž”αŸ’αžšαž›αŸαž™αž›αŸ„αž€αŸ– Browse/Task Instances

Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›

αžαŸ„αŸ‡β€‹αž‡αŸ’αžšαžΎαžŸαžšαžΎαžŸβ€‹αž’αŸ’αžœαžΈβ€‹αž‚αŸ’αžšαž”αŸ‹β€‹αž™αŸ‰αžΆαž„β€‹αž€αŸ’αž“αž»αž„β€‹αž–αŸαž›β€‹αžαŸ‚β€‹αž˜αž½αž™ αž αžΎαž™β€‹αž€αŸ†αžŽαžαŸ‹β€‹αž‘αžΎαž„β€‹αžœαž·αž‰β€‹αž‘αŸ…β€‹αžŸαžΌαž“αŸ’αž™ αž…αž»αž…β€‹αž’αžΆαžαž»β€‹αžŠαŸ‚αž›β€‹αžαŸ’αžšαžΉαž˜αžαŸ’αžšαžΌαžœαŸ–

Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›

αž”αž“αŸ’αž‘αžΆαž”αŸ‹αž–αžΈαžŸαž˜αŸ’αž’αžΆαžαžšαž½αž… αž‘αžΆαž“αžαžΆαž€αŸ‹αžŸαŸŠαžΈαžšαž”αžŸαŸ‹αž™αžΎαž„αž˜αžΎαž›αž‘αŸ…αžŠαžΌαž…αž“αŸαŸ‡ (αž–αž½αž€αž‚αŸαž€αŸ†αž–αž»αž„αžšαž„αŸ‹αž…αžΆαŸ†αž’αŸ’αž“αž€αžšαŸ€αž”αž…αŸ†αž€αžΆαž›αžœαž·αž—αžΆαž‚αžšαž½αž…αž αžΎαž™)αŸ–

Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›

αž€αžΆαžšαžαž—αŸ’αž‡αžΆαž”αŸ‹ αž‘αŸ†αž–αž€αŸ‹ αž“αž·αž„αž’αžαŸαžšαž•αŸ’αžŸαŸαž„αž‘αŸ€αžαŸ”

αžœαžΆαžŠαž›αŸ‹αž–αŸαž›αžŠαŸ‚αž›αžαŸ’αžšαžΌαžœαž˜αžΎαž› DAG αž”αž“αŸ’αž‘αžΆαž”αŸ‹ update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа Ρ…ΠΎΡ€ΠΎΡˆΠΈΠ΅, ΠΎΡ‚Ρ‡Π΅Ρ‚Ρ‹ ΠΎΠ±Π½ΠΎΠ²Π»Π΅Π½Ρ‹"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         ΠΠ°Ρ‚Π°Ρˆ, просыпайся, ΠΌΡ‹ {{ dag.dag_id }} ΡƒΡ€ΠΎΠ½ΠΈΠ»ΠΈ
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

αžαžΎαž’αŸ’αž“αž€αž‘αžΆαŸ†αž„αž’αžŸαŸ‹αž‚αŸ’αž“αžΆαž’αŸ’αž›αžΆαž”αŸ‹αž’αŸ’αžœαžΎαž”αž…αŸ’αž…αž»αž”αŸ’αž”αž“αŸ’αž“αž—αžΆαž–αžšαž”αžΆαž™αž€αžΆαžšαžŽαŸαž‘αŸ? αž“αŸαŸ‡αž‚αžΊαž‡αžΆαžšαž”αžŸαŸ‹αž“αžΆαž„αž˜αŸ’αžαž„αž‘αŸ€αž: αž˜αžΆαž“αž”αž‰αŸ’αž‡αžΈαž”αŸ’αžšαž—αž–αž–αžΈαž€αž“αŸ’αž›αŸ‚αž„αžŠαŸ‚αž›αžαŸ’αžšαžΌαžœαž™αž€αž‘αž·αž“αŸ’αž“αž“αŸαž™; αž˜αžΆαž“αž”αž‰αŸ’αž‡αžΈαž€αž“αŸ’αž›αŸ‚αž„αžŠαŸ‚αž›αžαŸ’αžšαžΌαžœαžŠαžΆαž€αŸ‹; αž€αž»αŸ†β€‹αž—αŸ’αž›αŸαž…β€‹αž›αžΎαž€β€‹αžŸαžšαžŸαžΎαžšβ€‹αž–αŸαž›β€‹αž’αŸ’αžœαžΈαŸ—β€‹αž”αžΆαž“β€‹αž€αžΎαžβ€‹αž‘αžΎαž„β€‹αž¬β€‹αž”αŸ‚αž€αž”αžΆαž€αŸ‹β€‹αž‚αŸ’αž“αžΆ (αž›αŸ’αž’ αž“αŸαŸ‡β€‹αž˜αž·αž“β€‹αž˜αŸ‚αž“β€‹αž’αŸ†αž–αžΈβ€‹αž™αžΎαž„β€‹αž‘αŸ)αŸ”

αžαŸ„αŸ‡β€‹αž…αžΌαž›β€‹αž‘αŸ…β€‹αž˜αžΎαž›β€‹αž―αž€αžŸαžΆαžšβ€‹αž˜αŸ’αžŠαž„β€‹αž‘αŸ€αžβ€‹αž αžΎαž™β€‹αž˜αžΎαž›β€‹αžšαžΏαž„β€‹αž˜αž·αž“β€‹αž…αŸ’αž”αžΆαžŸαŸ‹β€‹αž›αžΆαžŸαŸ‹β€‹αžαŸ’αž˜αžΈαŸ–

  • from commons.operators import TelegramBotSendMessage - αž‚αŸ’αž˜αžΆαž“αž’αŸ’αžœαžΈαžšαžΆαžšαžΆαŸ†αž„αž™αžΎαž„αž–αžΈαž€αžΆαžšαž”αž„αŸ’αž€αžΎαžαž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αž€αžšαž•αŸ’αž‘αžΆαž›αŸ‹αžαŸ’αž›αž½αž“αžšαž”αžŸαŸ‹αž™αžΎαž„ αžŠαŸ‚αž›αž™αžΎαž„αž‘αžΆαž‰αž™αž€αž”αŸ’αžšαž™αŸ„αž‡αž“αŸαž–αžΈαž€αžΆαžšαž”αž„αŸ’αž€αžΎαžαž€αž‰αŸ’αž…αž”αŸ‹αžαžΌαž…αž˜αž½αž™αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž•αŸ’αž‰αžΎαžŸαžΆαžšαž‘αŸ…αž€αžΆαž“αŸ‹ Unblocked αŸ” (αž™αžΎαž„αž“αžΉαž„αž“αž·αž™αžΆαž™αž”αž“αŸ’αžαŸ‚αž˜αž‘αŸ€αžαž’αŸ†αž–αžΈαž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αž€αžšαž“αŸαŸ‡αžαžΆαž„αž€αŸ’αžšαŸ„αž˜);
  • default_args={} - dag αž’αžΆαž…αž…αŸ‚αž€αž…αžΆαž™αž’αžΆαž‚αž»αž™αž˜αŸ‰αž„αŸ‹αžŠαžΌαž…αž‚αŸ’αž“αžΆαž‘αŸ…αž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αž€αžšαž‘αžΆαŸ†αž„αž’αžŸαŸ‹αžšαž”αžŸαŸ‹αžœαžΆαŸ”
  • to='{{ var.value.all_the_kings_men }}' - αžœαžΆαž› to αž™αžΎαž„αž“αžΉαž„αž˜αž·αž“αž˜αžΆαž“αž€αžΆαžšαžŸαžšαžŸαŸαžšαž€αžΌαžŠαžšαžΉαž„αž‘αŸ αž”αŸ‰αž»αž“αŸ’αžαŸ‚αžαŸ’αžšαžΌαžœαž”αžΆαž“αž”αž„αŸ’αž€αžΎαžαžŠαŸ„αž™αžαžΆαž˜αžœαž“αŸ’αžαžŠαŸ„αž™αž”αŸ’αžšαžΎ Jinja αž“αž·αž„αž’αžαŸαžšαž‡αžΆαž˜αž½αž™αž“αžΉαž„αž”αž‰αŸ’αž‡αžΈαž’αŸŠαžΈαž˜αŸ‚αž› αžŠαŸ‚αž›αžαŸ’αž‰αž»αŸ†αž”αžΆαž“αžŠαžΆαž€αŸ‹αžŠαŸ„αž™αž”αŸ’αžšαž»αž„αž”αŸ’αžšαž™αŸαžαŸ’αž“αŸ” Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - αž›αž€αŸ’αžαžαžŽαŸ’αžŒαžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž€αžΆαžšαž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜αž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αž€αžšαŸ” αž€αŸ’αž“αž»αž„αž€αžšαžŽαžΈαžšαž”αžŸαŸ‹αž™αžΎαž„ αžŸαŸ†αž”αž»αžαŸ’αžšαž“αžΉαž„αž αŸ„αŸ‡αž αžΎαžšαž‘αŸ…αž€αžΆαž“αŸ‹αžαŸ…αž€αŸ‚αž›αž»αŸ‡αžαŸ’αžšαžΆαžαŸ‚αž—αžΆαž–αž’αžΆαžŸαŸ’αžšαŸαž™αž‘αžΆαŸ†αž„αž’αžŸαŸ‹αž”αžΆαž“αžŠαŸ†αžŽαžΎαžšαž€αžΆαžš αžŠαŸ„αž™αž‡αŸ„αž‚αž‡αŸαž™;
  • tg_bot_conn_id='tg_main' - αž’αžΆαž‚αž»αž™αž˜αŸ‰αž„αŸ‹ conn_id αž‘αž‘αž½αž›αž™αž€αž›αŸαžαžŸαž˜αŸ’αž‚αžΆαž›αŸ‹αž€αžΆαžšαžαž—αŸ’αž‡αžΆαž”αŸ‹αžŠαŸ‚αž›αž™αžΎαž„αž”αž„αŸ’αž€αžΎαž Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - αžŸαžΆαžšαž€αŸ’αž“αž»αž„ Telegram αž“αžΉαž„αž αŸ„αŸ‡αž αžΎαžšαž‘αŸ…αž†αŸ’αž„αžΆαž™ αž›αž»αŸ‡αžαŸ’αžšαžΆαžαŸ‚αž˜αžΆαž“αž€αž·αž…αŸ’αž…αž€αžΆαžšαž’αŸ’αž›αžΆαž€αŸ‹αŸ”
  • task_concurrency=1 - αž™αžΎαž„αž αžΆαž˜αžƒαžΆαžαŸ‹αž€αžΆαžšαž”αžΎαž€αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž€αŸ’αž“αž»αž„αž–αŸαž›αžŠαŸ†αžŽαžΆαž›αž‚αŸ’αž“αžΆαž“αŸƒαž€αž·αž…αŸ’αž…αž€αžΆαžšαž‡αžΆαž…αŸ’αžšαžΎαž“αž“αŸƒαž€αž·αž…αŸ’αž…αž€αžΆαžšαž˜αž½αž™αŸ” αž”αžΎαž˜αž·αž“αžŠαžΌαž…αŸ’αž“αŸαŸ‡αž‘αŸ αž™αžΎαž„αž“αžΉαž„αž‘αž‘αž½αž›αž”αžΆαž“αž€αžΆαžšαž”αžΎαž€αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž€αŸ’αž“αž»αž„αž–αŸαž›αžŠαŸ†αžŽαžΆαž›αž‚αŸ’αž“αžΆαž‡αžΆαž…αŸ’αžšαžΎαž“αŸ” VerticaOperator (αž˜αžΎαž›αžαžΆαžšαžΆαž„αž˜αž½αž™);
  • report_update >> [email, tg] - αž‘αžΆαŸ†αž„αž’αžŸαŸ‹ VerticaOperator αžšαž½αž˜β€‹αž”αž‰αŸ’αž…αžΌαž›β€‹αž‚αŸ’αž“αžΆβ€‹αž€αŸ’αž“αž»αž„β€‹αž€αžΆαžšβ€‹αž•αŸ’αž‰αžΎβ€‹αžŸαŸ†αž”αž»αžαŸ’αžš αž“αž·αž„β€‹αžŸαžΆαžšβ€‹αžŠαžΌαž…β€‹αž“αŸαŸ‡αŸ–
    Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›

    αž”αŸ‰αž»αž“αŸ’αžαŸ‚αž…αžΆαž”αŸ‹αžαžΆαŸ†αž„αž–αžΈαž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αž€αžšαž‡αžΌαž“αžŠαŸ†αžŽαžΉαž„αž˜αžΆαž“αž›αž€αŸ’αžαžαžŽαŸ’αžŒαž“αŸƒαž€αžΆαžšαž”αžΎαž€αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž•αŸ’αžŸαŸαž„αž‚αŸ’αž“αžΆ αž˜αžΆαž“αžαŸ‚αž˜αž½αž™αž”αŸ‰αž»αžŽαŸ’αžŽαŸ„αŸ‡αžŠαŸ‚αž›αž“αžΉαž„αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαŸ” αž“αŸ…αž€αŸ’αž“αž»αž„αž‘αž·αžŠαŸ’αž‹αž—αžΆαž–αž˜αŸ‚αž€αž’αžΆαž„ αž’αŸ’αžœαžΈαŸ—αž˜αžΎαž›αž‘αŸ…αž αžΆαž€αŸ‹αžŠαžΌαž…αž‡αžΆαž˜αž·αž“αžŸαžΌαžœαž…αŸ’αž”αžΆαžŸαŸ‹αž”αž“αŸ’αžαž·αž…αŸ–
    Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›

αžαŸ’αž‰αž»αŸ†αž“αžΉαž„αž“αž·αž™αžΆαž™αž–αžΆαž€αŸ’αž™αž–αžΈαžšαž”αžΈαž’αŸ†αž–αžΈ αž˜αŸ‰αžΆαž€αŸ’αžšαžΌ αž“αž·αž„αž˜αž·αžαŸ’αžαž—αž€αŸ’αžαž·αžšαž”αžŸαŸ‹αž–αž½αž€αž‚αŸ - ធថេរ.

Macros αž‚αžΊαž‡αžΆαž€αž“αŸ’αž›αŸ‚αž„αžŠαžΆαž€αŸ‹ Jinja αžŠαŸ‚αž›αž’αžΆαž…αž‡αŸ†αž“αž½αžŸαž–αŸαžαŸŒαž˜αžΆαž“αž˜αžΆαž“αž”αŸ’αžšαž™αŸ„αž‡αž“αŸαž•αŸ’αžŸαŸαž„αŸ—αž‘αŸ…αž€αŸ’αž“αž»αž„αž’αžΆαž‚αž»αž™αž˜αŸ‰αž„αŸ‹αžšαž”αžŸαŸ‹αž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αž€αžšαŸ” αž§αž‘αžΆαž αžšαžŽαŸαžŠαžΌαž…αž“αŸαŸ‡αŸ–

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} αž“αžΉαž„αž–αž„αŸ’αžšαžΈαž€αž‘αŸ…αž˜αžΆαžαž·αž€αžΆαž“αŸƒαž’αžαŸαžšαž”αžšαž·αž”αž‘ execution_date αž€αŸ’αž“αž»αž„αž‘αŸ’αžšαž„αŸ‹αž‘αŸ’αžšαžΆαž™ YYYY-MM-DD: 2020-07-14. αž•αŸ’αž“αŸ‚αž€αžŠαŸαž›αŸ’αž’αž”αŸ†αž•αž»αžαž‚αžΊαžαžΆαž’αžαŸαžšαž”αžšαž·αž”αž‘αžαŸ’αžšαžΌαžœαž”αžΆαž“αž—αŸ’αž‡αžΆαž”αŸ‹αž‘αŸ…αž“αžΉαž„αž§αž‘αžΆαž αžšαžŽαŸαž€αž·αž…αŸ’αž…αž€αžΆαžšαž‡αžΆαž€αŸ‹αž›αžΆαž€αŸ‹αž˜αž½αž™ (αž€αžΆαžšαŸ‰αŸαž€αŸ’αž“αž»αž„αž‘αž·αžŠαŸ’αž‹αž—αžΆαž–αž˜αŸ‚αž€αž’αžΆαž„) αž αžΎαž™αž“αŸ…αž–αŸαž›αž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜αž‘αžΎαž„αžœαž·αž‰ αž€αž“αŸ’αž›αŸ‚αž„αžŠαžΆαž€αŸ‹αž“αžΉαž„αž–αž„αŸ’αžšαžΈαž€αž‘αŸ…αžαž˜αŸ’αž›αŸƒαžŠαžΌαž…αž‚αŸ’αž“αžΆαŸ”

αžαž˜αŸ’αž›αŸƒαžŠαŸ‚αž›αž”αžΆαž“αž€αŸ†αžŽαžαŸ‹αž’αžΆαž…αžαŸ’αžšαžΌαžœαž”αžΆαž“αž˜αžΎαž›αžŠαŸ„αž™αž”αŸ’αžšαžΎαž”αŸŠαžΌαžαž»αž„αž”αž„αŸ’αž αžΆαž‰αž“αŸ…αž›αžΎαž§αž‘αžΆαž αžšαžŽαŸαž—αžΆαžšαž€αž·αž…αŸ’αž…αž“αžΈαž˜αž½αž™αŸ—αŸ” αž“αŸαŸ‡αž‡αžΆαžšαž”αŸ€αž”αžŠαŸ‚αž›αž—αžΆαžšαž€αž·αž…αŸ’αž…αž•αŸ’αž‰αžΎαžŸαŸ†αž”αž»αžαŸ’αžšαŸ–

Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›

αž αžΎαž™αžŠαžΌαž…αŸ’αž“αŸαŸ‡αž“αŸ…αž―αž—αžΆαžšαž€αž·αž…αŸ’αž…αž‡αžΆαž˜αž½αž™αž“αžΉαž„αž€αžΆαžšαž•αŸ’αž‰αžΎαžŸαžΆαžšαž˜αž½αž™:

Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›

αž”αž‰αŸ’αž‡αžΈαž–αŸαž‰αž›αŸαž‰αž“αŸƒαž˜αŸ‰αžΆαž€αŸ’αžšαžΌαžŠαŸ‚αž›αž—αŸ’αž‡αžΆαž”αŸ‹αž˜αž€αž‡αžΆαž˜αž½αž™αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž€αŸ†αžŽαŸ‚αž…αž»αž„αž€αŸ’αžšαŸ„αž™αž”αŸ†αž•αž»αžαž’αžΆαž…αžšαž€αž”αžΆαž“αž“αŸ…αž‘αžΈαž“αŸαŸ‡αŸ– αžŸαŸαž…αž€αŸ’αžαžΈαž™αŸ„αž„αž˜αŸ‰αžΆαž€αŸ’αžšαžΌ

αž›αžΎαžŸαž–αžΈαž“αŸαŸ‡αž‘αŸ…αž‘αŸ€αž αžŠαŸ„αž™αž˜αžΆαž“αž‡αŸ†αž“αž½αž™αž–αžΈαž€αž˜αŸ’αž˜αžœαž·αž’αžΈαž‡αŸ†αž“αž½αž™ αž™αžΎαž„αž’αžΆαž…αž”αŸ’αžšαž€αžΆαžŸαž˜αŸ‰αžΆαž€αŸ’αžšαžΌαž•αŸ’αž‘αžΆαž›αŸ‹αžαŸ’αž›αž½αž“αžšαž”αžŸαŸ‹αž™αžΎαž„ αž”αŸ‰αž»αž“αŸ’αžαŸ‚αž“αŸ„αŸ‡αž‡αžΆαžšαžΏαž„αž˜αž½αž™αž‘αŸ€αžαŸ”

αž”αž“αŸ’αžαŸ‚αž˜αž–αžΈαž›αžΎαžœαžαŸ’αžαž»αžŠαŸ‚αž›αž”αžΆαž“αž€αŸ†αžŽαžαŸ‹αž‡αžΆαž˜αž»αž“ αž™αžΎαž„αž’αžΆαž…αž‡αŸ†αž“αž½αžŸαžαž˜αŸ’αž›αŸƒαž“αŸƒαž’αžαŸαžšαžšαž”αžŸαŸ‹αž™αžΎαž„ (αžαŸ’αž‰αž»αŸ†αž”αžΆαž“αž”αŸ’αžšαžΎαžœαžΆαžšαž½αž…αž αžΎαž™αž“αŸ…αž€αŸ’αž“αž»αž„αž€αžΌαžŠαžαžΆαž„αž›αžΎ)αŸ” αž…αžΌαžšαž™αžΎαž„αž”αž„αŸ’αž€αžΎαžαž“αŸ…αž€αŸ’αž“αž»αž„ Admin/Variables αž–αžΈαžšαž”αžΈαžšαžΏαž„αŸ–

Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›

αž’αŸ’αžœαžΈαž‚αŸ’αžšαž”αŸ‹αž™αŸ‰αžΆαž„αžŠαŸ‚αž›αž’αŸ’αž“αž€αž’αžΆαž…αž”αŸ’αžšαžΎαŸ–

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

αžαž˜αŸ’αž›αŸƒαž’αžΆαž…αž‡αžΆαž˜αžΆαžαŸ’αžšαžŠαŸ’αž‹αžΆαž“ αž¬αž€αŸαž’αžΆαž…αž‡αžΆ JSON αž•αž„αžŠαŸ‚αžšαŸ” αž€αŸ’αž“αž»αž„αž€αžšαžŽαžΈ JSONαŸ–

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

αž‚αŸ’αžšαžΆαž“αŸ‹αžαŸ‚αž”αŸ’αžšαžΎαž•αŸ’αž›αžΌαžœαž‘αŸ…αž€αžΆαž“αŸ‹αž‚αž“αŸ’αž›αžΉαŸ‡αžŠαŸ‚αž›αž…αž„αŸ‹αž”αžΆαž“αŸ– {{ var.json.bot_config.bot.token }}.

αžαŸ’αž‰αž»αŸ†αž“αžΉαž„αž“αž·αž™αžΆαž™αž–αžΆαž€αŸ’αž™αž˜αž½αž™αžƒαŸ’αž›αžΆ αž“αž·αž„αž”αž„αŸ’αž αžΆαž‰αžšαžΌαž”αžαžαž’αŸαž€αŸ’αžšαž„αŸ‹αž˜αž½αž™αž’αŸ†αž–αžΈ αž€αžΆαžšαžαž—αŸ’αž‡αžΆαž”αŸ‹. αž’αŸ’αžœαžΈαž‚αŸ’αžšαž”αŸ‹αž™αŸ‰αžΆαž„αž‚αžΊαžŸαŸ†αžαžΆαž“αŸ‹αž“αŸ…αž‘αžΈαž“αŸαŸ‡: αž“αŸ…αž›αžΎαž‘αŸ†αž–αŸαžš Admin/Connections αž™αžΎαž„αž”αž„αŸ’αž€αžΎαžαž€αžΆαžšαžαž—αŸ’αž‡αžΆαž”αŸ‹ αž”αž“αŸ’αžαŸ‚αž˜αž€αžΆαžšαž…αžΌαž›/αž–αžΆαž€αŸ’αž™αžŸαž˜αŸ’αž„αžΆαžαŸ‹αžšαž”αžŸαŸ‹αž™αžΎαž„ αž“αž·αž„αž”αŸ‰αžΆαžšαŸ‰αžΆαž˜αŸ‰αŸ‚αžαŸ’αžšαž‡αžΆαž€αŸ‹αž›αžΆαž€αŸ‹αž”αž“αŸ’αžαŸ‚αž˜αž‘αŸ€αžαž“αŸ…αž‘αžΈαž“αŸ„αŸ‡αŸ” αžŠαžΌαž…αž“αŸαŸ‡αŸ–

Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›

αž–αžΆαž€αŸ’αž™β€‹αžŸαž˜αŸ’αž„αžΆαžαŸ‹β€‹αž’αžΆαž…β€‹αžαŸ’αžšαžΌαžœβ€‹αž”αžΆαž“β€‹αž’αŸŠαž·αž“αž‚αŸ’αžšαžΈαž” (β€‹αž›αŸ’αž’αž·αžαž›αŸ’αž’αž“αŸ‹β€‹αž‡αžΆαž„β€‹αž›αŸ†αž“αžΆαŸ†αžŠαžΎαž˜β€‹) αž¬β€‹αž’αŸ’αž“αž€β€‹αž’αžΆαž…β€‹αž‘αž»αž€β€‹αž”αŸ’αžšαž—αŸαž‘β€‹αž€αžΆαžšβ€‹αžαž—αŸ’αž‡αžΆαž”αŸ‹ (αžŠαžΌαž…β€‹αžŠαŸ‚αž›β€‹αžαŸ’αž‰αž»αŸ†β€‹αž”αžΆαž“β€‹αž’αŸ’αžœαžΎβ€‹αžŸαž˜αŸ’αžšαžΆαž”αŸ‹ tg_main) - αž€αžΆαžšαž–αž·αžαž‚αžΊαžαžΆαž”αž‰αŸ’αž‡αžΈαž“αŸƒαž”αŸ’αžšαž—αŸαž‘αž‚αžΊαžšαžΉαž„αž“αŸ…αž€αŸ’αž“αž»αž„αž˜αŸ‰αžΌαžŠαŸ‚αž› Airflow αž αžΎαž™αž˜αž·αž“αž’αžΆαž…αž–αž„αŸ’αžšαžΈαž€αžŠαŸ„αž™αž˜αž·αž“αž…αžΆαŸ†αž”αžΆαž…αŸ‹αž…αžΌαž›αž‘αŸ…αž€αŸ’αž“αž»αž„αž€αžΌαžŠαž”αŸ’αžšαž—αž–αž‘αŸ (αž”αŸ’αžšαžŸαž·αž“αž”αžΎαž—αŸ’αž›αžΆαž˜αŸ—αž“αŸ„αŸ‡αžαŸ’αž‰αž»αŸ†αž˜αž·αž“αž”αžΆαž“ Google αž’αŸ’αžœαžΈαž˜αž½αž™ αžŸαžΌαž˜αž€αŸ‚αžαž˜αŸ’αžšαžΌαžœαžαŸ’αž‰αž»αŸ†) αž”αŸ‰αž»αž“αŸ’αžαŸ‚αž‚αŸ’αž˜αžΆαž“αž’αŸ’αžœαžΈαž“αžΉαž„αžšαžΆαžšαžΆαŸ†αž„αž™αžΎαž„αž–αžΈαž€αžΆαžšαž‘αž‘αž½αž›αž”αžΆαž“αž€αŸ’αžšαŸαžŒαžΈαžαžŠαŸ„αž™αž‚αŸ’αžšαžΆαž“αŸ‹αžαŸ‚ αžˆαŸ’αž˜αŸ„αŸ‡αŸ”

αž’αŸ’αž“αž€αž€αŸαž’αžΆαž…αž’αŸ’αžœαžΎαž€αžΆαžšαžαž—αŸ’αž‡αžΆαž”αŸ‹αž‡αžΆαž…αŸ’αžšαžΎαž“αžŠαŸ‚αž›αž˜αžΆαž“αžˆαŸ’αž˜αŸ„αŸ‡αžŠαžΌαž…αž‚αŸ’αž“αžΆ: αž€αŸ’αž“αž»αž„αž€αžšαžŽαžΈαž“αŸαŸ‡αžœαž·αž’αžΈαžŸαžΆαžŸαŸ’αžαŸ’αžš BaseHook.get_connection()αžŠαŸ‚αž›αž’αŸ’αžœαžΎαž±αŸ’αž™αž™αžΎαž„αž—αŸ’αž‡αžΆαž”αŸ‹αž‘αŸ†αž“αžΆαž€αŸ‹αž‘αŸ†αž“αž„αžαžΆαž˜αžˆαŸ’αž˜αŸ„αŸ‡αž“αžΉαž„αž•αŸ’αžαž›αŸ‹αž±αŸ’αž™ αž…αŸƒαžŠαž“αŸ’αž™ αž–αžΈαžˆαŸ’αž˜αŸ„αŸ‡αž‡αžΆαž…αŸ’αžšαžΎαž“ (αžœαžΆαž“αžΉαž„αžŸαž˜αž αŸαžαž»αž•αž›αž‡αžΆαž„αž€αŸ’αž“αž»αž„αž€αžΆαžšαž”αž„αŸ’αž€αžΎαž Round Robin αž”αŸ‰αž»αž“αŸ’αžαŸ‚αžŸαžΌαž˜αž‘αž»αž€αžœαžΆαž“αŸ…αž›αžΎαž˜αž“αžŸαž·αž€αžΆαžšαžšαž”αžŸαŸ‹αž’αŸ’αž“αž€αž’αž—αž·αžœαžŒαŸ’αžαž“αŸ Airflow)αŸ”

Variables αž“αž·αž„ Connections αž‚αžΊαž‡αžΆαž§αž”αž€αžšαžŽαŸαžŠαŸαžαŸ’αžšαž‡αžΆαž€αŸ‹ αž”αŸ‰αž»αž“αŸ’αžαŸ‚αžœαžΆαžŸαŸ†αžαžΆαž“αŸ‹αžŽαžΆαžŸαŸ‹αžŠαŸ‚αž›αž˜αž·αž“αžαŸ’αžšαžΌαžœαž”αžΆαžαŸ‹αž”αž„αŸ‹αžŸαž˜αžαž»αž›αŸ’αž™αŸ– αžαžΎαž•αŸ’αž“αŸ‚αž€αžŽαžΆαž˜αž½αž™αž“αŸƒαž›αŸ†αž αžΌαžšαžšαž”αžŸαŸ‹αž’αŸ’αž“αž€αžŠαŸ‚αž›αž’αŸ’αž“αž€αžšαž€αŸ’αžŸαžΆαž‘αž»αž€αž“αŸ…αž€αŸ’αž“αž»αž„αž€αžΌαžŠαžαŸ’αž›αž½αž“αž―αž„ αž“αž·αž„αž•αŸ’αž“αŸ‚αž€αžŽαžΆαž˜αž½αž™αžŠαŸ‚αž›αž’αŸ’αž“αž€αž•αŸ’αžαž›αŸ‹αž±αŸ’αž™ Airflow αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž•αŸ’αž‘αž»αž€αŸ” αž˜αŸ’αž™αŸ‰αžΆαž„αžœαž·αž‰αž‘αŸ€αž αžœαžΆαž’αžΆαž…αž˜αžΆαž“αž—αžΆαž–αž„αžΆαž™αžŸαŸ’αžšαž½αž›αž€αŸ’αž“αž»αž„αž€αžΆαžšαž•αŸ’αž›αžΆαžŸαŸ‹αž”αŸ’αžαžΌαžšαžαž˜αŸ’αž›αŸƒαž™αŸ‰αžΆαž„αž†αžΆαž”αŸ‹αžšαž αŸαžŸ αž§αž‘αžΆαž αžšαžŽαŸ αž”αŸ’αžšαž’αž”αŸ‹αžŸαŸ†αž”αž»αžαŸ’αžšαžαžΆαž˜αžšαž™αŸˆ UI αŸ” αž˜αŸ‰αŸ’αž™αžΆαž„αžœαž·αž‰αž‘αŸ€αž αž“αŸαŸ‡αž“αŸ…αžαŸ‚αž‡αžΆαž€αžΆαžšαžαŸ’αžšαž›αž”αŸ‹αž‘αŸ…αž€αžΆαžšαž…αž»αž…αž€αžŽαŸ’αžŠαž»αžš αžŠαŸ‚αž›αž™αžΎαž„ (αžαŸ’αž‰αž»αŸ†) αž…αž„αŸ‹αž€αž˜αŸ’αž…αžΆαžαŸ‹αŸ”

αž€αžΆαžšαž’αŸ’αžœαžΎαž€αžΆαžšαž‡αžΆαž˜αž½αž™αž€αžΆαžšαžαž—αŸ’αž‡αžΆαž”αŸ‹αž‚αžΊαž‡αžΆαž€αž·αž…αŸ’αž…αž€αžΆαžšαž˜αž½αž™αŸ” αž‘αŸ†αž–αž€αŸ‹. αž‡αžΆαž‘αžΌαž‘αŸ… αž‘αŸ†αž–αž€αŸ‹ Airflow αž‚αžΊαž‡αžΆαž…αŸ†αžŽαž»αž…αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž—αŸ’αž‡αžΆαž”αŸ‹αžœαžΆαž‘αŸ…αžŸαŸαžœαžΆαž€αž˜αŸ’αž˜αž—αžΆαž‚αžΈαž‘αžΈαž”αžΈ αž“αž·αž„αž”αžŽαŸ’αžŽαžΆαž›αŸαž™αŸ” ឧ. JiraHook αž“αžΉαž„αž”αžΎαž€αž˜αŸ‰αžΆαžŸαŸŠαžΈαž“αž—αŸ’αž‰αŸ€αžœαžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž–αž½αž€αž™αžΎαž„αžŠαžΎαž˜αŸ’αž”αžΈαž’αŸ’αžœαžΎαž’αž“αŸ’αžαžšαž€αž˜αŸ’αž˜αž‡αžΆαž˜αž½αž™ Jira (αž’αŸ’αž“αž€αž’αžΆαž…αž•αŸ’αž›αžΆαžŸαŸ‹αž‘αžΈαž—αžΆαžšαž€αž·αž…αŸ’αž…αž‘αŸ…αž˜αž€) αž“αž·αž„αžŠαŸ„αž™αž˜αžΆαž“αž‡αŸ†αž“αž½αž™αž–αžΈ SambaHook αž’αŸ’αž“αž€αž’αžΆαž…αžšαž»αž‰αž―αž€αžŸαžΆαžšαž˜αžΌαž›αžŠαŸ’αž‹αžΆαž“αž‘αŸ… smb- αž…αŸ†αžŽαž»αž…αŸ”

αž€αžΆαžšαž‰αŸ‚αž€αž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αž€αžšαž•αŸ’αž‘αžΆαž›αŸ‹αžαŸ’αž›αž½αž“

αž αžΎαž™β€‹αž™αžΎαž„β€‹αžαž·αžβ€‹αž‘αŸ…β€‹αž‡αž·αžβ€‹αžŠαžΎαž˜αŸ’αž”αžΈβ€‹αž˜αžΎαž›β€‹αž–αžΈβ€‹αžšαž”αŸ€αž”β€‹αžŠαŸ‚αž›β€‹αžœαžΆβ€‹αžαŸ’αžšαžΌαžœβ€‹αž”αžΆαž“β€‹αž‚αŸβ€‹αž•αž›αž·αž TelegramBotSendMessage

αž›αŸαžαž€αžΌαžŠ commons/operators.py αž‡αžΆαž˜αž½αž™αž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αž€αžšαž–αž·αžαž”αŸ’αžšαžΆαž€αžŠαŸ–

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

αž“αŸ…αž‘αžΈαž“αŸαŸ‡ αžŠαžΌαž…αž‡αžΆαž’αŸ’αžœαžΈαŸ—αž•αŸ’αžŸαŸαž„αž‘αŸ€αžαž“αŸ…αž€αŸ’αž“αž»αž„ Airflow αž’αŸ’αžœαžΈαž‚αŸ’αžšαž”αŸ‹αž™αŸ‰αžΆαž„αž‚αžΊαžŸαžΆαž˜αž‰αŸ’αž‰αžŽαžΆαžŸαŸ‹αŸ–

  • αž‘αž‘αž½αž›αž˜αžšαžαž€αž–αžΈ BaseOperatorαžŠαŸ‚αž›αž’αž“αž»αžœαžαŸ’αžαžšαžΏαž„αž‡αžΆαž€αŸ‹αž›αžΆαž€αŸ‹αž˜αž½αž™αž…αŸ†αž“αž½αž“αž“αŸƒαž›αŸ†αž αžΌαžšαžαŸ’αž™αž›αŸ‹ (αž˜αžΎαž›αž€αžΆαžšαž€αž˜αŸ’αžŸαžΆαž“αŸ’αžαžšαž”αžŸαŸ‹αž’αŸ’αž“αž€)
  • αžœαžΆαž›αžŠαŸ‚αž›αž”αžΆαž“αž”αŸ’αžšαž€αžΆαžŸ template_fieldsαžŠαŸ‚αž›αž€αŸ’αž“αž»αž„αž“αŸ„αŸ‡ Jinja αž“αžΉαž„αžŸαŸ’αžœαŸ‚αž„αžšαž€αž˜αŸ‰αžΆαž€αŸ’αžšαžΌαžŠαžΎαž˜αŸ’αž”αžΈαžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαŸ”
  • αž”αžΆαž“αžšαŸ€αž”αž…αŸ†αž’αžΆαž‚αž»αž™αž˜αŸ‰αž„αŸ‹αžαŸ’αžšαžΉαž˜αžαŸ’αžšαžΌαžœαžŸαž˜αŸ’αžšαžΆαž”αŸ‹ __init__()αž€αŸ†αžŽαžαŸ‹αž›αŸ†αž“αžΆαŸ†αžŠαžΎαž˜αžαžΆαž˜αžαž˜αŸ’αžšαžΌαžœαž€αžΆαžšαŸ”
  • αž™αžΎαž„αž€αŸαž˜αž·αž“αž—αŸ’αž›αŸαž…αž’αŸ†αž–αžΈαž€αžΆαžšαž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜αž“αŸƒαž”αž»αž–αŸ’αžœαž”αž»αžšαžŸαžŠαŸ‚αžšαŸ”
  • αž”αžΆαž“αž”αžΎαž€αž‘αŸ†αž–αž€αŸ‹αžŠαŸ‚αž›αžαŸ’αžšαžΌαžœαž‚αŸ’αž“αžΆαŸ” TelegramBotHookαž”αžΆαž“αž‘αž‘αž½αž›αžœαžαŸ’αžαž»αž’αžαž·αžαž·αž‡αž“αž–αžΈαžœαžΆαŸ”
  • αžœαž·αž’αžΈαžŸαžΆαžŸαŸ’αžšαŸ’αžαž”αžŠαž·αžŸαŸαž’ (αž€αŸ†αžŽαžαŸ‹αž‘αžΎαž„αžœαž·αž‰) BaseOperator.execute()αžŠαŸ‚αž› Airfow αž“αžΉαž„αž‰αŸαžšαž“αŸ…αž–αŸαž›αžŠαž›αŸ‹αž–αŸαž›αž”αžΎαž€αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αž€αžš - αž“αŸ…αž€αŸ’αž“αž»αž„αžœαžΆαž™αžΎαž„αž“αžΉαž„αž’αž“αž»αžœαžαŸ’αžαžŸαž€αž˜αŸ’αž˜αž—αžΆαž–αž…αž˜αŸ’αž”αž„αžŠαŸ„αž™αž—αŸ’αž›αŸαž…αž…αžΌαž›αŸ” (αž™αžΎαž„αž…αžΌαž› αžŠαŸ„αž™αžœαž·αž’αžΈαž“αŸαŸ‡ αž…αžΌαž› stdout ΠΈ stderr - αž›αŸ†αž αžΌαžšαžαŸ’αž™αž›αŸ‹αž“αžΉαž„αžŸαŸ’αž‘αžΆαž€αŸ‹αž…αžΆαž”αŸ‹αž’αŸ’αžœαžΈαŸ—αž‚αŸ’αžšαž”αŸ‹αž™αŸ‰αžΆαž„ αžšαž»αŸ†αžœαžΆαž±αŸ’αž™αžŸαŸ’αž’αžΆαž αž”αŸ†αž”αŸ‚αž€αžœαžΆαžαžΆαž˜αžαž˜αŸ’αžšαžΌαžœαž€αžΆαžšαŸ” )

αžŸαžΌαž˜αž˜αžΎαž›αž’αŸ’αžœαžΈαžŠαŸ‚αž›αž™αžΎαž„αž˜αžΆαž“ commons/hooks.py. αž•αŸ’αž“αŸ‚αž€αžŠαŸ†αž”αžΌαž„αž“αŸƒαž―αž€αžŸαžΆαžšαžŠαŸ„αž™αž˜αžΆαž“αž‘αŸ†αž–αž€αŸ‹αžαŸ’αž›αž½αž“αžœαžΆαŸ–

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

αžαŸ’αž‰αž»αŸ†β€‹αž˜αž·αž“β€‹αžŠαžΉαž„β€‹αžαžΆβ€‹αžαŸ’αžšαžΌαžœβ€‹αž–αž“αŸ’αž™αž›αŸ‹β€‹αž™αŸ‰αžΆαž„β€‹αžŽαžΆβ€‹αž“αŸ…β€‹αž‘αžΈβ€‹αž“αŸαŸ‡ αžαŸ’αž‰αž»αŸ†β€‹αž‚αŸ’αžšαžΆαž“αŸ‹β€‹αžαŸ‚β€‹αž€αžαŸ‹β€‹αžŸαž˜αŸ’αž‚αžΆαž›αŸ‹β€‹αž…αŸ†αžŽαž»αž…β€‹αžŸαŸ†αžαžΆαž“αŸ‹αŸ—αŸ–

  • αž™αžΎαž„αž‘αž‘αž½αž›αž˜αžšαžαž€, αž‚αž·αžαž’αŸ†αž–αžΈαž’αžΆαž‚αž»αž™αž˜αŸ‰αž„αŸ‹ - αž€αŸ’αž“αž»αž„αž€αžšαžŽαžΈαž—αžΆαž‚αž…αŸ’αžšαžΎαž“αžœαžΆαž“αžΉαž„αž˜αžΆαž“αžαŸ‚αž˜αž½αž™: conn_id;
  • αž”αžŠαž·αžŸαŸαž’αžœαž·αž’αžΈαžŸαžΆαžŸαŸ’αžšαŸ’αžαžŸαŸ’αžαž„αŸ‹αžŠαžΆαžšαŸ– αžαŸ’αž‰αž»αŸ†αž”αžΆαž“αž€αŸ†αžŽαžαŸ‹αžαŸ’αž›αž½αž“αž―αž„ get_conn()αžŠαŸ‚αž›αž€αŸ’αž“αž»αž„αž“αŸ„αŸ‡αžαŸ’αž‰αž»αŸ†αž‘αž‘αž½αž›αž”αžΆαž“αž”αŸ‰αžΆαžšαŸ‰αžΆαž˜αŸ‰αŸ‚αžαŸ’αžšαž“αŸƒαž€αžΆαžšαžαž—αŸ’αž‡αžΆαž”αŸ‹αžαžΆαž˜αžˆαŸ’αž˜αŸ„αŸ‡αž αžΎαž™αž‚αŸ’αžšαžΆαž“αŸ‹αžαŸ‚αž‘αž‘αž½αž›αž”αžΆαž“αž•αŸ’αž“αŸ‚αž€ extra (αž“αŸαŸ‡αž‚αžΊαž‡αžΆαžœαžΆαž› JSON) αžŠαŸ‚αž›αžαŸ’αž‰αž»αŸ† (αž™αŸ„αž„αž‘αŸ…αžαžΆαž˜αž€αžΆαžšαžŽαŸ‚αž“αžΆαŸ†αž•αŸ’αž‘αžΆαž›αŸ‹αžαŸ’αž›αž½αž“αžšαž”αžŸαŸ‹αžαŸ’αž‰αž»αŸ†!) αžŠαžΆαž€αŸ‹αž“αž·αž˜αž·αžαŸ’αžαžŸαž‰αŸ’αž‰αžΆ Telegram bot: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • αžαŸ’αž‰αž»αŸ†αž”αž„αŸ’αž€αžΎαžαž§αž‘αžΆαž αžšαžŽαŸαžšαž”αžŸαŸ‹αž™αžΎαž„αŸ” TelegramBotαž•αŸ’αžαž›αŸ‹αž±αŸ’αž™αžœαžΆαž“αžΌαžœαžŸαž‰αŸ’αž‰αžΆαžŸαž˜αŸ’αž„αžΆαžαŸ‹αž‡αžΆαž€αŸ‹αž›αžΆαž€αŸ‹αž˜αž½αž™αŸ”

αž’αžŸαŸ‹αž αžΎαž™αŸ” αž’αŸ’αž“αž€αž’αžΆαž…αž‘αž‘αž½αž›αž”αžΆαž“αž’αžαž·αžαž·αž‡αž“αž–αžΈαž‘αŸ†αž–αž€αŸ‹αžŠαŸ„αž™αž”αŸ’αžšαžΎ TelegramBotHook().clent ឬ TelegramBotHook().get_conn().

αž αžΎαž™αž•αŸ’αž“αŸ‚αž€αž‘αžΈαž–αžΈαžšαž“αŸƒαž―αž€αžŸαžΆαžšαžŠαŸ‚αž›αžαŸ’αž‰αž»αŸ†αž”αž„αŸ’αž€αžΎαž microwrapper αžŸαž˜αŸ’αžšαžΆαž”αŸ‹ Telegram REST API αžŠαžΎαž˜αŸ’αž”αžΈαž€αž»αŸ†αž±αŸ’αž™αž’αžΌαžŸαžŠαžΌαž…αž‚αŸ’αž“αžΆ python-telegram-bot αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αžœαž·αž’αžΈαžŸαžΆαžŸαŸ’αžšαŸ’αžαž˜αž½αž™αŸ” sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

αžœαž·αž’αžΈαžαŸ’αžšαžΉαž˜αžαŸ’αžšαžΌαžœαž‚αžΊαžαŸ’αžšαžΌαžœαž”αž“αŸ’αžαŸ‚αž˜αžœαžΆαž‘αžΆαŸ†αž„αž’αžŸαŸ‹αŸ– TelegramBotSendMessage, TelegramBotHook, TelegramBot - αž“αŸ…αž€αŸ’αž“αž»αž„αž€αž˜αŸ’αž˜αžœαž·αž’αžΈαž‡αŸ†αž“αž½αž™ αžŠαžΆαž€αŸ‹αž€αŸ’αž“αž»αž„αžƒαŸ’αž›αžΆαŸ†αž„αžŸαžΆαž’αžΆαžšαžŽαŸˆ αž αžΎαž™αž•αŸ’αžαž›αŸ‹αž±αŸ’αž™αžœαžΆαž‘αŸ…αž”αŸ’αžšαž—αž–αž”αžΎαž€αž…αŸ†αž αŸ”

αžαžŽαŸˆαž–αŸαž›αžŠαŸ‚αž›αž™αžΎαž„αž€αŸ†αž–αž»αž„αžŸαž·αž€αŸ’αžŸαžΆαž‘αžΆαŸ†αž„αž’αžŸαŸ‹αž“αŸαŸ‡ αž€αžΆαžšαž’αŸ’αžœαžΎαž”αž…αŸ’αž…αž»αž”αŸ’αž”αž“αŸ’αž“αž—αžΆαž–αžšαž”αžΆαž™αž€αžΆαžšαžŽαŸαžšαž”αžŸαŸ‹αž™αžΎαž„αž”αžΆαž“αž”αžšαžΆαž‡αŸαž™αžŠαŸ„αž™αž‡αŸ„αž‚αž‡αŸαž™ αž αžΎαž™αž•αŸ’αž‰αžΎαžŸαžΆαžšαž€αŸ†αž αž»αžŸαž˜αž€αžαŸ’αž‰αž»αŸ†αž“αŸ…αž€αŸ’αž“αž»αž„αž”αŸ‰αž»αžŸαŸ’αžαž·αŸαŸ” αžαŸ’αž‰αž»αŸ†αž“αžΉαž„αž–αž·αž“αž·αžαŸ’αž™αž˜αžΎαž›αžαžΆαžαžΎαžœαžΆαžαž»αžŸ ...

Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›
αž’αŸ’αžœαžΈαž˜αž½αž™αž”αžΆαž“αž•αŸ’αž‘αž»αŸ‡αž“αŸ…αž€αŸ’αž“αž»αž„αžŸαžαŸ’αžœαž†αŸ’αž€αŸ‚αžšαž”αžŸαŸ‹αž™αžΎαž„! αžαžΎαž“αŸ„αŸ‡αž˜αž·αž“αž˜αŸ‚αž“αž‡αžΆαž’αŸ’αžœαžΈαžŠαŸ‚αž›αž™αžΎαž„αžšαŸ†αž–αžΉαž„αž‘αž»αž€αž‘αŸ? αž™αŸ‰αžΆαž„β€‹αž–αž·αžαž”αŸ’αžšαžΆαž€αžŠ!

αžαžΎαž’αŸ’αž“αž€αž“αžΉαž„αž…αžΆαž€αŸ‹αž‘αŸ?

αžαžΎαž’αŸ’αž“αž€αž˜αžΆαž“αž’αžΆαžšαž˜αŸ’αž˜αžŽαŸαžαžΆαžαŸ’αž‰αž»αŸ†αž“αžΉαž€αž’αŸ’αžœαžΈαž˜αž½αž™αž‘αŸ? αžœαžΆαž αžΆαž€αŸ‹αž”αžΈαžŠαžΌαž…αž‡αžΆαž‚αžΆαžαŸ‹αž”αžΆαž“αžŸαž“αŸ’αž™αžΆαžαžΆαž“αžΉαž„αž•αŸ’αž‘αŸαžšαž‘αž·αž“αŸ’αž“αž“αŸαž™αž–αžΈ SQL Server αž‘αŸ… Vertica αž αžΎαž™αž”αž“αŸ’αž‘αžΆαž”αŸ‹αž˜αž€αž‚αžΆαžαŸ‹αž”αžΆαž“αž™αž€αžœαžΆαž…αŸαž‰ αž αžΎαž™αž•αŸ’αž›αžΆαžŸαŸ‹αž”αŸ’αžαžΌαžšαž”αŸ’αžšαž’αžΆαž“αž”αž‘αž“αŸαŸ‡ αž‚αžΊαž’αŸ’αž“αž€αž˜αžΎαž›αž„αžΆαž™!

αž’αŸ†αž–αžΎβ€‹αžƒαŸ„αžšαžƒαŸ…β€‹αž“αŸαŸ‡β€‹αž‚αžΊβ€‹αž˜αžΆαž“β€‹αž…αŸαžαž“αžΆ αžαŸ’αž‰αž»αŸ†β€‹αž‚αŸ’αžšαžΆαž“αŸ‹β€‹αžαŸ‚β€‹αž”αž€β€‹αžŸαŸ’αžšαžΆαž™β€‹αž–αžΆαž€αŸ’αž™β€‹αž˜αž½αž™β€‹αž…αŸ†αž“αž½αž“β€‹αžŸαž˜αŸ’αžšαžΆαž”αŸ‹β€‹αž’αŸ’αž“αž€αŸ” αž₯αž‘αžΌαžœαž“αŸαŸ‡αž’αŸ’αž“αž€αž’αžΆαž…αž‘αŸ…αž”αž“αŸ’αžαŸ‚αž˜αž‘αŸ€αžαŸ”

αž•αŸ‚αž“αž€αžΆαžšαžšαž”αžŸαŸ‹αž™αžΎαž„αž‚αžΊαž“αŸαŸ‡αŸ–

  1. αž’αŸ’αžœαžΎαžŠαžΆαž…
  2. αž”αž„αŸ’αž€αžΎαžαž€αž·αž…αŸ’αž…αž€αžΆαžš
  3. αž˜αžΎαž›β€‹αžαžΆβ€‹αžŸαŸ’αž’αžΆαžβ€‹αž”αŸ‰αž»αžŽαŸ’αžŽαžΆβ€‹αž‘αŸ…β€‹
  4. αž€αŸ†αžŽαžαŸ‹αž›αŸαžαžœαž‚αŸ’αž‚αžŠαžΎαž˜αŸ’αž”αžΈαž”αŸ†αž–αŸαž‰
  5. αž‘αž‘αž½αž›αž”αžΆαž“αž‘αž·αž“αŸ’αž“αž“αŸαž™αž–αžΈ SQL Server
  6. αžŠαžΆαž€αŸ‹αž‘αž·αž“αŸ’αž“αž“αŸαž™αž‘αŸ…αž€αŸ’αž“αž»αž„ Vertica
  7. αž”αŸ’αžšαž˜αžΌαž›αžŸαŸ’αžαž·αžαž·

αžŠαžΌαž…αŸ’αž“αŸαŸ‡ αžŠαžΎαž˜αŸ’αž”αžΈβ€‹αžŸαž˜αŸ’αžšαŸαž…β€‹αž”αžΆαž“β€‹αž‘αžΆαŸ†αž„β€‹αž’αžŸαŸ‹β€‹αž“αŸαŸ‡ αžαŸ’αž‰αž»αŸ†β€‹αž”αžΆαž“β€‹αž’αŸ’αžœαžΎβ€‹αž€αžΆαžšβ€‹αž”αž“αŸ’αžαŸ‚αž˜β€‹αž˜αž½αž™β€‹αž…αŸ†αž“αž½αž“β€‹αžαžΌαž…β€‹αž‘αŸ…β€‹αž“αžΉαž„β€‹αžšαž”αžŸαŸ‹β€‹αž™αžΎαž„ docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

αž“αŸ…αž‘αžΈαž“αŸ„αŸ‡αž™αžΎαž„αž›αžΎαž€αŸ–

  • Vertica αž‡αžΆαž˜αŸ’αž…αžΆαžŸαŸ‹αž•αŸ’αž‘αŸ‡ dwh αž‡αžΆαž˜αž½αž™αž“αžΉαž„αž€αžΆαžšαž€αŸ†αžŽαžαŸ‹αž›αŸ†αž“αžΆαŸ†αžŠαžΎαž˜αž”αŸ†αž•αž»αž
  • αž§αž‘αžΆαž αžšαžŽαŸαž”αžΈαž“αŸƒ SQL Server,
  • αž™αžΎαž„αž”αŸ†αž–αŸαž‰αž˜αžΌαž›αžŠαŸ’αž‹αžΆαž“αž‘αž·αž“αŸ’αž“αž“αŸαž™αž“αŸ…αž–αŸαž›αž€αŸ’αžšαŸ„αž™αž‡αžΆαž˜αž½αž™αž“αžΉαž„αž‘αž·αž“αŸ’αž“αž“αŸαž™αž˜αž½αž™αž…αŸ†αž“αž½αž“ (αž€αŸ’αž“αž»αž„αž€αžšαžŽαžΈαžŽαžΆαž€αŸαžŠαŸ„αž™αž€αž»αŸ†αž–αž·αž“αž·αžαŸ’αž™αž˜αžΎαž› mssql_init.py!)

αž™αžΎαž„αž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž›αŸ’αž’αž‘αžΆαŸ†αž„αž’αžŸαŸ‹ αžŠαŸ„αž™αž˜αžΆαž“αž‡αŸ†αž“αž½αž™αž–αžΈαž–αžΆαž€αŸ’αž™αž”αž‰αŸ’αž‡αžΆαžŠαŸ‚αž›αžŸαŸ’αž˜αž»αž‚αžŸαŸ’αž˜αžΆαž‰αž‡αžΆαž„αž–αŸαž›αž˜αž»αž“αž”αž“αŸ’αžαž·αž…αŸ–

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

αž’αŸ’αžœαžΈαžŠαŸ‚αž›αž’αž–αŸ’αž—αžΌαžαž αŸαžαž»αž…αŸƒαžŠαž“αŸ’αž™αžšαž”αžŸαŸ‹αž™αžΎαž„αž”αžΆαž“αž”αž„αŸ’αž€αžΎαž αž’αŸ’αž“αž€αž’αžΆαž…αž”αŸ’αžšαžΎαž’αžΆαžαž» Data Profiling/Ad Hoc Query:

Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›
αžšαžΏαž„αžŸαŸ†αžαžΆαž“αŸ‹αž‚αžΊαž˜αž·αž“αžαŸ’αžšαžΌαžœαž”αž„αŸ’αž αžΆαž‰αžœαžΆαžŠαž›αŸ‹αž’αŸ’αž“αž€αžœαž·αž—αžΆαž‚αž‘αŸαŸ”

αž›αž˜αŸ’αž’αž·αžαž“αŸ…αž›αžΎ αžœαž‚αŸ’αž‚ ETL αžαŸ’αž‰αž»αŸ†αž“αžΉαž„αž˜αž·αž“αž’αžΈαž‘αŸ αž’αŸ’αžœαžΈαž‚αŸ’αžšαž”αŸ‹αž™αŸ‰αžΆαž„αž‚αžΊαžαžΌαž…αžαžΆαž…αž“αŸ…αž‘αžΈαž“αŸ„αŸ‡αŸ– αž™αžΎαž„αž”αž„αŸ’αž€αžΎαžαž˜αžΌαž›αžŠαŸ’αž‹αžΆαž“αž˜αž½αž™ αž˜αžΆαž“αžŸαž‰αŸ’αž‰αžΆαž˜αž½αž™αž“αŸ…αž€αŸ’αž“αž»αž„αžœαžΆ αž™αžΎαž„αžšαž»αŸ†αž’αŸ’αžœαžΈαž‚αŸ’αžšαž”αŸ‹αž™αŸ‰αžΆαž„αž‡αžΆαž˜αž½αž™αž’αŸ’αž“αž€αž‚αŸ’αžšαž”αŸ‹αž‚αŸ’αžšαž„αž”αžšαž·αž”αž‘ αž αžΎαž™αž₯αž‘αžΌαžœαž“αŸαŸ‡αž™αžΎαž„αž’αŸ’αžœαžΎαžŠαžΌαž…αž“αŸαŸ‡αŸ–

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

αž–αŸαž›αžœαŸαž›αžΆβ€‹αž”αžΆαž“β€‹αž˜αž€αžŠαž›αŸ‹β€‹αž αžΎαž™ αž”αŸ’αžšαž˜αžΌαž›αž‘αž·αž“αŸ’αž“αž“αŸαž™αžšαž”αžŸαŸ‹αž™αžΎαž„αŸ” αž–αžΈαžαž»αž˜αž½αž™αž€αž“αŸ’αž›αŸ‡αžšαž”αžŸαŸ‹αž™αžΎαž„αŸ” αž…αžΌαžšαž’αŸ’αžœαžΎαžŠαžΌαž…αž“αŸαŸ‡αžŠαŸ„αž™αž˜αžΆαž“αž‡αŸ†αž“αž½αž™αž–αžΈαž”αž“αŸ’αž‘αžΆαžαŸ‹αžŠαŸ‚αž›αž˜αž·αž“αž‚αž½αžšαž±αŸ’αž™αž‡αžΏαž”αŸ†αž•αž»αž:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. αžŠαŸ„αž™αž˜αžΆαž“αž‡αŸ†αž“αž½αž™αž–αžΈαž‘αŸ†αž–αž€αŸ‹αž˜αž½αž™αž™αžΎαž„αž‘αž‘αž½αž›αž”αžΆαž“αž–αžΈ Airflow pymssql- αž—αŸ’αž‡αžΆαž”αŸ‹
  2. αž…αžΌαžšαž‡αŸ†αž“αž½αžŸαž€αžΆαžšαžšαžΉαžαž”αž“αŸ’αžαžΉαž„αž€αŸ’αž“αž»αž„αž‘αž˜αŸ’αžšαž„αŸ‹αž“αŸƒαž€αžΆαž›αž”αžšαž·αž…αŸ’αž†αŸαž‘αž‘αŸ…αž€αŸ’αž“αž»αž„αžŸαŸ†αžŽαžΎ - αžœαžΆαž“αžΉαž„αžαŸ’αžšαžΌαžœαž”αžΆαž“αž”αŸ„αŸ‡αž…αžΌαž›αž‘αŸ…αž€αŸ’αž“αž»αž„αž˜αž»αžαž„αžΆαžšαžŠαŸ„αž™αž˜αŸ‰αžΆαžŸαŸŠαžΈαž“αž‚αŸ†αžšαžΌαŸ”
  3. αž…αž·αž‰αŸ’αž…αžΉαž˜αžαžΆαž˜αžŸαŸ†αžŽαžΎαžšαž”αžŸαŸ‹αž™αžΎαž„αŸ” pandasαžαžΎαž’αŸ’αž“αž€αžŽαžΆαž“αžΉαž„αž‘αž‘αž½αž›αž”αžΆαž“αž™αžΎαž„ DataFrame - αžœαžΆαž“αžΉαž„αž˜αžΆαž“αž”αŸ’αžšαž™αŸ„αž‡αž“αŸαžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž™αžΎαž„αž“αžΆαž–αŸαž›αž’αž“αžΆαž‚αžαŸ”

αžαŸ’αž‰αž»αŸ†αž€αŸ†αž–αž»αž„αž”αŸ’αžšαžΎαž€αžΆαžšαž‡αŸ†αž“αž½αžŸ {dt} αž‡αŸ†αž“αž½αžŸαž±αŸ’αž™αž”αŸ‰αžΆαžšαŸ‰αžΆαž˜αŸ‰αŸ‚αžαŸ’αžšαžŸαŸ’αž“αžΎαžŸαž»αŸ† %s αž˜αž·αž“αž˜αŸ‚αž“αžŠαŸ„αž™αžŸαžΆαžšαžαŸ‚αžαŸ’αž‰αž»αŸ†αž‡αžΆ Pinocchio αž’αžΆαž€αŸ’αžšαž€αŸ‹αž“αŸ„αŸ‡αž‘αŸ αž”αŸ‰αž»αž“αŸ’αžαŸ‚αžŠαŸ„αž™αžŸαžΆαžšαžαŸ‚ pandas αž˜αž·αž“αž’αžΆαž…αžŠαŸ„αŸ‡αžŸαŸ’αžšαžΆαž™αž”αžΆαž“αž‘αŸαŸ” pymssql αž αžΎαž™αžšαž’αž·αž›αž…αž»αž„αž€αŸ’αžšαŸ„αž™ params: Listαž‘αŸ„αŸ‡αž”αžΈαž‡αžΆαž‚αžΆαžαŸ‹αž–αž·αžαž‡αžΆαž…αž„αŸ‹αž”αžΆαž“αž€αŸαžŠαŸ„αž™αŸ” tuple.
αž…αŸ†αžŽαžΆαŸ†αž•αž„αžŠαŸ‚αžšαžαžΆαž’αŸ’αž“αž€αž’αž—αž·αžœαžŒαŸ’αžαž“αŸ pymssql αžŸαž˜αŸ’αžšαŸαž…αž…αž·αžαŸ’αžαž˜αž·αž“αž‚αžΆαŸ†αž‘αŸ’αžšαž‚αžΆαžαŸ‹αž‘αŸ€αžαž‘αŸ αž αžΎαž™αžŠαž›αŸ‹αž–αŸαž›αžαŸ’αžšαžΌαžœαžšαžΎαž…αŸαž‰αž αžΎαž™αŸ” pyodbc.

αžαŸ„αŸ‡αž˜αžΎαž›αž’αŸ’αžœαžΈαžŠαŸ‚αž› Airflow αž”αž‰αŸ’αž…αžΌαž›αž’αžΆαž‚αž»αž™αž˜αŸ‰αž„αŸ‹αž“αŸƒαž˜αž»αžαž„αžΆαžšαžšαž”αžŸαŸ‹αž™αžΎαž„αž‡αžΆαž˜αž½αž™αŸ–

Apache AirflowαŸ– αž’αŸ’αžœαžΎαž±αŸ’αž™ ETL αž€αžΆαž“αŸ‹αžαŸ‚αž„αžΆαž™αžŸαŸ’αžšαž½αž›

αž”αŸ’αžšαžŸαž·αž“αž”αžΎαž‚αŸ’αž˜αžΆαž“αž‘αž·αž“αŸ’αž“αž“αŸαž™αž‘αŸ αž“αŸ„αŸ‡αž‚αŸ’αž˜αžΆαž“αž…αŸ†αžŽαž»αž…αž’αŸ’αžœαžΈαž€αŸ’αž“αž»αž„αž€αžΆαžšαž”αž“αŸ’αžαž‘αŸαŸ” αž”αŸ‰αž»αž“αŸ’αžαŸ‚αžœαžΆαž€αŸαž…αž˜αŸ’αž›αŸ‚αž€αžŠαŸ‚αžšαž€αŸ’αž“αž»αž„αž€αžΆαžšαž–αž·αž…αžΆαžšαžŽαžΆαžαžΆαž€αžΆαžšαž”αŸ†αž–αŸαž‰αž‘αž‘αž½αž›αž”αžΆαž“αž‡αŸ„αž‚αž‡αŸαž™αŸ” αž”αŸ‰αž»αž“αŸ’αžαŸ‚αž“αŸαŸ‡αž˜αž·αž“αž˜αŸ‚αž“αž‡αžΆαž€αŸ†αž αž»αžŸαž‘αŸαŸ” ធអ-ធអ-ធអ αž’αŸ’αžœαžΎαž’αžΈ?! αž αžΎαž™αž“αŸαŸ‡αž‡αžΆαž’αŸ’αžœαžΈαŸ–

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException αž“αžΉαž„αž”αŸ’αžšαžΆαž”αŸ‹ Airflow αžαžΆαž˜αž·αž“αž˜αžΆαž“αž€αŸ†αž αž»αžŸαž‘αŸ αž”αŸ‰αž»αž“αŸ’αžαŸ‚αž™αžΎαž„αžšαŸ†αž›αž„αž€αž·αž…αŸ’αž…αž€αžΆαžšαŸ” αž…αŸ†αžŽαž»αž…αž”αŸ’αžšαž‘αžΆαž€αŸ‹αž“αžΉαž„αž˜αž·αž“αž˜αžΆαž“αž€αžΆαžšαŸ‰αŸαž–αžŽαŸŒαž”αŸƒαžαž„αž¬αž€αŸ’αžšαž αž˜αž‘αŸαž”αŸ‰αž»αž“αŸ’αžαŸ‚αž–αžŽαŸŒαž•αŸ’αž€αžΆαžˆαžΌαž€αŸ”

αžαŸ„αŸ‡αž”αŸ„αŸ‡αž‘αž·αž“αŸ’αž“αž“αŸαž™αžšαž”αžŸαŸ‹αž™αžΎαž„αŸ” αž‡αž½αžšαžˆαžšαž…αŸ’αžšαžΎαž“αŸ”:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

αžˆαŸ’αž˜αŸ„αŸ‡αŸ–

  • αž˜αžΌαž›αžŠαŸ’αž‹αžΆαž“αž‘αž·αž“αŸ’αž“αž“αŸαž™αžŠαŸ‚αž›αž™αžΎαž„αž”αžΆαž“αž”αž‰αŸ’αž‡αžΆαž‘αž·αž‰
  • αž›αŸαžαžŸαž˜αŸ’αž‚αžΆαž›αŸ‹αž“αŸƒαžœαž‚αŸ’αž‚αž‘αžΉαž€αž‡αŸ†αž“αž“αŸ‹αžšαž”αžŸαŸ‹αž™αžΎαž„ (αžœαžΆαž“αžΉαž„αžαž»αžŸαž‚αŸ’αž“αžΆ αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αžšαžΆαž›αŸ‹αž€αž·αž…αŸ’αž…αž€αžΆαžš),
  • αž›αŸαžαžŸαž˜αŸ’αž‚αžΆαž›αŸ‹αž–αžΈαž”αŸ’αžšαž—αž– αž“αž·αž„αž›αŸ†αžŠαžΆαž”αŸ‹αž›αŸαžαžŸαž˜αŸ’αž‚αžΆαž›αŸ‹ - αžŠαžΌαž…αŸ’αž“αŸαŸ‡αž“αŸ…αž€αŸ’αž“αž»αž„αž˜αžΌαž›αžŠαŸ’αž‹αžΆαž“αž‘αž·αž“αŸ’αž“αž“αŸαž™αž…αž»αž„αž€αŸ’αžšαŸ„αž™ (αžŠαŸ‚αž›αž’αŸ’αžœαžΈαŸ—αžαŸ’αžšαžΌαžœαž”αžΆαž“αž…αžΆαž€αŸ‹αž…αžΌαž›αž‘αŸ…αž€αŸ’αž“αž»αž„αžαžΆαžšαžΆαž„αžαŸ‚αž˜αž½αž™) αž™αžΎαž„αž˜αžΆαž“αž›αŸαžαžŸαž˜αŸ’αž‚αžΆαž›αŸ‹αž›αŸ†αžŠαžΆαž”αŸ‹αžαŸ‚αž˜αž½αž™αž‚αžαŸ‹αŸ”

αž‡αŸ†αž αžΆαž“αž…αž»αž„αž€αŸ’αžšαŸ„αž™αž“αŸ…αžαŸ‚αž˜αžΆαž“: αž…αžΆαž€αŸ‹αž’αŸ’αžœαžΈαž‚αŸ’αžšαž”αŸ‹αž™αŸ‰αžΆαž„αž‘αŸ…αž€αŸ’αž“αž»αž„ Vertica αŸ” αž αžΎαž™αž‚αž½αžšαž±αŸ’αž™αž…αž˜αŸ’αž›αŸ‚αž€αžŽαžΆαžŸαŸ‹ αžœαž·αž’αžΈαžŠαŸαž’αžŸαŸ’αž…αžΆαžšαŸ’αž™ αž“αž·αž„αž˜αžΆαž“αž”αŸ’αžšαžŸαž·αž‘αŸ’αž’αž—αžΆαž–αž”αŸ†αž•αž»αžαž€αŸ’αž“αž»αž„αž€αžΆαžšαž’αŸ’αžœαžΎαž“αŸαŸ‡αž‚αžΊαžαžΆαž˜αžšαž™αŸˆ CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. αž™αžΎαž„αž€αŸ†αž–αž»αž„αž”αž„αŸ’αž€αžΎαžαž’αŸ’αž“αž€αž‘αž‘αž½αž›αž–αž·αžŸαŸαžŸ StringIO.
  2. pandas αž“αžΉαž„αžŠαžΆαž€αŸ‹αžšαž”αžŸαŸ‹αž™αžΎαž„αŸ” DataFrame αž“αŸ…αž€αŸ’αž“αž»αž„αžŸαŸ†αžŽαž»αŸ†αž”αŸ‚αž”αž”αž‘ CSV- αž”αž“αŸ’αž‘αžΆαžαŸ‹αŸ”
  3. αžαŸ„αŸ‡αž”αžΎαž€αž€αžΆαžšαžαž—αŸ’αž‡αžΆαž”αŸ‹αž‘αŸ… Vertica αžŸαŸ†αžŽαž–αŸ’αžœαžšαž”αžŸαŸ‹αž™αžΎαž„αžŠαŸ„αž™αž”αŸ’αžšαžΎαž‘αŸ†αž–αž€αŸ‹αŸ”
  4. αž αžΎαž™αž₯αž‘αžΌαžœαž“αŸαŸ‡αž‡αžΆαž˜αž½αž™αž“αžΉαž„αž‡αŸ†αž“αž½αž™ copy() αž•αŸ’αž‰αžΎαž‘αž·αž“αŸ’αž“αž“αŸαž™αžšαž”αžŸαŸ‹αž™αžΎαž„αžŠαŸ„αž™αž•αŸ’αž‘αžΆαž›αŸ‹αž‘αŸ… Vertika!

αž™αžΎαž„β€‹αž“αžΉαž„β€‹αž™αž€β€‹αž–αžΈβ€‹αž’αŸ’αž“αž€β€‹αž”αž‰αŸ’αž‡αžΆβ€‹αž–αžΈβ€‹αž…αŸ†αž“αž½αž“β€‹αž”αž“αŸ’αž‘αžΆαžαŸ‹β€‹αžŠαŸ‚αž›β€‹αžαŸ’αžšαžΌαžœβ€‹αž”αžΆαž“β€‹αž”αŸ†αž–αŸαž‰ αž αžΎαž™β€‹αž”αŸ’αžšαžΆαž”αŸ‹β€‹αž’αŸ’αž“αž€β€‹αž‚αŸ’αžšαž”αŸ‹β€‹αž‚αŸ’αžšαž„β€‹αžŸαž˜αŸαž™β€‹αžαžΆβ€‹αž‚αŸ’αžšαž”αŸ‹β€‹αž™αŸ‰αžΆαž„β€‹αž‚αžΊβ€‹αž˜αž·αž“β€‹αž’αžΈβ€‹αž‘αŸαŸ–

session.loaded_rows = cursor.rowcount
session.successful = True

αž’αžŸαŸ‹αž αžΎαž™αŸ”

αž“αŸ…αž›αžΎαž€αžΆαžšαž›αž€αŸ‹ αž™αžΎαž„αž”αž„αŸ’αž€αžΎαžαž…αžΆαž“αž‚αŸ„αž›αžŠαŸ…αžŠαŸ„αž™αžŠαŸƒαŸ” αž“αŸ…αž‘αžΈαž“αŸαŸ‡αžαŸ’αž‰αž»αŸ†αž”αžΆαž“αž’αž“αž»αž‰αŸ’αž‰αžΆαžαž±αŸ’αž™αžαŸ’αž›αž½αž“αžαŸ’αž‰αž»αŸ†αž“αžΌαžœαž˜αŸ‰αžΆαžŸαŸŠαžΈαž“αžαžΌαž…αž˜αž½αž™:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

αžαŸ’αž‰αž»αŸ†β€‹αž€αŸ†αž–αž»αž„β€‹αž”αŸ’αžšαžΎ VerticaOperator() αžαŸ’αž‰αž»αŸ†αž”αž„αŸ’αž€αžΎαžαž‚αŸ’αžšαŸ„αž„αž€αžΆαžšαžŽαŸαž˜αžΌαž›αžŠαŸ’αž‹αžΆαž“αž‘αž·αž“αŸ’αž“αž“αŸαž™ αž“αž·αž„αžαžΆαžšαžΆαž„ (αž”αŸ’αžšαžŸαž·αž“αž”αžΎαž–αž½αž€αž‚αŸαž˜αž·αž“αž‘αžΆαž“αŸ‹αž˜αžΆαž“αž‘αŸ αž–αž·αžαžŽαžΆαžŸαŸ‹)αŸ” αžšαžΏαž„αž…αŸ†αž”αž„αž‚αžΊαžšαŸ€αž”αž…αŸ†αž±αŸ’αž™αž”αžΆαž“αžαŸ’αžšαžΉαž˜αžαŸ’αžšαžΌαžœαž“αžΌαžœαž—αžΆαž–αž’αžΆαžŸαŸ’αžšαŸαž™:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

αžŸαž„αŸ’αžαŸαž”

- αž˜αŸ‚αž“αž αžΎαž™ - αž€αžΌαž“αž€αžŽαŸ’αžαž»αžšαžαžΌαž…αž”αžΆαž“αž“αž·αž™αžΆαž™αžαžΆ - αž˜αŸ‚αž“αž‘αŸαž₯αž‘αžΌαžœαž“αŸαŸ‡
αžαžΎαž’αŸ’αž“αž€αž‡αžΏαžαžΆαžαŸ’αž‰αž»αŸ†αž‡αžΆαžŸαžαŸ’αžœαžŠαŸ‚αž›αž‚αž½αžšαž±αŸ’αž™αžαŸ’αž›αžΆαž…αž”αŸ†αž•αž»αžαž“αŸ…αž€αŸ’αž“αž»αž„αž–αŸ’αžšαŸƒ?

Julia Donaldson, The Gruffalo

αžαŸ’αž‰αž»αŸ†αž‚αž·αžαžαžΆαž”αŸ’αžšαžŸαž·αž“αž”αžΎαžŸαž αžŸαŸαžœαž·αž€αžšαž”αžŸαŸ‹αžαŸ’αž‰αž»αŸ† αž“αž·αž„αžαŸ’αž‰αž»αŸ†αž˜αžΆαž“αž€αžΆαžšαž”αŸ’αžšαž€αž½αžαž”αŸ’αžšαž‡αŸ‚αž„αž˜αž½αž™αŸ– αžαžΎαž’αŸ’αž“αž€αžŽαžΆαž“αžΉαž„αž”αž„αŸ’αž€αžΎαž αž“αž·αž„αž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜αžŠαŸ†αžŽαžΎαžšαž€αžΆαžš ETL αž™αŸ‰αžΆαž„αž†αžΆαž”αŸ‹αžšαž αŸαžŸαž–αžΈαžŠαŸ†αž”αžΌαž„αŸ– αž–αž½αž€αž‚αŸαž‡αžΆαž˜αž½αž™αž“αžΉαž„ SSIS αž“αž·αž„αž€αžŽαŸ’αžαž»αžšαžšαž”αžŸαŸ‹αž–αž½αž€αž‚αŸ αž“αž·αž„αžαŸ’αž‰αž»αŸ†αž‡αžΆαž˜αž½αž™αž“αžΉαž„ Airflow ... αž αžΎαž™αž”αž“αŸ’αž‘αžΆαž”αŸ‹αž˜αž€αž™αžΎαž„αž€αŸαž“αžΉαž„αž”αŸ’αžšαŸ€αž”αž’αŸ€αž”αž—αžΆαž–αž„αžΆαž™αžŸαŸ’αžšαž½αž›αž“αŸƒαž€αžΆαžšαžαŸ‚αž‘αžΆαŸ†αž•αž„αžŠαŸ‚αžš ... αž’αžΈαž™αŸ‰αžΆ αžαŸ’αž‰αž»αŸ†β€‹αž‚αž·αžβ€‹αžαžΆβ€‹αž’αŸ’αž“αž€β€‹αž“αžΉαž„β€‹αž™αž›αŸ‹αž–αŸ’αžšαž˜β€‹αžαžΆβ€‹αžαŸ’αž‰αž»αŸ†β€‹αž“αžΉαž„β€‹αžœαžΆαž™β€‹αž–αž½αž€β€‹αž‚αŸβ€‹αž“αŸ…β€‹αž›αžΎβ€‹αž˜αž»αžβ€‹αž‘αžΆαŸ†αž„β€‹αž’αžŸαŸ‹!

αž”αŸ’αžšαžŸαž·αž“αž”αžΎαž’αŸ’αž„αž“αŸ‹αž’αŸ’αž„αžšαž‡αžΆαž„αž“αŸαŸ‡αž”αž“αŸ’αžαž·αž… αž“αŸ„αŸ‡ Apache Airflow - αžŠαŸ„αž™αž–αžŽαŸŒαž“αžΆαž’αŸ†αž–αžΈαžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž€αŸ’αž“αž»αž„αž‘αž˜αŸ’αžšαž„αŸ‹αž‡αžΆαž€αžΌαžŠαž€αž˜αŸ’αž˜αžœαž·αž’αžΈ - αž”αžΆαž“αž’αŸ’αžœαžΎαž€αžΆαžšαž„αžΆαžšαžšαž”αžŸαŸ‹αžαŸ’αž‰αž»αŸ† αž…αŸ’αžšαžΎαž“ αž€αžΆαž“αŸ‹αžαŸ‚αž˜αžΆαž“αž•αžΆαžŸαž»αž€αž—αžΆαž– αž“αž·αž„αžšαžΈαž€αžšαžΆαž™αŸ”

αž€αžΆαžšαž–αž„αŸ’αžšαžΈαž€αž‚αŸ’αž˜αžΆαž“αžŠαŸ‚αž“αž€αŸ†αžŽαžαŸ‹αžšαž”αžŸαŸ‹αžœαžΆ αž‘αžΆαŸ†αž„αž“αŸ…αž€αŸ’αž“αž»αž„αž›αž€αŸ’αžαžαžŽαŸ’αžŒαž“αŸƒαž€αž˜αŸ’αž˜αžœαž·αž’αžΈαž‡αŸ†αž“αž½αž™ αž“αž·αž„αž‘αŸ†αž“αŸ„αžšαž‘αŸ…αžšαž€αž€αžΆαžšαž’αŸ’αžœαžΎαž˜αžΆαžαŸ’αžšαžŠαŸ’αž‹αžΆαž“ αž•αŸ’αžαž›αŸ‹αž±αŸ’αž™αž’αŸ’αž“αž€αž“αžΌαžœαž±αž€αžΆαžŸαžŠαžΎαž˜αŸ’αž”αžΈαž”αŸ’αžšαžΎαž”αŸ’αžšαžΆαžŸαŸ‹ Airflow αž“αŸ…αžŸαŸ’αž‘αžΎαžšαžαŸ‚αž‚αŸ’αžšαž”αŸ‹αž•αŸ’αž“αŸ‚αž€αŸ– αžŸαžΌαž˜αŸ’αž”αžΈαžαŸ‚αž“αŸ…αž€αŸ’αž“αž»αž„αžœαžŠαŸ’αžαž–αŸαž‰αž›αŸαž‰αž“αŸƒαž€αžΆαžšαž”αŸ’αžšαž˜αžΌαž› αžšαŸ€αž”αž…αŸ† αž“αž·αž„αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž‘αž·αž“αŸ’αž“αž“αŸαž™ αžŸαžΌαž˜αŸ’αž”αžΈαžαŸ‚αž“αŸ…αž€αŸ’αž“αž»αž„αž€αžΆαžšαž”αžΆαž‰αŸ‹αž”αž„αŸ’αž αŸ„αŸ‡αžšαŸ‰αž»αž€αŸ’αž€αŸ‚αž (αž‘αŸ…αž€αžΆαž“αŸ‹αž—αž–αž–αŸ’αžšαŸ‡αž’αž„αŸ’αž‚αžΆαžš αž“αŸƒ αžœαž‚αŸ’αž‚αžŸαž·αž€αŸ’αžŸαžΆ) αŸ”

αž•αŸ’αž“αŸ‚αž€αž…αž»αž„αž€αŸ’αžšαŸ„αž™ αž―αž€αžŸαžΆαžšαž™αŸ„αž„ αž“αž·αž„αž–αŸαžαŸŒαž˜αžΆαž“

αžαž»αž„αžšαž½αž…αžŠαŸ‚αž›αž™αžΎαž„αž”αŸ’αžšαž˜αžΌαž›αž”αžΆαž“αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž’αŸ’αž“αž€

  • start_date. αž”αžΆαž‘ αž“αŸαŸ‡β€‹αž‡αžΆ meme αž€αŸ’αž“αž»αž„αžŸαŸ’αžšαž»αž€β€‹αžšαž½αž…β€‹αž αžΎαž™αŸ” αžαžΆαž˜αžšαž™αŸˆαž’αžΆαž‚αž»αž™αž˜αŸ‰αž„αŸ‹αž…αž˜αŸ’αž”αž„αžšαž”αžŸαŸ‹ Doug start_date αž†αŸ’αž›αž„αž€αžΆαžαŸ‹αž‘αžΆαŸ†αž„αž’αžŸαŸ‹αŸ” αžŸαž„αŸ’αžαŸαž”αž”αŸ’αžšαžŸαž·αž“αž”αžΎαž’αŸ’αž“αž€αž”αž‰αŸ’αž‡αžΆαž€αŸ‹αž“αŸ…αž€αŸ’αž“αž»αž„ start_date αž€αžΆαž›αž”αžšαž·αž…αŸ’αž†αŸαž‘αž”αž…αŸ’αž…αž»αž”αŸ’αž”αž“αŸ’αž“ αž“αž·αž„ schedule_interval - αžαŸ’αž„αŸƒαž˜αž½αž™αž”αž“αŸ’αž‘αžΆαž”αŸ‹αž˜αž€ DAG αž“αžΉαž„αž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜αž“αŸ…αžαŸ’αž„αŸƒαžŸαŸ’αž’αŸ‚αž€αž˜αž·αž“αž˜αž»αž“αž‘αŸαŸ”
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    αž αžΎαž™αž›αŸ‚αž„αž˜αžΆαž“αž”αž‰αŸ’αž αžΆαž‘αŸ€αžαž αžΎαž™αŸ”

    αž˜αžΆαž“β€‹αž€αŸ†αž αž»αžŸβ€‹αž–αŸαž›β€‹αžšαžαŸ‹β€‹αž˜αž½αž™β€‹αž‘αŸ€αžβ€‹αžŠαŸ‚αž›β€‹αž‘αžΆαž€αŸ‹αž‘αž„β€‹αž“αžΉαž„β€‹αžœαžΆαŸ– Task is missing the start_date parameterαžŠαŸ‚αž›αž—αžΆαž‚αž…αŸ’αžšαžΎαž“αž”αž„αŸ’αž αžΆαž‰αžαžΆαž’αŸ’αž“αž€αž—αŸ’αž›αŸαž…αž…αž„αž—αŸ’αž‡αžΆαž”αŸ‹αž‡αžΆαž˜αž½αž™αž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αž€αžš dag αŸ”

  • αž‘αžΆαŸ†αž„αž’αžŸαŸ‹αž“αŸ…αž›αžΎαž˜αŸ‰αžΆαžŸαŸŠαžΈαž“αžαŸ‚αž˜αž½αž™αŸ” αž”αžΆαž‘/αž…αžΆαžŸ αž“αž·αž„αž˜αžΌαž›αžŠαŸ’αž‹αžΆαž“ (Airflow αžαŸ’αž›αž½αž“αžœαžΆαž•αŸ’αž‘αžΆαž›αŸ‹ αž“αž·αž„αžαŸ’αž“αžΆαŸ†αž€αžΌαžαžšαž”αžŸαŸ‹αž™αžΎαž„) αž“αž·αž„αž˜αŸ‰αžΆαžŸαŸŠαžΈαž“αž˜αŸαž‚αŸαž αž‘αŸ†αž–αŸαžš αž“αž·αž„αž’αŸ’αž“αž€αž€αŸ†αžŽαžαŸ‹αž–αŸαž› αž“αž·αž„αž€αž˜αŸ’αž˜αž€αžšαŸ” αž αžΎαž™αžœαžΆαžαŸ‚αž˜αž‘αžΆαŸ†αž„αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž‘αŸ€αžαž•αž„αŸ” αž”αŸ‰αž»αž“αŸ’αžαŸ‚αž™αžΌαžš αŸ— αž‘αŸ…αž…αŸ†αž“αž½αž“αž“αŸƒαž—αžΆαžšαž€αž·αž…αŸ’αž…αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αžŸαŸαžœαžΆαž€αž˜αŸ’αž˜αž”αžΆαž“αž€αžΎαž“αž‘αžΎαž„αž αžΎαž™αž“αŸ…αž–αŸαž›αžŠαŸ‚αž› PostgreSQL αž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜αž†αŸ’αž›αžΎαž™αžαž”αž‘αŸ…αž“αžΉαž„αžŸαž“αŸ’αž‘αžŸαŸ’αžŸαž“αŸαž€αŸ’αž“αž»αž„αžšαž™αŸˆαž–αŸαž› 20 αžœαž·αž“αžΆαž‘αžΈαž‡αŸ†αž“αž½αžŸαž±αŸ’αž™ 5 ms αž™αžΎαž„αž”αžΆαž“αž™αž€αžœαžΆαž‘αŸ…αž†αŸ’αž„αžΆαž™αŸ”
  • αž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αž€αžšαž€αŸ’αž“αž»αž„αžŸαŸ’αžšαž»αž€αŸ” αž”αžΆαž‘ αž™αžΎαž„β€‹αž“αŸ…β€‹αžαŸ‚β€‹αž’αž„αŸ’αž‚αž»αž™β€‹αž›αžΎβ€‹αžœαžΆ αž αžΎαž™β€‹αž™αžΎαž„β€‹αž”αžΆαž“β€‹αž˜αž€β€‹αžŠαž›αŸ‹β€‹αž˜αžΆαžαŸ‹β€‹αž‡αŸ’αžšαŸ…β€‹αž αžΎαž™αŸ” LocalExecutor αž‚αžΊαž‚αŸ’αžšαž”αŸ‹αž‚αŸ’αžšαžΆαž“αŸ‹αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž™αžΎαž„αžšαž αžΌαžαž˜αž€αžŠαž›αŸ‹αž–αŸαž›αž“αŸαŸ‡ αž”αŸ‰αž»αž“αŸ’αžαŸ‚αž₯αž‘αžΌαžœαž“αŸαŸ‡αžœαžΆαžŠαž›αŸ‹αž–αŸαž›αžŠαŸ‚αž›αžαŸ’αžšαžΌαžœαž–αž„αŸ’αžšαžΈαž€αž‡αžΆαž˜αž½αž™αž”αž»αž‚αŸ’αž‚αž›αž·αž€αž™αŸ‰αžΆαž„αž αŸ„αž…αžŽαžΆαžŸαŸ‹αž˜αŸ’αž“αžΆαž€αŸ‹ αž αžΎαž™αž™αžΎαž„αž“αžΉαž„αžαŸ’αžšαžΌαžœαžαž·αžαžαŸ†αž”αŸ’αžšαžΉαž„αž”αŸ’αžšαŸ‚αž„αžŠαžΎαž˜αŸ’αž”αžΈαž•αŸ’αž›αžΆαžŸαŸ‹αž‘αžΈαž‘αŸ… CeleryExecutor αŸ” αž αžΎαž™αž“αŸ…αž€αŸ’αž“αž»αž„αž‘αž·αžŠαŸ’αž‹αž—αžΆαž–αž“αŸƒαž€αžΆαžšαž–αž·αžαžŠαŸ‚αž›αžαžΆαž’αŸ’αž“αž€αž’αžΆαž…αž’αŸ’αžœαžΎαž€αžΆαžšαž‡αžΆαž˜αž½αž™αžœαžΆαž“αŸ…αž›αžΎαž˜αŸ‰αžΆαžŸαŸŠαžΈαž“αžαŸ‚αž˜αž½αž™αž‚αŸ’αž˜αžΆαž“αž’αŸ’αžœαžΈαžšαžΆαžšαžΆαŸ†αž„αž’αŸ’αž“αž€αž–αžΈαž€αžΆαžšαž”αŸ’αžšαžΎαž”αŸ’αžšαžΆαžŸαŸ‹ Celery αžŸαžΌαž˜αŸ’αž”αžΈαžαŸ‚αž“αŸ…αž›αžΎαž˜αŸ‰αžΆαžŸαŸŠαžΈαž“αž˜αŸαžŠαŸ‚αž› "αž‡αžΆαž€αžΆαžšαž–αž·αžαžŽαžΆαžŸαŸ‹αž“αžΉαž„αž˜αž·αž“αž…αžΌαž›αž‘αŸ…αž€αŸ’αž“αž»αž„αž€αžΆαžšαž•αž›αž·αžαžŠαŸ„αž™αžŸαŸ’αž˜αŸ„αŸ‡αžαŸ’αžšαž„αŸ‹!"
  • αž˜αž·αž“αž”αŸ’αžšαžΎ αž§αž”αž€αžšαžŽαŸαžŠαŸ‚αž›αž—αŸ’αž‡αžΆαž”αŸ‹αž˜αž€αž‡αžΆαž˜αž½αž™:
    • αž€αžΆαžšαžαž—αŸ’αž‡αžΆαž”αŸ‹ αžŠαžΎαž˜αŸ’αž”αžΈαžšαž€αŸ’αžŸαžΆαž‘αž»αž€αž–αŸαžαŸŒαž˜αžΆαž“αž”αž‰αŸ’αž‡αžΆαž€αŸ‹αžŸαŸαžœαžΆαž€αž˜αŸ’αž˜,
    • SLA αž“αžΉαž€ αžŠαžΎαž˜αŸ’αž”αžΈαž†αŸ’αž›αžΎαž™αžαž”αž‘αŸ…αž“αžΉαž„αž€αž·αž…αŸ’αž…αž€αžΆαžšαžŠαŸ‚αž›αž˜αž·αž“αž”αžΆαž“αžŸαž˜αŸ’αžšαŸαž…αž‘αžΆαž“αŸ‹αž–αŸαž›αžœαŸαž›αžΆαŸ”
    • xcom αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž€αžΆαžšαž•αŸ’αž›αžΆαžŸαŸ‹αž”αŸ’αžαžΌαžšαž‘αž·αž“αŸ’αž“αž“αŸαž™αž˜αŸαžαžΆ (αžαŸ’αž‰αž»αŸ†αž”αžΆαž“αž“αž·αž™αžΆαž™ αž˜αŸαžαžΆαž‘αž·αž“αŸ’αž“αž“αŸαž™!) αžšαžœαžΆαž„αž€αžΆαžšαž„αžΆαžš dag αŸ”
  • αž€αžΆαžšαž”αŸ†αž–αžΆαž“αžŸαŸ†αž”αž»αžαŸ’αžšαŸ” αž’αž‰αŸ’αž…αžΉαž„αžαžΎαžαŸ’αž‰αž»αŸ†αž’αžΆαž…αž“αž·αž™αžΆαž™αž’αŸ’αžœαžΈαž”αžΆαž“? αž€αžΆαžšαž‡αžΌαž“β€‹αžŠαŸ†αžŽαžΉαž„β€‹αžαŸ’αžšαžΌαžœβ€‹αž”αžΆαž“β€‹αž”αž„αŸ’αž€αžΎαžβ€‹αž‘αžΎαž„β€‹αžŸαž˜αŸ’αžšαžΆαž”αŸ‹β€‹αž€αžΆαžšβ€‹αž’αŸ’αžœαžΎβ€‹αžŠαžŠαŸ‚αž›αŸ—β€‹αž“αŸƒβ€‹αž€αž·αž…αŸ’αž…αž€αžΆαžšβ€‹αžŠαŸ‚αž›β€‹αž”αžΆαž“β€‹αž’αŸ’αž›αžΆαž€αŸ‹β€‹αž…αž»αŸ‡αŸ” αž₯αž‘αžΌαžœαž“αŸαŸ‡ Gmail αž€αžΆαžšαž„αžΆαžšαžšαž”αžŸαŸ‹αžαŸ’αž‰αž»αŸ†αž˜αžΆαž“αž’αŸŠαžΈαž˜αŸ‚αž› > 90k αž–αžΈ Airflow αž αžΎαž™ web mail muzzle αž”αžŠαž·αžŸαŸαž’αž˜αž·αž“αž‘αž‘αž½αž›αž™αž€ αž“αž·αž„αž›αž»αž”αž…αŸ’αžšαžΎαž“αž‡αžΆαž„ 100 αž€αŸ’αž“αž»αž„αž–αŸαž›αžαŸ‚αž˜αž½αž™αŸ”

αž‚αŸ’αžšαŸ„αŸ‡αžαŸ’αž“αžΆαž€αŸ‹αž…αŸ’αžšαžΎαž“αž‘αŸ€αžαŸ– Apache Airflow Pitfails

αž§αž”αž€αžšαžŽαŸαžŸαŸ’αžœαŸαž™αž”αŸ’αžšαžœαžαŸ’αžαž·αž€αž˜αŸ’αž˜αž…αŸ’αžšαžΎαž“αž‘αŸ€αž

αžŠαžΎαž˜αŸ’αž”αžΈαž±αŸ’αž™αž™αžΎαž„αž’αŸ’αžœαžΎαž€αžΆαžšαž€αžΆαž“αŸ‹αžαŸ‚αž…αŸ’αžšαžΎαž“αžŠαŸ„αž™αž”αŸ’αžšαžΎαž€αŸ’αž”αžΆαž›αžšαž”αžŸαŸ‹αž™αžΎαž„ αž˜αž·αž“αž˜αŸ‚αž“αžŠαŸ„αž™αžŠαŸƒαžšαž”αžŸαŸ‹αž™αžΎαž„ Airflow αž”αžΆαž“αžšαŸ€αž”αž…αŸ†αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž™αžΎαž„αž“αžΌαžœαž…αŸ†αžŽαž»αž…αž“αŸαŸ‡αŸ–

  • REST API - αž‚αžΆαžαŸ‹αž“αŸ…αžαŸ‚αž˜αžΆαž“αžŸαŸ’αžαžΆαž“αž—αžΆαž–αž“αŸƒαž”αž‘αž–αž·αžŸαŸ„αž’αž“αŸαžŠαŸ‚αž›αž˜αž·αž“αžšαžΆαžšαžΆαŸ†αž„αž‚αžΆαžαŸ‹αž–αžΈαž€αžΆαžšαž„αžΆαžšαŸ” αž‡αžΆαž˜αž½αž™αžœαžΆ αž’αŸ’αž“αž€αž˜αž·αž“αžαŸ’αžšαžΉαž˜αžαŸ‚αž’αžΆαž…αž‘αž‘αž½αž›αž”αžΆαž“αž–αŸαžαŸŒαž˜αžΆαž“αž’αŸ†αž–αžΈ dags αž“αž·αž„αž€αž·αž…αŸ’αž…αž€αžΆαžšαž”αŸ‰αž»αžŽαŸ’αžŽαŸ„αŸ‡αž‘αŸ αž”αŸ‰αž»αž“αŸ’αžαŸ‚αžαŸ‚αž˜αž‘αžΆαŸ†αž„αž”αž‰αŸ’αžˆαž”αŸ‹/αž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜ dag αž”αž„αŸ’αž€αžΎαž DAG Run αž¬αž’αžΆαž„αŸ”
  • CLI - αž§αž”αž€αžšαžŽαŸαž‡αžΆαž…αŸ’αžšαžΎαž“αž’αžΆαž…αžšαž€αž”αžΆαž“αžαžΆαž˜αžšαž™αŸˆαž”αž“αŸ’αž‘αžΆαžαŸ‹αž–αžΆαž€αŸ’αž™αž”αž‰αŸ’αž‡αžΆαžŠαŸ‚αž›αž˜αž·αž“αžαŸ’αžšαžΉαž˜αžαŸ‚αž˜αž·αž“αž„αžΆαž™αžŸαŸ’αžšαž½αž›αž”αŸ’αžšαžΎαžαžΆαž˜αžšαž™αŸˆ WebUI αž”αŸ‰αž»αžŽαŸ’αžŽαŸ„αŸ‡αž‘αŸ αž”αŸ‰αž»αž“αŸ’αžαŸ‚αž‡αžΆαž‘αžΌαž‘αŸ…αž‚αžΊαž’αžœαžαŸ’αžαž˜αžΆαž“αŸ” αž§αž‘αžΆαž αžšαžŽαŸ:
    • backfill αžαŸ’αžšαžΌαžœαž€αžΆαžšαžŠαžΎαž˜αŸ’αž”αžΈαž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜αž§αž‘αžΆαž αžšαžŽαŸαž€αž·αž…αŸ’αž…αž€αžΆαžšαž‘αžΎαž„αžœαž·αž‰αŸ”
      αž‡αžΆαž§αž‘αžΆαž αžšαžŽαŸ αž’αŸ’αž“αž€αžœαž·αž—αžΆαž‚αž”αžΆαž“αž˜αž€ αž αžΎαž™αž“αž·αž™αžΆαž™αžαžΆαŸ– Β«αž αžΎαž™αžŸαž˜αž˜αž·αžαŸ’αž αž˜αžΆαž“αž€αžΆαžšαžŸαž˜αž αŸαžαž»αžŸαž˜αž•αž›αž€αŸ’αž“αž»αž„αž‘αž·αž“αŸ’αž“αž“αŸαž™αž…αžΆαž”αŸ‹αž–αžΈαžαŸ’αž„αŸƒαž‘αžΈ 1 αžŠαž›αŸ‹αžαŸ’αž„αŸƒαž‘αžΈ 13 αžαŸ‚αž˜αž€αžšαžΆ! αž‡αž½αžŸαž‡αž»αž› αž€αŸ‚ αž‡αž½αžŸαž‡αž»αž› αž‡αž½αžŸαž‡αž»αž›! αž αžΎαž™αž’αŸ’αž“αž€αž‚αžΊαž‡αžΆαž…αŸ†αžŽαž„αŸ‹αž…αŸ†αžŽαžΌαž›αž…αž·αžαŸ’αžαž”αŸ‚αž”αž“αŸαŸ‡:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • αžŸαŸαžœαžΆαž˜αžΌαž›αžŠαŸ’αž‹αžΆαž“αŸ– initdb, resetdb, upgradedb, checkdb.
    • runαžŠαŸ‚αž›αž’αž“αž»αž‰αŸ’αž‰αžΆαžαž±αŸ’αž™αž’αŸ’αž“αž€αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž€αž·αž…αŸ’αž…αž€αžΆαžšαž§αž‘αžΆαž αžšαžŽαŸαž˜αž½αž™ αž αžΎαž™αžαŸ‚αž˜αž‘αžΆαŸ†αž„αžŠαžΆαž€αŸ‹αž–αž·αž“αŸ’αž‘αž»αž›αžΎαž—αžΆαž–αž’αžΆαžŸαŸ’αžšαŸαž™αž‘αžΆαŸ†αž„αž’αžŸαŸ‹αŸ” αž›αžΎαžŸαž–αžΈαž“αŸαŸ‡αž‘αŸ€αžαž’αŸ’αž“αž€αž’αžΆαž…αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαžœαžΆαžαžΆαž˜αžšαž™αŸˆ LocalExecutorαž‘αŸ„αŸ‡αž”αžΈαž‡αžΆαž’αŸ’αž“αž€αž˜αžΆαž“αž…αž„αŸ’αž€αŸ„αž˜ Celery αž€αŸαžŠαŸ„αž™αŸ”
    • αž’αŸ’αžœαžΎαžšαžΏαž„αžŠαžΌαž…αž‚αŸ’αž“αžΆαŸ” testαž˜αžΆαž“αžαŸ‚αž“αŸ…αž€αŸ’αž“αž»αž„αž˜αžΌαž›αžŠαŸ’αž‹αžΆαž“αž˜αž·αž“αžŸαžšαžŸαŸαžšαž’αŸ’αžœαžΈαž‘αžΆαŸ†αž„αž’αžŸαŸ‹αŸ”
    • connections αž’αž“αž»αž‰αŸ’αž‰αžΆαžαž±αŸ’αž™αž”αž„αŸ’αž€αžΎαžαž€αžΆαžšαžαž—αŸ’αž‡αžΆαž”αŸ‹αžŠαŸαž’αŸ†αž–αžΈαžŸαŸ‚αž›αŸ”
  • python api - αž˜αž’αŸ’αž™αŸ„αž”αžΆαž™αž–αž·αž”αžΆαž€αž‘αžΆαž€αŸ‹αž‘αž„αž‚αŸ’αž“αžΆ αžŠαŸ‚αž›αžαŸ’αžšαžΌαžœαž”αžΆαž“αž”αž˜αŸ’αžšαž»αž„αž‘αž»αž€αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž€αž˜αŸ’αž˜αžœαž·αž’αžΈαž‡αŸ†αž“αž½αž™ αž“αž·αž„αž˜αž·αž“αž‰αž‰αž½αžšαž“αŸ…αž€αŸ’αž“αž»αž„αžœαžΆαžŠαŸ„αž™αžŠαŸƒαžαž·αž…αžαž½αž…αŸ” αž”αŸ‰αž»αž“αŸ’αžαŸ‚β€‹αž’αŸ’αž“αž€β€‹αžŽαžΆβ€‹αž˜αž€β€‹αžšαžΆαžšαžΆαŸ†αž„β€‹αž™αžΎαž„β€‹αž˜αž·αž“β€‹αž²αŸ’αž™β€‹αž‘αŸ… /home/airflow/dagsαžšαžαŸ‹ ipython αž αžΎαž™αž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜αžšαž‰αŸ‰αŸαžšαž‰αŸ‰αŸƒ? αž§αž‘αžΆαž αžšαžŽαŸ αž’αŸ’αž“αž€αž’αžΆαž…αž“αžΆαŸ†αž…αŸαž‰αž€αžΆαžšαžαž—αŸ’αž‡αžΆαž”αŸ‹αž‘αžΆαŸ†αž„αž’αžŸαŸ‹αžŠαŸ„αž™αž”αŸ’αžšαžΎαž€αžΌαžŠαžαžΆαž„αž€αŸ’αžšαŸ„αž˜αŸ–
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • αž€αŸ†αž–αž»αž„αž—αŸ’αž‡αžΆαž”αŸ‹αž‘αŸ…αž˜αžΌαž›αžŠαŸ’αž‹αžΆαž“αž‘αž·αž“αŸ’αž“αž“αŸαž™αž˜αŸαžαžΆαž›αŸ†αž αžΌαžšαžαŸ’αž™αž›αŸ‹αŸ” αžαŸ’αž‰αž»αŸ†β€‹αž˜αž·αž“β€‹αžŽαŸ‚αž“αžΆαŸ†β€‹αž²αŸ’αž™β€‹αžŸαžšαžŸαŸαžšβ€‹αž‘αŸ…β€‹αžœαžΆβ€‹αž‘αŸ αž”αŸ‰αž»αž“αŸ’αžαŸ‚β€‹αž€αžΆαžšβ€‹αž‘αž‘αž½αž›β€‹αž”αžΆαž“β€‹αžŸαŸ’αžαžΆαž“αž—αžΆαž–β€‹αž€αž·αž…αŸ’αž…αž€αžΆαžšβ€‹αžŸαž˜αŸ’αžšαžΆαž”αŸ‹β€‹αž˜αŸ‰αŸ‚αžαŸ’αžšβ€‹αž‡αžΆαž€αŸ‹αž›αžΆαž€αŸ‹β€‹αž•αŸ’αžŸαŸαž„αŸ—β€‹αž’αžΆαž…β€‹αž›αžΏαž“β€‹αž‡αžΆαž„β€‹αž“αž·αž„β€‹αž„αžΆαž™αžŸαŸ’αžšαž½αž›β€‹αž‡αžΆαž„β€‹αž€αžΆαžšβ€‹αž”αŸ’αžšαžΎ APIs αžŽαžΆαž˜αž½αž™αŸ”

    αž§αž”αž˜αžΆαžαžΆ αž€αž·αž…αŸ’αž…αž€αžΆαžšαž‘αžΆαŸ†αž„αž’αžŸαŸ‹αžšαž”αžŸαŸ‹αž™αžΎαž„αž˜αž·αž“αž’αžŸαŸ‹αž€αŸ†αž›αžΆαŸ†αž„αž‘αŸ αž”αŸ‰αž»αž“αŸ’αžαŸ‚αž–αŸαž›αžαŸ’αž›αŸ‡αžœαžΆαž’αžΆαž…αž’αŸ’αž›αžΆαž€αŸ‹αž…αž»αŸ‡ αž αžΎαž™αž“αŸαŸ‡αž‡αžΆαžšαžΏαž„αž’αž˜αŸ’αž˜αžαžΆαŸ” αž”αŸ‰αž»αž“αŸ’αžαŸ‚αž€αžΆαžšαžŸαŸ’αž‘αŸ‡αž˜αž½αž™αž…αŸ†αž“αž½αž“αž˜αžΆαž“αž€αžΆαžšαžŸαž„αŸ’αžŸαŸαž™αžšαž½αž…αž αžΎαž™ αž αžΎαž™αž…αžΆαŸ†αž”αžΆαž…αŸ‹αžαŸ’αžšαžΌαžœαž–αž·αž“αž·αžαŸ’αž™αŸ”

    SQL αž”αŸ’αžšαž™αŸαžαŸ’αž“!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

αžŸαŸαž…αž€αŸ’αžαžΈαž™αŸ„αž„

αž αžΎαž™αž‡αžΆαž€αžΆαžšαž–αž·αžαžŽαžΆαžŸαŸ‹ αžαŸ†αžŽαž—αŸ’αž‡αžΆαž”αŸ‹αžŠαž”αŸ‹αžŠαŸ†αž”αžΌαž„αž–αžΈαž€αžΆαžšαž…αŸαž‰αžšαž”αžŸαŸ‹ Google αž‚αžΊαž‡αžΆαž˜αžΆαžαž·αž€αžΆαž“αŸƒαžαž Airflow αž–αžΈαž…αŸ†αžŽαžΆαŸ†αžšαž”αžŸαŸ‹αžαŸ’αž‰αž»αŸ†αŸ”

αž“αž·αž„αžαŸ†αžŽαž—αŸ’αž‡αžΆαž”αŸ‹αžŠαŸ‚αž›αž”αŸ’αžšαžΎαž€αŸ’αž“αž»αž„αž’αžαŸ’αžαž”αž‘αŸ–

αž”αŸ’αžšαž—αž–: www.habr.com