เบ‚เบฐเบšเบงเบ™เบเบฒเบ™ ETL เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เป„เบ”เป‰เบฎเบฑเบšเบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบเบญเบตเป€เบกเบฅเปŒเปƒเบ™ Apache Airflow

เบ‚เบฐเบšเบงเบ™เบเบฒเบ™ ETL เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เป„เบ”เป‰เบฎเบฑเบšเบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบเบญเบตเป€เบกเบฅเปŒเปƒเบ™ Apache Airflow

เบšเปเปˆเบงเปˆเบฒเป€เบ—เบเป‚เบ™เป‚เบฅเบเบตเบˆเบฐเบžเบฑเบ”เบ—เบฐเบ™เบฒเบซเบผเบฒเบเบ›เบฒเบ™เปƒเบ”, เบงเบดเบ—เบตเบเบฒเบ™เบ—เบตเปˆเบฅเป‰เบฒเบชเบฐเป„เบซเบกเบกเบฑเบเบˆเบฐเบ•เบดเบ”เบ•เบฒเบกเบเบฒเบ™เบžเบฑเบ”เบ—เบฐเบ™เบฒ. เบ™เบตเป‰เบญเบฒเบ”เบˆเบฐเป€เบ›เบฑเบ™เบเป‰เบญเบ™เบเบฒเบ™เบซเบฑเบ™เบ›เปˆเบฝเบ™เบ—เบตเปˆเบฅเบฝเบšเบ‡เปˆเบฒเบ, เบ›เบฑเบ”เปƒเบˆเบ‚เบญเบ‡เบกเบฐเบ™เบธเบ”, เบ„เบงเบฒเบกเบ•เป‰เบญเบ‡เบเบฒเบ™เบ”เป‰เบฒเบ™เป€เบ•เบฑเบเป‚เบ™เป‚เบฅเบขเบต, เบซเบผเบทเบชเบดเปˆเบ‡เบญเบทเปˆเบ™. เปƒเบ™เบ‚เบปเบ‡เป€เบ‚เบ”เบเบฒเบ™เบ›เบธเบ‡เปเบ•เปˆเบ‡เบ‚เปเป‰เบกเบนเบ™, เปเบซเบผเปˆเบ‡เบ‚เปเป‰เบกเบนเบ™เปเบกเปˆเบ™เป€เบ›เบตเบ”เป€เบœเบตเบเบซเบผเบฒเบเบ—เบตเปˆเบชเบธเบ”เปƒเบ™เบชเปˆเบงเบ™เบ™เบตเป‰. เบšเปเปˆเบงเปˆเบฒเบžเบงเบเป€เบฎเบปเบฒเบเบฑเบ™เบขเบฒเบเบเปเบฒเบˆเบฑเบ”เบชเบดเปˆเบ‡เบ™เบตเป‰เบซเบผเบฒเบเบ›เบฒเบ™เปƒเบ”, เปเบ•เปˆเบกเบฒเบฎเบญเบ”เบ›เบฑเบ”เบˆเบธเบšเบฑเบ™เบ‚เปเป‰เบกเบนเบ™เบชเปˆเบงเบ™เบซเบ™เบถเปˆเบ‡เปเบกเปˆเบ™เบ–เบทเบเบชเบปเปˆเบ‡เป„เบ›เปƒเบ™เบ•เบปเบงเบชเบปเปˆเบ‡เบ‚เปเป‰เบ„เบงเบฒเบกเปเบฅเบฐเบญเบตเป€เบกเบง, เบšเปเปˆเป„เบ”เป‰เบเปˆเบฒเบงเป€เบ–เบดเบ‡เบฎเบนเบšเปเบšเบšเป€เบเบปเปˆเบฒเปเบเปˆเบซเบผเบฒเบ. เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเบ‚เปเป€เบŠเบทเป‰เบญเป€เบŠเบตเบ™เบ—เปˆเบฒเบ™เปƒเบซเป‰เบ–เบญเบ”เบซเบ™เบถเปˆเบ‡เปƒเบ™เบ—เบฒเบ‡เป€เบฅเบทเบญเบเบชเปเบฒเบฅเบฑเบš Apache Airflow, เบชเบฐเปเบ”เบ‡เบงเบดเบ—เบตเบ—เบตเปˆเบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เป€เบญเบปเบฒเบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบเบญเบตเป€เบกเบง.

เบ›เบฐเบงเบฑเบ”เบชเบฒเบ”

เบ‚เปเป‰เบกเบนเบ™เบˆเปเบฒเบ™เบงเบ™เบซเบฅเบฒเบเบเบฑเบ‡เบ–เบทเบเป‚เบญเบ™เบœเปˆเบฒเบ™เบ—เบฒเบ‡เบญเบตเป€เบกเบฅเปŒ, เบˆเบฒเบเบเบฒเบ™เบชเบทเปˆเบชเบฒเบ™เบฅเบฐเบซเบงเปˆเบฒเบ‡เบšเบธเบเบ„เบปเบ™เบเบฑเบšเบกเบฒเบ”เบ•เบฐเบ–เบฒเบ™เบเบฒเบ™เบžเบปเบงเบžเบฑเบ™เบฅเบฐเบซเบงเปˆเบฒเบ‡เบšเปเบฅเบดเบชเบฑเบ”. เบกเบฑเบ™เป€เบ›เบฑเบ™เบเบฒเบ™เบ”เบตเบ–เป‰เบฒเบกเบฑเบ™เป€เบ›เบฑเบ™เป„เบ›เป„เบ”เป‰เบ—เบตเปˆเบˆเบฐเบ‚เบฝเบ™เบเบฒเบ™เป‚เบ•เป‰เบ•เบญเบšเป€เบžเบทเปˆเบญเปƒเบซเป‰เป„เบ”เป‰เบ‚เปเป‰เบกเบนเบ™เบซเบผเบทเป€เบญเบปเบฒเบ„เบปเบ™เบขเบนเปˆเปƒเบ™เบซเป‰เบญเบ‡เบเบฒเบ™เบ—เบตเปˆเบˆเบฐเป€เบญเบปเบฒเบ‚เปเป‰เบกเบนเบ™เบ™เบตเป‰เป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เปเบซเบผเปˆเบ‡เบ—เบตเปˆเบชเบฐเบ”เบงเบเบเบงเปˆเบฒ, เปเบ•เปˆเป€เบฅเบทเป‰เบญเบเป†เบ™เบตเป‰เบญเบฒเบ”เบˆเบฐเบšเปเปˆเป€เบ›เบฑเบ™เป„เบ›เป„เบ”เป‰. เบงเบฝเบเบ‡เบฒเบ™เบชเบฐเป€เบžเบฒเบฐเบ—เบตเปˆเบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเบ›เบฐเป€เบŠเบตเบ™เปเบกเปˆเบ™เบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบฅเบฐเบšเบปเบš CRM เบ—เบตเปˆเบกเบตเบŠเบทเปˆเบชเบฝเบ‡เบเบฑเบšเบ„เบฑเบ‡เบ‚เปเป‰เบกเบนเบ™, เปเบฅเบฐเบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™เบเบฑเบšเบฅเบฐเบšเบปเบš OLAP. เบกเบฑเบ™เป€เบเบตเบ”เบ‚เบถเป‰เบ™เปƒเบ™เบ›เบฐเบซเบงเบฑเบ”เบชเบฒเบ”เบงเปˆเบฒเบชเปเบฒเบฅเบฑเบšเบšเปเบฅเบดเบชเบฑเบ”เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเบเบฒเบ™เบ™เปเบฒเปƒเบŠเป‰เบฅเบฐเบšเบปเบšเบ™เบตเป‰เปเบกเปˆเบ™เบชเบฐเบ”เบงเบเปƒเบ™เบžเบทเป‰เบ™เบ—เบตเปˆเบ—เบธเบฅเบฐเบเบดเบ”เป‚เบ”เบเบชเบฐเป€เบžเบฒเบฐ. เป€เบžเบฒเบฐเบชเบฐเบ™เบฑเป‰เบ™, เบ—เบธเบเบ„เบปเบ™เบเปเปˆเบ•เป‰เบญเบ‡เบเบฒเบ™เบชเบฒเบกเบฒเบ”เบ”เปเบฒเป€เบ™เบตเบ™เบเบฒเบ™เบเบฑเบšเบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบเบฅเบฐเบšเบปเบšเบžเบฒเบเบชเปˆเบงเบ™เบ—เบตเบชเบฒเบกเบ™เบตเป‰เป€เบŠเบฑเปˆเบ™เบ”เบฝเบงเบเบฑเบ™. เบเปˆเบญเบ™เบญเบทเปˆเบ™ เปเบปเบ”, เปเบ™เปˆเบ™เบญเบ™, เบ„เบงเบฒเบกเป€เบ›เบฑเบ™เป„เบ›เป„เบ”เป‰เบ‚เบญเบ‡เบเบฒเบ™เป„เบ”เป‰เบฎเบฑเบšเบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบ API เป€เบ›เบตเบ”เป„เบ”เป‰เบ–เบทเบเบชเบถเบเบชเบฒ. เปเบ•เปˆเบซเบ™เป‰เบฒเป€เบชเบเบ”เบฒเบ, API เบšเปเปˆเป„เบ”เป‰เบเบงเบกเป€เบญเบปเบฒเบเบฒเบ™เป„เบ”เป‰เบฎเบฑเบšเบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเบˆเปเบฒเป€เบ›เบฑเบ™เบ—เบฑเบ‡เบซเบกเบปเบ”, เปเบฅเบฐ, เปƒเบ™เบ„เปเบฒเบชเบฑเบšเบ—เบตเปˆเบ‡เปˆเบฒเบเบ”เบฒเบ, เบกเบฑเบ™เบขเบนเปˆเปƒเบ™เบซเบผเบฒเบเบ—เบฒเบ‡เบ—เบตเปˆเบซเบเบฒเบšเบ„เบฒเบ, เปเบฅเบฐเบเบฒเบ™เบชเบฐเบซเบ™เบฑเบšเบชเบฐเบซเบ™เบนเบ™เบ”เป‰เบฒเบ™เบงเบดเบŠเบฒเบเบฒเบ™เบšเปเปˆเบ•เป‰เบญเบ‡เบเบฒเบ™เบซเบผเบทเบšเปเปˆเบชเบฒเบกเบฒเบ”เบ•เบญเบšเบชเบฐเบซเบ™เบญเบ‡เป€เบ„เบดเปˆเบ‡เบซเบ™เบถเปˆเบ‡เป€เบžเบทเปˆเบญเปƒเบซเป‰เบกเบตเบซเบ™เป‰เบฒเบ—เบตเปˆเบ—เบตเปˆเบชเบปเบกเบšเบนเบ™เปเบšเบšเบซเบผเบฒเบเบ‚เบถเป‰เบ™. เปเบ•เปˆเบฅเบฐเบšเบปเบšเบ™เบตเป‰เปƒเบซเป‰เป‚เบญเบเบฒเบ”เบ—เบตเปˆเบˆเบฐเป„เบ”เป‰เบฎเบฑเบšเบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเบ‚เบฒเบ”เบซเบฒเบเป„เบ›เป€เบ›เบฑเบ™เป„เบฅเบเบฐเบ—เบฒเบ‡เป„เบ›เบชเบฐเบ™เบตเปƒเบ™เบฎเบนเบšเปเบšเบšเบ‚เบญเบ‡เบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบชเปเบฒเบฅเบฑเบšเบเบฒเบ™ unloading เบฎเบงเบšเบฎเบงเบก.

เบกเบฑเบ™เบ„เบงเบ™เบˆเบฐเบชเบฑเบ‡เป€เบเบ”เบงเปˆเบฒเบ™เบตเป‰เบšเปเปˆเปเบกเปˆเบ™เบเปเบฅเบฐเบ™เบตเบ”เบฝเบงเบ—เบตเปˆเบ—เบธเบฅเบฐเบเบดเบ”เบ•เป‰เบญเบ‡เบเบฒเบ™เป€เบเบฑเบšเบเปเบฒเบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบเบญเบตเป€เบกเบฅเปŒเบซเบผเบทเบœเบนเป‰เบชเบปเปˆเบ‡เบ‚เปเป‰เบ„เบงเบฒเบกเบ—เบฑเบ™เบ—เบต. เบขเปˆเบฒเบ‡เปƒเบ”เบเปเบ•เบฒเบก, เปƒเบ™เบเปเบฅเบฐเบ™เบตเบ™เบตเป‰, เบžเบงเบเป€เบฎเบปเบฒเบšเปเปˆเบชเบฒเบกเบฒเบ”เบกเบตเบญเบดเบ”เบ—เบดเบžเบปเบ™เบ•เปเปˆเบšเปเบฅเบดเบชเบฑเบ”เบžเบฒเบเบชเปˆเบงเบ™เบ—เบตเบชเบฒเบกเบ—เบตเปˆเบชเบฐเบซเบ™เบญเบ‡เบชเปˆเบงเบ™เบซเบ™เบถเปˆเบ‡เบ‚เบญเบ‡เบ‚เปเป‰เบกเบนเบ™เบžเบฝเบ‡เปเบ•เปˆเปƒเบ™เบงเบดเบ—เบตเบเบฒเบ™เบ™เบตเป‰.

apache airflow

เป€เบžเบทเปˆเบญเบชเป‰เบฒเบ‡เบ‚เบฐเบšเบงเบ™เบเบฒเบ™ ETL, เบžเบงเบเป€เบฎเบปเบฒเบชเปˆเบงเบ™เบซเบผเบฒเบเปเบกเปˆเบ™เปƒเบŠเป‰ Apache Airflow. เป€เบžเบทเปˆเบญเปƒเบซเป‰เบœเบนเป‰เบญเปˆเบฒเบ™เบ—เบตเปˆเบšเปเปˆเบ„เบธเป‰เบ™เป€เบ„เบตเบเบเบฑเบšเป€เบ—เบเป‚เบ™เป‚เบฅเบเบตเบ™เบตเป‰เป€เบ‚เบปเป‰เบฒเปƒเบˆเบ”เบตเบเบงเปˆเบฒเบงเปˆเบฒเบกเบฑเบ™เบกเบตเบฅเบฑเบเบชเบฐเบ™เบฐเปเบ™เบงเปƒเบ”เปƒเบ™เบชเบฐเบžเบฒเบšเบเบฒเบ™เปเบฅเบฐเป‚เบ”เบเบ—เบปเปˆเบงเป„เบ›, เบ‚เป‰เบญเบเบˆเบฐเบญเบฐเบ—เบดเบšเบฒเบเบชเบญเบ‡เบชเบฒเบกเบ‚เปเป‰เปเบ™เบฐเบ™เปเบฒ.

Apache Airflow เป€เบ›เบฑเบ™เปเบžเบฅเบฐเบ•เบฐเบŸเบญเบกเบŸเบฃเบตเบ—เบตเปˆเบ–เบทเบเบ™เปเบฒเปƒเบŠเป‰เป€เบžเบทเปˆเบญเบชเป‰เบฒเบ‡, เบ›เบฐเบ•เบดเบšเบฑเบ”เปเบฅเบฐเบ•เบดเบ”เบ•เบฒเบกเบ‚เบฐเบšเบงเบ™เบเบฒเบ™ ETL (Extract-Transform-Loading) เปƒเบ™ Python. เปเบ™เบงเบ„เบงเบฒเบกเบ„เบดเบ”เบ•เบปเป‰เบ™เบ•เปเปƒเบ™ Airflow เปเบกเปˆเบ™เบเบฒเบŸ acyclic เบŠเบตเป‰, เบšเปˆเบญเบ™เบ—เบตเปˆเบˆเบธเบ”เบ‚เบญเบ‡เป€เบชเบฑเป‰เบ™เบชเบฐเปเบ”เบ‡เปเบกเปˆเบ™เบ‚เบฐเบšเบงเบ™เบเบฒเบ™เบชเบฐเป€เบžเบฒเบฐ, เปเบฅเบฐเบ‚เบญเบšเบ‚เบญเบ‡เบเบฒเบŸเปเบกเปˆเบ™เบเบฒเบ™เป„เบซเบผเป€เบ‚เบปเป‰เบฒเบ‚เบญเบ‡เบเบฒเบ™เบ„เบงเบšเบ„เบธเบกเบซเบผเบทเบ‚เปเป‰เบกเบนเบ™. เบ‚เบฐเบšเบงเบ™เบเบฒเบ™เบชเบฒเบกเบฒเบ”เป€เบญเบตเป‰เบ™เบŸเบฑเบ‡เบŠเบฑเบ™ Python เปƒเบ”เป†, เบซเบผเบทเบกเบฑเบ™เบชเบฒเบกเบฒเบ”เบกเบตเป€เบซเบ”เบœเบปเบ™เบ—เบตเปˆเบชเบฑเบšเบชเบปเบ™เบซเบผเบฒเบเบˆเบฒเบเบเบฒเบ™เป€เบญเบตเป‰เบ™เบซเบผเบฒเบเบซเบ™เป‰เบฒเบ—เบตเปˆเบ•เบฒเบกเบฅเปเบฒเบ”เบฑเบšเปƒเบ™เบชเบฐเบžเบฒเบšเบเบฒเบ™เบ‚เบญเบ‡เบซเป‰เบญเบ‡เบฎเบฝเบ™. เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบ›เบฐเบ•เบดเบšเบฑเบ”เบ‡เบฒเบ™เป€เบฅเบทเป‰เบญเบเป†เบ—เบตเปˆเบชเบธเบ”, เบกเบตเบเบฒเบ™เบžเบฑเบ”เบ—เบฐเบ™เบฒเบ—เบตเปˆเบเบฝเบกเบžเป‰เบญเบกเบซเบผเบฒเบเปเบฅเป‰เบงเบ—เบตเปˆเบชเบฒเบกเบฒเบ”เบ–เบทเบเบ™เปเบฒเปƒเบŠเป‰เป€เบ›เบฑเบ™เบ‚เบฐเบšเบงเบ™เบเบฒเบ™. เบเบฒเบ™เบžเบฑเบ”เบ—เบฐเบ™เบฒเบ”เบฑเปˆเบ‡เบเปˆเบฒเบงเบ›เบฐเบเบญเบšเบกเบต:

  • operators - เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เป‚เบญเบ™เบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบเบšเปˆเบญเบ™เบซเบ™เบถเปˆเบ‡เป„เบ›เบซเบฒเบšเปˆเบญเบ™เบญเบทเปˆเบ™, เบ•เบปเบงเบขเปˆเบฒเบ‡, เบˆเบฒเบเบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™เป„เบ›เบซเบฒเบ„เบฑเบ‡เบ‚เปเป‰เบกเบนเบ™;
  • เป€เบŠเบฑเบ™เป€เบŠเบต - เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบฅเปเบ–เป‰เบฒเบเบฒเบ™เบ›เบฐเบเบปเบ”เบ•เบปเบงเบ‚เบญเบ‡เป€เบซเบ”เบเบฒเบ™เบชเบฐเป€เบžเบฒเบฐเปƒเบ”เบซเบ™เบถเปˆเบ‡เปเบฅเบฐเบŠเบตเป‰เบ™เปเบฒเบเบฒเบ™เป„เบซเบผเป€เบ‚เบปเป‰เบฒเบ‚เบญเบ‡เบเบฒเบ™เบ„เบงเบšเบ„เบธเบกเป„เบ›เบชเบนเปˆเบˆเบธเบ”เป†เบ•เปเปˆเบกเบฒเบ‚เบญเบ‡เบเบฒเบŸ;
  • hooks - เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบ›เบฐเบ•เบดเบšเบฑเบ”เบฅเบฐเบ”เบฑเบšเบ•เปˆเปเบฒ, เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบเบปเบเบ•เบปเบงเบขเปˆเบฒเบ‡, เป€เบžเบทเปˆเบญเปƒเบซเป‰เป„เบ”เป‰เบฎเบฑเบšเบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบเบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™ (เปƒเบŠเป‰เปƒเบ™เบ„เปเบฒเบ–เบฐเปเบซเบผเบ‡เบเบฒเบ™);
  • เปเบฅเบฐเบญเบทเปˆเบ™เป†.

เบกเบฑเบ™เบˆเบฐเบšเปเปˆเป€เบซเบกเบฒเบฐเบชเบปเบกเบ—เบตเปˆเบˆเบฐเบญเบฐเบ—เบดเบšเบฒเบ Apache Airflow เปƒเบ™เบฅเบฒเบเบฅเบฐเบญเบฝเบ”เปƒเบ™เบšเบปเบ”เบ„เบงเบฒเบกเบ™เบตเป‰. เบเบฒเบ™เปเบ™เบฐเบ™เปเบฒเบชเบฑเป‰เบ™เป†เบชเบฒเบกเบฒเบ”เป€เบšเบดเปˆเบ‡เป„เบ”เป‰ เบ—เบตเปˆเบ™เบตเป‰ เบซเบผเบท เบ—เบตเปˆเบ™เบตเป‰.

Hook เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบฎเบฑเบšเบ‚เปเป‰เบกเบนเบ™

เบเปˆเบญเบ™เบญเบทเปˆเบ™ เปเบปเบ”, เป€เบžเบทเปˆเบญเปเบเป‰เป„เบ‚เบšเบฑเบ™เบซเบฒ, เบžเบงเบเป€เบฎเบปเบฒเบ•เป‰เบญเบ‡เบ‚เบฝเบ™ hook เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบชเบฒเบกเบฒเบ”:

  • เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบเบฑเบšเบญเบตเป€เบกเบง
  • เบŠเบญเบเบซเบฒเบ•เบปเบงเบญเบฑเบเบชเบญเบ™เบ—เบตเปˆเบ–เบทเบเบ•เป‰เบญเบ‡
  • เป„เบ”เป‰โ€‹เบฎเบฑเบšโ€‹เบ‚เปเป‰โ€‹เบกเบนเบ™โ€‹เบˆเบฒเบโ€‹เบˆเบปเบ”โ€‹เบซเบกเบฒเบโ€‹เบชเบฐโ€‹เบšเบฑเบšโ€‹.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook ะดะปั ะฟะพะปัƒั‡ะตะฝะธั ะดะฐะฝะฝั‹ั… ั ัะปะตะบั‚ั€ะพะฝะฝะพะน ะฟะพั‡ั‚ั‹

           :param imap_conn_id:       ะ˜ะดะตะฝั‚ะธั„ะธะบะฐั‚ะพั€ ะฟะพะดะบะปัŽั‡ะตะฝะธั ะบ ะฟะพั‡ั‚ะต
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            ะŸะพะดะบะปัŽั‡ะฐะตะผัั ะบ ะฟะพั‡ั‚ะต
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            ะœะตั‚ะพะด ะดะปั ะฟะพะปัƒั‡ะตะฝะธั ะธะดะตะฝั‚ะธั„ะธะบะฐั‚ะพั€ะฐ ะฟะพัะปะตะดะฝะตะณะพ ะฟะธััŒะผะฐ, 
            ัƒะดะพะฒะปะตั‚ะฒะพั€ะฐััŽั‰ะตะณะพ ัƒัะปะพะฒะธัะผ ะฟะพะธัะบะฐ

            :param check_seen:      ะžั‚ะผะตั‡ะฐั‚ัŒ ะฟะพัะปะตะดะฝะตะต ะฟะธััŒะผะพ ะบะฐะบ ะฟั€ะพั‡ะธั‚ะฐะฝะฝะพะต
            :type check_seen:       bool
            :param box:             ะะฐะธะผะตะฝะพะฒะฐะฝะธั ัั‰ะธะบะฐ
            :type box:              string
            :param condition:       ะฃัะปะพะฒะธั ะฟะพะธัะบะฐ ะฟะธัะตะผ
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("ะ’ ัั‰ะธะบะต ะฝะฐะนะดะตะฝั‹ ัะปะตะดัƒัŽั‰ะธะต ะฟะธััŒะผะฐ: " + str(mail_ids))

        if not mail_ids:
            logging.info("ะะต ะฝะฐะนะดะตะฝะพ ะฝะพะฒั‹ั… ะฟะธัะตะผ")
            return None

        mail_id = mail_ids[0]

        # ะตัะปะธ ั‚ะฐะบะธั… ะฟะธัะตะผ ะฝะตัะบะพะปัŒะบะพ
        if len(mail_ids) > 1:
            # ะพั‚ะผะตั‡ะฐะตะผ ะพัั‚ะฐะปัŒะฝั‹ะต ะฟั€ะพั‡ะธั‚ะฐะฝะฝั‹ะผะธ
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # ะฒะพะทะฒั€ะฐั‰ะฐะตะผ ะฟะพัะปะตะดะฝะตะต
            mail_id = mail_ids[-1]

        # ะฝัƒะถะฝะพ ะปะธ ะพั‚ะผะตั‚ะธั‚ัŒ ะฟะพัะปะตะดะฝะตะต ะฟั€ะพั‡ะธั‚ะฐะฝะฝั‹ะผ
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

เป€เบซเบ”เบœเบปเบ™เปเบกเปˆเบ™เบ™เบตเป‰: เบžเบงเบเป€เบฎเบปเบฒเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆ, เบŠเบญเบเบซเบฒเบ•เบปเบงเบญเบฑเบเบชเบญเบ™เบ—เบตเปˆเบเปˆเบฝเบงเบ‚เป‰เบญเบ‡เบ—เบตเปˆเบชเบธเบ”, เบ–เป‰เบฒเบกเบตเบ„เบปเบ™เบญเบทเปˆเบ™, เบžเบงเบเป€เบฎเบปเบฒเบšเปเปˆเบชเบปเบ™เปƒเบˆเบžเบงเบเบกเบฑเบ™. เบŸเบฑเบ‡เบŠเบฑเบ™เบ™เบตเป‰เบ–เบทเบเบ™เปเบฒเปƒเบŠเป‰, เป€เบžเบฒเบฐเบงเปˆเบฒเบ•เบปเบงเบญเบฑเบเบชเบญเบ™เบ•เปเปˆเบกเบฒเบกเบตเบ‚เปเป‰เบกเบนเบ™เบ—เบฑเบ‡เบซเบกเบปเบ”เบ‚เบญเบ‡เบ•เบปเบงเบซเบ™เบฑเบ‡เบชเบทเบเปˆเบญเบ™เบซเบ™เป‰เบฒ. เบ–เป‰เบฒเบ™เบตเป‰เบšเปเปˆเปเบกเปˆเบ™เบเปเบฅเบฐเบ™เบต, เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เบชเบปเปˆเบ‡เบ„เบทเบ™ array เบ‚เบญเบ‡เบ•เบปเบงเบญเบฑเบเบชเบญเบ™เบ—เบฑเบ‡เบซเบกเบปเบ”, เบซเบผเบทเบ›เบธเบ‡เปเบ•เปˆเบ‡เบ•เบปเบงเบญเบฑเบเบชเบญเบ™เบ—เปเบฒเบญเบดเบ”, เปเบฅเบฐเบชเปˆเบงเบ™เบ—เบตเปˆเป€เบซเบผเบทเบญเปƒเบ™ pass เบ•เปเปˆเป„เบ›. เป‚เบ”เบเบ—เบปเปˆเบงเป„เบ›, เบ—เบธเบเบชเบดเปˆเบ‡เบ—เบธเบเบขเปˆเบฒเบ‡, เบ•เบฒเบกเบชเบฐเป€เบซเบกเบต, เปเบกเปˆเบ™เบ‚เบถเป‰เบ™เบเบฑเบšเบงเบฝเบเบ‡เบฒเบ™.

เบžเบงเบเป€เบฎเบปเบฒเป€เบžเบตเปˆเบกเบชเบญเบ‡เบซเบ™เป‰เบฒเบ—เบตเปˆเบŠเปˆเบงเบเบชเปเบฒเบฅเบฑเบš hook: เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบ”เบฒเบงเป‚เบซเบผเบ”เป„เบŸเบฅเปŒเปเบฅเบฐเบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบ”เบฒเบงเป‚เบซเบผเบ”เป„เบŸเบฅเปŒเป‚เบ”เบเปƒเบŠเป‰เบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบˆเบฒเบเบญเบตเป€เบกเบฅเปŒ. เป‚เบ”เบเบงเบดเบ—เบตเบ—เบฒเบ‡เบเบฒเบ™, เบžเบงเบเป€เบ‚เบปเบฒเบชเบฒเบกเบฒเบ”เบ–เบทเบเบเป‰เบฒเบเป„เบ›เบเบฑเบ‡เบœเบนเป‰เบ›เบฐเบ•เบดเบšเบฑเบ”เบ‡เบฒเบ™, เบกเบฑเบ™เบ‚เบถเป‰เบ™เบเบฑเบšเบ„เบงเบฒเบกเบ–เบตเปˆเบ‚เบญเบ‡เบเบฒเบ™เบ™เปเบฒเปƒเบŠเป‰เบซเบ™เป‰เบฒเบ—เบตเปˆเบ™เบตเป‰. เบชเบดเปˆเบ‡เบญเบทเปˆเบ™เบ—เบตเปˆเบˆเบฐเป€เบžเบตเปˆเบกเปƒเบชเปˆ hook, เบญเบตเบเป€เบ—เบทเปˆเบญเบซเบ™เบถเปˆเบ‡, เปเบกเปˆเบ™เบ‚เบถเป‰เบ™เบเบฑเบšเบงเบฝเบเบ‡เบฒเบ™: เบ–เป‰เบฒเป„เบŸเบฅเปŒเป„เบ”เป‰เบฎเบฑเบšเบ—เบฑเบ™เบ—เบตเปƒเบ™เบˆเบปเบ”เบซเบกเบฒเบ, เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เบ”เบฒเบงเป‚เบซเบฅเบ”เป„เบŸเบฅเปŒเปเบ™เบšเบเบฑเบšเบˆเบปเบ”เบซเบกเบฒเบ, เบ–เป‰เบฒเบ‚เปเป‰เบกเบนเบ™เป„เบ”เป‰เบฎเบฑเบšเปƒเบ™เบˆเบปเบ”เบซเบกเบฒเบ, เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™เบ—เปˆเบฒเบ™เบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เปเบเบเบˆเบปเบ”เบซเบกเบฒเบ, เปเบฅเบฐเบญเบทเปˆเบ™เป† เปƒเบ™เบเปเบฅเบฐเบ™เบตเบ‚เบญเบ‡เบ‚เป‰เบญเบ, เบˆเบปเบ”เบซเบกเบฒเบเบกเบฒเบžเป‰เบญเบกเบเบฑเบšเบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบซเบ™เบถเปˆเบ‡เป„เบ›เบซเบฒเบšเปˆเบญเบ™เป€เบเบฑเบšเบกเป‰เบฝเบ™, เป€เบŠเบดเปˆเบ‡เบ‚เป‰เบญเบเบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เปƒเบชเปˆเบขเบนเปˆเปƒเบ™เบชเบฐเบ–เบฒเบ™เบ—เบตเปˆเบ—เบตเปˆเปเบ™เปˆเบ™เบญเบ™เปเบฅเบฐเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เบ‚เบฐเบšเบงเบ™เบเบฒเบ™เบ›เบธเบ‡เปเบ•เปˆเบ‡เบ•เบทเปˆเบกเบญเบตเบ.

    def download_from_url(self, url, path, chunk_size=128):
        """
            ะœะตั‚ะพะด ะดะปั ัะบะฐั‡ะธะฒะฐะฝะธั ั„ะฐะนะปะฐ

            :param url:              ะะดั€ะตั ะทะฐะณั€ัƒะทะบะธ
            :type url:               string
            :param path:             ะšัƒะดะฐ ะฟะพะปะพะถะธั‚ัŒ ั„ะฐะนะป
            :type path:              string
            :param chunk_size:       ะŸะพ ัะบะพะปัŒะบะพ ะฑะฐะนั‚ะพะฒ ะฟะธัะฐั‚ัŒ
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            ะœะตั‚ะพะด ะดะปั ัะบะฐั‡ะธะฒะฐะฝะธั ั„ะฐะนะปะฐ ะฟะพ ััั‹ะปะบะต ะธะท ะฟะธััŒะผะฐ

            :param mail_id:         ะ˜ะดะตะฝั‚ะธั„ะธะบะฐั‚ะพั€ ะฟะธััŒะผะฐ
            :type mail_id:          string
            :param path:            ะšัƒะดะฐ ะฟะพะปะพะถะธั‚ัŒ ั„ะฐะนะป
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

เบฅเบฐเบซเบฑเบ”เปเบกเปˆเบ™เบ‡เปˆเบฒเบเบ”เบฒเบ, เบชเบฐเบ™เบฑเป‰เบ™เบกเบฑเบ™เบšเปเปˆเบ„เปˆเบญเบเบ•เป‰เบญเบ‡เบเบฒเบ™เบ„เปเบฒเบญเบฐเบ—เบดเบšเบฒเบเบ•เบทเปˆเบกเบญเบตเบ. เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเบžเบฝเบ‡เปเบ•เปˆเบˆเบฐเบšเบญเบเบ—เปˆเบฒเบ™เบเปˆเบฝเบงเบเบฑเบšเป€เบชเบฑเป‰เบ™ magic imap_conn_id. Apache Airflow เป€เบเบฑเบšเบฎเบฑเบเบชเบฒเบ•เบปเบงเบเปเบฒเบ™เบปเบ”เบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆ (เป€เบ‚เบปเป‰เบฒเบชเบนเปˆเบฅเบฐเบšเบปเบš, เบฅเบฐเบซเบฑเบ”เบœเปˆเบฒเบ™, เบ—เบตเปˆเบขเบนเปˆ, เปเบฅเบฐเบ•เบปเบงเบเปเบฒเบ™เบปเบ”เบเบฒเบ™เบญเบทเปˆเบ™เป†) เบ—เบตเปˆเบชเบฒเบกเบฒเบ”เป€เบ‚เบปเป‰เบฒเป€เบ–เบดเบ‡เป„เบ”เป‰เป‚เบ”เบเบ•เบปเบงเบฅเบฐเบšเบธเบชเบฐเบ•เบฃเบดเบ‡. เบชเบฒเบเบ•เบฒ, เบเบฒเบ™เบˆเบฑเบ”เบเบฒเบ™เบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเป€เบšเบดเปˆเบ‡เบ„เบทเบงเปˆเบฒเบ™เบตเป‰

เบ‚เบฐเบšเบงเบ™เบเบฒเบ™ ETL เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เป„เบ”เป‰เบฎเบฑเบšเบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบเบญเบตเป€เบกเบฅเปŒเปƒเบ™ Apache Airflow

เป€เบŠเบฑเบ™เป€เบŠเบตเบฅเปเบ–เป‰เบฒเบ‚เปเป‰เบกเบนเบ™

เป€เบ™เบทเปˆเบญเบ‡เบˆเบฒเบเบžเบงเบเป€เบฎเบปเบฒเบฎเบนเป‰เบงเบดเบ—เบตเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆ เปเบฅเบฐเบฎเบฑเบšเบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบเป€เบกเบฅเปเบฅเป‰เบง, เบ•เบญเบ™เบ™เบตเป‰เบžเบงเบเป€เบฎเบปเบฒเบชเบฒเบกเบฒเบ”เบ‚เบฝเบ™เป€เบŠเบฑเบ™เป€เบŠเบตเป€เบžเบทเปˆเบญเบฅเปเบ–เป‰เบฒเบžเบงเบเบกเบฑเบ™เป„เบ”เป‰. เปƒเบ™เบเปเบฅเบฐเบ™เบตเบ‚เบญเบ‡เบ‚เป‰เบญเบ, เบกเบฑเบ™เบšเปเปˆเป„เบ”เป‰เป€เบฎเบฑเบ”เบงเบฝเบเปƒเบ™เบเบฒเบ™เบ‚เบฝเบ™เบ•เบปเบงเบ›เบฐเบ•เบดเบšเบฑเบ”เบเบฒเบ™เบ—เบฑเบ™เบ—เบตเบ—เบตเปˆเบˆเบฐเบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™, เบ–เป‰เบฒเบกเบต, เป€เบžเบฒเบฐเบงเปˆเบฒเบ‚เบฐเบšเบงเบ™เบเบฒเบ™เบญเบทเปˆเบ™เป†เป€เบฎเบฑเบ”เบงเบฝเบเป‚เบ”เบเบญเบตเบ‡เปƒเบชเปˆเบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเป„เบ”เป‰เบฎเบฑเบšเบˆเบฒเบเป€เบกเบฅ, เบฅเบงเบกเบ—เบฑเบ‡เบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเป€เบญเบปเบฒเบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเบเปˆเบฝเบงเบ‚เป‰เบญเบ‡เบˆเบฒเบเปเบซเบผเปˆเบ‡เบญเบทเปˆเบ™เป† (API, เป‚เบ—เบฅเบฐเบชเบฑเบš. , web metrics, etc.) เปเบฅเบฐเบญเบทเปˆเบ™เป†). เบ‚เป‰เบญเบเบˆเบฐเบเบปเบเบ•เบปเบงเบขเปˆเบฒเบ‡เปƒเบซเป‰เป€เบˆเบปเป‰เบฒ. เบœเบนเป‰เปƒเบŠเป‰เปƒเบซเบกเปˆเป„เบ”เป‰เบ›เบฒเบเบปเบ”เบขเบนเปˆเปƒเบ™เบฅเบฐเบšเบปเบš CRM, เปเบฅเบฐเบžเบงเบเป€เบฎเบปเบฒเบเบฑเบ‡เบšเปเปˆเบฎเบนเป‰เบเปˆเบฝเบงเบเบฑเบš UUID เบ‚เบญเบ‡เบฅเบฒเบง. เบˆเบฒเบเบ™เบฑเป‰เบ™, เป€เบกเบทเปˆเบญเบžเบฐเบเบฒเบเบฒเบกเบฎเบฑเบšเบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบเป‚เบ—เบฅเบฐเบชเบฑเบš SIP, เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเบฎเบฑเบšเบชเบฒเบเบ—เบตเปˆเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบเบฑเบš UUID เบ‚เบญเบ‡เบกเบฑเบ™, เปเบ•เปˆเบžเบงเบเป€เบฎเบปเบฒเบšเปเปˆเบชเบฒเบกเบฒเบ”เบšเบฑเบ™เบ—เบถเบ เปเบฅเบฐเบ™เบณเปƒเบŠเป‰เป„เบ”เป‰เบขเปˆเบฒเบ‡เบ–เบทเบเบ•เป‰เบญเบ‡. เปƒเบ™เป€เบฅเบทเปˆเบญเบ‡เบ”เบฑเปˆเบ‡เบเปˆเบฒเบง, เบกเบฑเบ™เป€เบ›เบฑเบ™เบชเบดเปˆเบ‡เบชเปเบฒเบ„เบฑเบ™เบ—เบตเปˆเบˆเบฐเบฎเบฑเบเบชเบฒเบ„เบงเบฒเบกเป€เบžเบดเปˆเบ‡เบžเปเปƒเบˆเบ‚เบญเบ‡เบ‚เปเป‰เบกเบนเบ™, เป‚เบ”เบเบชเบฐเป€เบžเบฒเบฐเบ–เป‰เบฒเบžเบงเบเป€เบ‚เบปเบฒเบกเบฒเบˆเบฒเบเปเบซเบผเปˆเบ‡เบ•เปˆเบฒเบ‡เป†. เป€เบซเบผเบปเปˆเบฒเบ™เบตเป‰เปเบกเปˆเบ™, เปเบ™เปˆเบ™เบญเบ™, เบกเบฒเบ”เบ•เบฐเบเบฒเบ™เบšเปเปˆเบžเบฝเบ‡เบžเปเป€เบžเบทเปˆเบญเบฎเบฑเบเบชเบฒเบ„เบงเบฒเบกเบชเบปเบกเบšเบนเบ™เบ‚เบญเบ‡เบ‚เปเป‰เบกเบนเบ™, เปเบ•เปˆเปƒเบ™เบšเบฒเบ‡เบเปเบฅเบฐเบ™เบตเบกเบฑเบ™เป€เบ›เบฑเบ™เบชเบดเปˆเบ‡เบˆเปเบฒเป€เบ›เบฑเบ™. เปเบกเปˆเบ™เปเบฅเป‰เบง, เปเบฅเบฐ idling เป€เบžเบทเปˆเบญเบ„เบญเบšเบ„เบญเบ‡เบŠเบฑเบšเบžเบฐเบเบฒเบเบญเบ™เปเบกเปˆเบ™ irrational.

เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™, เป€เบŠเบฑเบ™เป€เบŠเบตเบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเป€เบ›เบตเบ”เบ•เบปเบงเบ•เบฒเบกเปเบ™เบงเบ•เบฑเป‰เบ‡เบ•เปเปˆเบกเบฒเบ‚เบญเบ‡เบเบฒเบŸเบ–เป‰เบฒเบกเบตเบ‚เปเป‰เบกเบนเบ™เบชเบปเบ”เบขเบนเปˆเปƒเบ™เป€เบกเบฅ, เปเบฅเบฐเบเบฑเบ‡เบซเบกเบฒเบเบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเบœเปˆเบฒเบ™เบกเบฒเบงเปˆเบฒเบšเปเปˆเบเปˆเบฝเบงเบ‚เป‰เบญเบ‡.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

เบžเบงเบเป€เบฎเบปเบฒเบฎเบฑเบš เปเบฅเบฐเบ™เบณเปƒเบŠเป‰เบ‚เปเป‰เบกเบนเบ™

เป€เบžเบทเปˆเบญเบฎเบฑเบšเปเบฅเบฐเบ›เบฐเบกเบงเบ™เบœเบปเบ™เบ‚เปเป‰เบกเบนเบ™, เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เบ‚เบฝเบ™เบ•เบปเบงเบ›เบฐเบ•เบดเบšเบฑเบ”เบเบฒเบ™เปเบเบเบ•เปˆเบฒเบ‡เบซเบฒเบ, เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เบ™เปเบฒเปƒเบŠเป‰เป€เบ„เบทเปˆเบญเบ‡เบ—เบตเปˆเบเบฝเบกเบžเป‰เบญเบก. เบ™เบฑเบšเบ•เบฑเป‰เบ‡เปเบ•เปˆเป€เบ–เบดเบ‡เบ•เบญเบ™เบ™เบฑเป‰เบ™เป€เบซเบ”เบœเบปเบ™เปเบกเปˆเบ™เป€เบ›เบฑเบ™เป€เบฅเบทเปˆเบญเบ‡เป€เบฅเบฑเบเป†เบ™เป‰เบญเบเป† - เป€เบžเบทเปˆเบญเป€เบญเบปเบฒเบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบเบˆเบปเบ”เบซเบกเบฒเบ, เบ•เบปเบงเบขเปˆเบฒเบ‡, เบ‚เป‰เบญเบเปเบ™เบฐเบ™เปเบฒ PythonOperator เบกเบฒเบ”เบ•เบฐเบ–เบฒเบ™.

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# ะกั‚ะฐะฝะดะฐั€ั‚ะฝะพะต ะบะพะฝั„ะธะณัƒั€ะธั€ะพะฒะฐะฝะธะต ะณั€ะฐั„ะฐ
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# ะžะฟั€ะตะดะตะปัะตะผ ัะตะฝัะพั€
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# ะคัƒะฝะบั†ะธั ะดะปั ะฟะพะปัƒั‡ะตะฝะธั ะดะฐะฝะฝั‹ั… ะธะท ะฟะธััŒะผะฐ
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# ะžะฟะธัะฐะฝะธะต ะพัั‚ะฐะปัŒะฝั‹ั… ะฒะตั€ัˆะธะฝ ะณั€ะฐั„ะฐ
...

# ะ—ะฐะดะฐะตะผ ัะฒัะทัŒ ะฝะฐ ะณั€ะฐั„ะต
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ะžะฟะธัะฐะฝะธะต ะพัั‚ะฐะปัŒะฝั‹ั… ะฟะพั‚ะพะบะพะฒ ัƒะฟั€ะฐะฒะปะตะฝะธั

เป‚เบ”เบเบงเบดเบ—เบตเบ—เบฒเบ‡เบเบฒเบ™, เบ–เป‰เบฒเบˆเบปเบ”เบซเบกเบฒเบเบ‚เบญเบ‡เบšเปเบฅเบดเบชเบฑเบ”เบ‚เบญเบ‡เบ—เปˆเบฒเบ™เบขเบนเปˆเปƒเบ™ mail.ru, เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™เบ—เปˆเบฒเบ™เบˆเบฐเบšเปเปˆเบชเบฒเบกเบฒเบ”เบŠเบญเบเบซเบฒเบˆเบปเบ”เบซเบกเบฒเบเป‚เบ”เบเบซเบปเบงเบ‚เปเป‰, เบœเบนเป‰เบชเบปเปˆเบ‡, เปเบฅเบฐเบญเบทเปˆเบ™เป†. เบเบฑเบšเป„เบ›เปƒเบ™เบ›เบต 2016, เบžเบงเบเป€เบ‚เบปเบฒเป€เบˆเบปเป‰เบฒเบชเบฑเบ™เบเบฒเบงเปˆเบฒเบˆเบฐเปเบ™เบฐเบ™เปเบฒเบกเบฑเบ™, เปเบ•เปˆเบ›เบฒเบเบปเบ”เบ‚เบทเป‰เบ™เบงเปˆเบฒเบกเบตเบเบฒเบ™เบ›เปˆเบฝเบ™เปเบ›เบ‡เบˆเบดเบ”เปƒเบˆเบ‚เบญเบ‡เป€เบ‚เบปเบฒเป€เบˆเบปเป‰เบฒ. เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเป„เบ”เป‰เปเบเป‰เป„เบ‚เบšเบฑเบ™เบซเบฒเบ™เบตเป‰เป‚เบ”เบเบเบฒเบ™เบชเป‰เบฒเบ‡เป‚เบŸเป€เบ”เบตเปเบเบเบ•เปˆเบฒเบ‡เบซเบฒเบเบชเปเบฒเบฅเบฑเบšเบ•เบปเบงเบญเบฑเบเบชเบญเบ™เบ—เบตเปˆเบˆเปเบฒเป€เบ›เบฑเบ™เปเบฅเบฐเบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเบ•เบปเบงเบเบญเบ‡เบชเปเบฒเบฅเบฑเบšเบ•เบปเบงเบญเบฑเบเบชเบญเบ™เบ—เบตเปˆเบˆเปเบฒเป€เบ›เบฑเบ™เปƒเบ™เบเบฒเบ™เป‚เบ•เป‰เบ•เบญเบšเป€เบงเบฑเบšเป€เบกเบฅ. เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™, เบžเบฝเบ‡เปเบ•เปˆเบ•เบปเบงเบญเบฑเบเบชเบญเบ™เปเบฅเบฐเป€เบ‡เบทเปˆเบญเบ™เป„เบ‚เบ—เบตเปˆเบˆเปเบฒเป€เบ›เบฑเบ™เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบ„เบปเป‰เบ™เบซเบฒ, เปƒเบ™เบเปเบฅเบฐเบ™เบตเบ‚เบญเบ‡เบ‚เป‰เบญเบ, เบžเบฝเบ‡เปเบ•เปˆ (UNSEEN) เป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เป‚เบŸเบ™เป€เบ”เบตเบ™เบตเป‰.

เบชเบฐเบซเบผเบธเบš, เบžเบงเบเป€เบฎเบปเบฒเบกเบตเบฅเปเบฒเบ”เบฑเบšเบ•เปเปˆเป„เบ›เบ™เบตเป‰: เบžเบงเบเป€เบฎเบปเบฒเบเบงเบ”เป€เบšเบดเปˆเบ‡เบงเปˆเบฒเบกเบตเบ•เบปเบงเบญเบฑเบเบชเบญเบ™เปƒเบซเบกเปˆเบ—เบตเปˆเบเบปเบ‡เบเบฑเบšเป€เบ‡เบทเปˆเบญเบ™เป„เบ‚, เบ–เป‰เบฒเบงเปˆเบฒเบกเบต, เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™เบžเบงเบเป€เบฎเบปเบฒเบ”เบฒเบงเป‚เบซเบฅเบ”เบšเปˆเบญเบ™เป€เบเบฑเบšเบกเป‰เบฝเบ™เป‚เบ”เบเปƒเบŠเป‰เบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบˆเบฒเบเบˆเบปเบ”เบซเบกเบฒเบเบชเบฐเบšเบฑเบšเบชเบธเบ”เบ—เป‰เบฒเบ.
เบžเบฒเบเปƒเบ•เป‰เบˆเบธเบ”เบชเบธเบ”เบ—เป‰เบฒเบ, เบกเบฑเบ™เบ–เบทเบเบเบปเบเป€เบงเบฑเป‰เบ™เบงเปˆเบฒเปเบŸเป‰เบกเบˆเบฑเบ”เป€เบเบฑเบšเบ™เบตเป‰เบˆเบฐเบ–เบทเบเบ–เบญเบ”เบญเบญเบ, เบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบเบšเปˆเบญเบ™เป€เบเบฑเบšเบกเป‰เบฝเบ™เบˆเบฐเบ–เบทเบเบฅเบถเบšเบฅเป‰เบฒเบ‡เปเบฅเบฐเบ”เปเบฒเป€เบ™เบตเบ™เบเบฒเบ™, เปเบฅเบฐเบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™, เบชเบดเปˆเบ‡เบ—เบฑเบ‡เบซเบกเบปเบ”เบˆเบฐเป„เบ›เบ•เบทเปˆเบกเบญเบตเบเปƒเบ™เบ—เปเปˆเบ‚เบญเบ‡เบ‚เบฐเบšเบงเบ™เบเบฒเบ™ ETL, เปเบ•เปˆเบ™เบตเป‰เปเบกเปˆเบ™เป€เบเบตเบ™เบเบงเปˆเบฒเบ™เบตเป‰. เบ‚เบญเบšเป€เบ‚เบ”เบ‚เบญเบ‡เบšเบปเบ”เบ„เบงเบฒเบก. เบ–เป‰เบฒเบกเบฑเบ™เบญเบญเบเบกเบฒเบ—เบตเปˆเบซเบ™เป‰เบฒเบชเบปเบ™เปƒเบˆเปเบฅเบฐเป€เบ›เบฑเบ™เบ›เบฐเป‚เบซเบเบ”, เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™เบ‚เป‰เบญเบเบเบดเบ™เบ”เบตเบ—เบตเปˆเบˆเบฐเบญเบฐเบ—เบดเบšเบฒเบเบงเบดเบ—เบตเปเบเป‰เป„เบ‚ ETL เปเบฅเบฐเบชเปˆเบงเบ™เบ‚เบญเบ‡เบžเบงเบเป€เบ‚เบปเบฒเบชเปเบฒเบฅเบฑเบš Apache Airflow.

เปเบซเบผเปˆเบ‡เบ‚เปเป‰เบกเบนเบ™: www.habr.com

เป€เบžเบตเปˆเบกเบ„เบงเบฒเบกเบ„เบดเบ”เป€เบซเบฑเบ™