เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹เชฎเชพเช‚ เชˆเชฎเซ‡เชฒเชฎเชพเช‚เชฅเซ€ เชกเซ‡เชŸเชพ เชฎเซ‡เชณเชตเชตเชพ เชฎเชพเชŸเซ‡ ETL เชชเซเชฐเช•เซเชฐเชฟเชฏเชพ

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹เชฎเชพเช‚ เชˆเชฎเซ‡เชฒเชฎเชพเช‚เชฅเซ€ เชกเซ‡เชŸเชพ เชฎเซ‡เชณเชตเชตเชพ เชฎเชพเชŸเซ‡ ETL เชชเซเชฐเช•เซเชฐเชฟเชฏเชพ

เชญเชฒเซ‡ เช—เชฎเซ‡ เชคเซ‡เชŸเชฒเซ€ เชŸเซ‡เช•เซเชจเซ‹เชฒเซ‹เชœเซ€เชจเซ‹ เชตเชฟเช•เชพเชธ เชฅเชพเชฏ, เชœเซ‚เชจเชพ เช…เชญเชฟเช—เชฎเซ‹เชจเซ‹ เชฆเซ‹เชฐ เชนเช‚เชฎเซ‡เชถเชพ เชตเชฟเช•เชพเชธเชจเซ€ เชชเชพเช›เชณ เชฐเชนเซ‡ เช›เซ‡. เช† เชเช• เชธเชฐเชณ เชธเช‚เช•เซเชฐเชฎเชฃ, เชฎเชพเชจเชตเซ€เชฏ เชชเชฐเชฟเชฌเชณเซ‹, เชคเช•เชจเซ€เช•เซ€ เชœเชฐเซ‚เชฐเชฟเชฏเชพเชคเซ‹ เช…เชฅเชตเชพ เช…เชจเซเชฏ เช•เช‚เชˆเช•เชจเซ‡ เช•เชพเชฐเชฃเซ‡ เชนเซ‹เชˆ เชถเช•เซ‡ เช›เซ‡. เชกเซ‡เชŸเชพ เชชเซเชฐเซ‹เชธเซ‡เชธเชฟเช‚เช—เชจเชพ เช•เซเชทเซ‡เชคเซเชฐเชฎเชพเช‚, เชกเซ‡เชŸเชพ เชธเซเชคเซเชฐเซ‹เชคเซ‹ เช† เชญเชพเช—เชฎเชพเช‚ เชธเซŒเชฅเซ€ เชตเชงเซ เช›เชคเซ€ เช•เชฐเซ‡ เช›เซ‡. เชญเชฒเซ‡ เช†เชชเชฃเซ‡ เช†เชฎเชพเช‚เชฅเซ€ เช›เซเชŸเช•เชพเชฐเซ‹ เชฎเซ‡เชณเชตเชตเชพเชจเซเช‚ เช•เซ‡เชŸเชฒเซเช‚ เชธเชชเชจเซเช‚ เชœเซ‹เชคเชพ เชนเซ‹เชˆเช, เชชเชฐเช‚เชคเซ เช…เชคเซเชฏเชพเชฐ เชธเซเชงเซ€ เชกเซ‡เชŸเชพเชจเซ‹ เชเช• เชญเชพเช— เช‡เชจเซเชธเซเชŸเชจเซเชŸ เชฎเซ‡เชธเซ‡เชจเซเชœเชฐเซเชธ เช…เชจเซ‡ เชˆเชฎเซ‡เชˆเชฒเชฎเชพเช‚ เชฎเซ‹เช•เชฒเชตเชพเชฎเชพเช‚ เช†เชตเซ‡ เช›เซ‡, เชตเชงเซ เชชเซเชฐเชพเชšเซ€เชจ เชซเซ‹เชฐเซเชฎเซ‡เชŸเชจเซ‹ เช‰เชฒเซเชฒเซ‡เช– เชจ เช•เชฐเชตเซ‹. เชนเซเช‚ เชคเชฎเชจเซ‡ เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹ เชฎเชพเชŸเซ‡เชจเชพ เชตเชฟเช•เชฒเซเชชเซ‹เชฎเชพเช‚เชฅเซ€ เชเช•เชจเซ‡ เชกเชฟเชธเชเชธเซ‡เชฎเซเชฌเชฒ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡ เช†เชฎเช‚เชคเซเชฐเชฟเชค เช•เชฐเซเช‚ เช›เซเช‚, เชคเชฎเซ‡ เช‡เชฎเซ‡เช‡เชฒเซเชธเชฎเชพเช‚เชฅเซ€ เชกเซ‡เชŸเชพ เช•เซ‡เชตเซ€ เชฐเซ€เชคเซ‡ เชฒเชˆ เชถเช•เซ‹ เช›เซ‹ เชคเซ‡ เชธเชฎเชœเชพเชตเซ‡ เช›เซ‡.

เชชเซเชฐเชพเช—เซˆเชคเชฟเชนเชพเชธเชฟเช•

เช†เช‚เชคเชฐเชตเซเชฏเช•เซเชคเชฟเชคเซเชต เชธเช‚เชšเชพเชฐเชฅเซ€ เชฎเชพเช‚เชกเซ€เชจเซ‡ เช•เช‚เชชเชจเซ€เช“ เชตเชšเซเชšเซ‡เชจเซ€ เช•เซเชฐเชฟเชฏเชพเชชเซเชฐเชคเชฟเช•เซเชฐเชฟเชฏเชพเชจเชพ เชงเซ‹เชฐเชฃเซ‹ เชธเซเชงเซ€ เชˆ-เชฎเซ‡เชฒ เชฆเซเชตเชพเชฐเชพ เชนเชœเซ เชชเชฃ เช˜เชฃเซ‹ เชกเซ‡เชŸเชพ เชŸเซเชฐเชพเชจเซเชธเชซเชฐ เช•เชฐเชตเชพเชฎเชพเช‚ เช†เชตเซ‡ เช›เซ‡. เชคเซ‡ เชธเชพเชฐเซเช‚ เช›เซ‡ เชœเซ‹ เชกเซ‡เชŸเชพ เชฎเซ‡เชณเชตเชตเชพ เชฎเชพเชŸเซ‡ เช‡เชจเซเชŸเชฐเชซเซ‡เชธ เชฒเช–เชตเชพเชจเซเช‚ เช…เชฅเชตเชพ เช“เชซเชฟเชธเชฎเชพเช‚ เชเชตเชพ เชฒเซ‹เช•เซ‹เชจเซ‡ เชฎเซ‚เช•เชตเชพเชจเซเช‚ เชถเช•เซเชฏ เชนเซ‹เชฏ เช•เซ‡ เชœเซ‡เช“ เช† เชฎเชพเชนเชฟเชคเซ€เชจเซ‡ เชตเชงเซ เช…เชจเซเช•เซ‚เชณ เชธเซเชคเซเชฐเซ‹เชคเซ‹เชฎเชพเช‚ เชฆเชพเช–เชฒ เช•เชฐเชถเซ‡, เชชเชฐเช‚เชคเซ เช˜เชฃเซ€เชตเชพเชฐ เช† เชถเช•เซเชฏ เชจ เชชเชฃ เชฌเชจเซ‡. เชฎเซ‡เช‚ เชœเซ‡ เชตเชฟเชถเชฟเชทเซเชŸ เช•เชพเชฐเซเชฏเชจเซ‹ เชธเชพเชฎเชจเซ‹ เช•เชฐเชตเซ‹ เชชเชกเซเชฏเซ‹ เชนเชคเซ‹ เชคเซ‡ เช•เซเช–เซเชฏเชพเชค CRM เชธเชฟเชธเซเชŸเชฎเชจเซ‡ เชกเซ‡เชŸเชพ เชตเซ‡เชฐเชนเชพเช‰เชธ เชธเชพเชฅเซ‡ เช…เชจเซ‡ เชชเช›เซ€ OLAP เชธเชฟเชธเซเชŸเชฎ เชธเชพเชฅเซ‡ เชœเซ‹เชกเชตเชพเชจเซเช‚ เชนเชคเซเช‚. เชเชคเชฟเชนเชพเชธเชฟเช• เชฐเซ€เชคเซ‡ เชเชตเซเช‚ เชฌเชจเซเชฏเซเช‚ เช›เซ‡ เช•เซ‡ เช…เชฎเชพเชฐเซ€ เช•เช‚เชชเชจเซ€ เชฎเชพเชŸเซ‡ เช† เชธเชฟเชธเซเชŸเชฎเชจเซ‹ เช‰เชชเชฏเซ‹เช— เชตเซเชฏเชตเชธเชพเชฏเชจเชพ เชšเซ‹เช•เซเช•เชธ เช•เซเชทเซ‡เชคเซเชฐเชฎเชพเช‚ เช…เชจเซเช•เซ‚เชณ เชนเชคเซ‹. เชคเซ‡เชฅเซ€, เชฆเชฐเซ‡เช• เชตเซเชฏเช•เซเชคเชฟ เช–เชฐเซ‡เช–เชฐ เช† เชคเซƒเชคเซ€เชฏ-เชชเช•เซเชท เชธเชฟเชธเซเชŸเชฎเชจเชพ เชกเซ‡เชŸเชพ เชธเชพเชฅเซ‡ เชชเชฃ เช•เชพเชฐเซเชฏ เช•เชฐเชตเชพ เชธเช•เซเชทเชฎ เชฌเชจเชตเชพ เชฎเชพเช‚เช—เซ‡ เช›เซ‡. เชธเซŒ เชชเซเชฐเชฅเชฎ, เช…เชฒเชฌเชคเซเชค, เช“เชชเชจ API เชฎเชพเช‚เชฅเซ€ เชกเซ‡เชŸเชพ เชฎเซ‡เชณเชตเชตเชพเชจเซ€ เชถเช•เซเชฏเชคเชพเชจเซ‹ เช…เชญเซเชฏเชพเชธ เช•เชฐเชตเชพเชฎเชพเช‚ เช†เชตเซเชฏเซ‹ เชนเชคเซ‹. เช•เชฎเชจเชธเซ€เชฌเซ‡, API เช เชคเชฎเชพเชฎ เชœเชฐเซ‚เชฐเซ€ เชกเซ‡เชŸเชพ เชฎเซ‡เชณเชตเชตเชพเชจเซ‡ เช†เชตเชฐเซ€ เชฒเซ€เชงเซเช‚ เชจ เชนเชคเซเช‚, เช…เชจเซ‡, เชธเชฐเชณ เชถเชฌเซเชฆเซ‹เชฎเชพเช‚ เช•เชนเซ€เช เชคเซ‹, เชคเซ‡ เช˜เชฃเซ€ เชฐเซ€เชคเซ‡ เช•เซเชŸเชฟเชฒ เชนเชคเซเช‚, เช…เชจเซ‡ เชตเชงเซ เชตเซเชฏเชพเชชเช• เช•เชพเชฐเซเชฏเช•เซเชทเชฎเชคเชพ เชชเซเชฐเชฆเชพเชจ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡ เชŸเซ‡เช•เชจเชฟเช•เชฒ เชธเชชเซ‹เชฐเซเชŸ เช‡เชšเซเช›เชคเซ‹ เชจ เชนเชคเซ‹ เช…เชฅเชตเชพ เชนเชพเชซ เชฐเชธเซเชคเซ‡ เชฎเชณเซ€ เชถเช•เซเชฏเซ‹ เชจ เชนเชคเซ‹. เชชเชฐเช‚เชคเซ เช† เชธเชฟเชธเซเชŸเชฎเซ‡ เชธเชฎเชฏเชพเช‚เชคเชฐเซ‡ เช†เชฐเซเช•เชพเช‡เชตเชจเซ‡ เช…เชจเชฒเซ‹เชก เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡เชจเซ€ เชฒเชฟเช‚เช•เชจเชพ เชฐเซ‚เชชเชฎเชพเช‚ เชฎเซ‡เช‡เชฒ เชฆเซเชตเชพเชฐเชพ เช—เซเชฎ เชฅเชฏเซ‡เชฒ เชกเซ‡เชŸเชพ เชชเซเชฐเชพเชชเซเชค เช•เชฐเชตเชพเชจเซ€ เชคเช• เชชเซ‚เชฐเซ€ เชชเชพเชกเซ€ เชนเชคเซ€.

เช เชจเซ‹เช‚เชงเชตเซเช‚ เชœเซ‹เช‡เช เช•เซ‡ เช† เชเช•เชฎเชพเชคเซเชฐ เช•เซ‡เชธ เชจเชฅเซ€ เชœเซ‡เชฎเชพเช‚ เชตเซเชฏเชตเชธเชพเชฏ เช‡เชฎเซ‡เชฒ เช…เชฅเชตเชพ เช‡เชจเซเชธเซเชŸเชจเซเชŸ เชฎเซ‡เชธเซ‡เชจเซเชœเชฐ เชชเชพเชธเซ‡เชฅเซ€ เชกเซ‡เชŸเชพ เชเช•เชคเซเชฐเชฟเชค เช•เชฐเชตเชพ เชฎเชพเช‚เช—เชคเซ‹ เชนเชคเซ‹. เชœเซ‹ เช•เซ‡, เช† เช•เชฟเชธเซเชธเชพเชฎเชพเช‚, เช…เชฎเซ‡ เชซเช•เซเชค เช† เชฐเซ€เชคเซ‡ เชกเซ‡เชŸเชพเชจเซ‹ เชญเชพเช— เชชเซเชฐเชฆเชพเชจ เช•เชฐเชคเซ€ เชคเซƒเชคเซ€เชฏ-เชชเช•เซเชท เช•เช‚เชชเชจเซ€เชจเซ‡ เชชเซเชฐเชญเชพเชตเชฟเชค เช•เชฐเซ€ เชถเช•เซเชฏเชพ เชจเชฅเซ€.

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹

ETL เชชเซเชฐเช•เซเชฐเชฟเชฏเชพเช“ เชฌเชจเชพเชตเชตเชพ เชฎเชพเชŸเซ‡, เช…เชฎเซ‡ เชฎเซ‹เชŸเชพเชญเชพเช—เซ‡ Apache Airflow เชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เช เช›เซ€เช. เชธเช‚เชฆเชฐเซเชญเชฎเชพเช‚ เช…เชจเซ‡ เชธเชพเชฎเชพเชจเซเชฏ เชฐเซ€เชคเซ‡ เชคเซ‡ เช•เซ‡เชตเซ€ เชฆเซ‡เช–เชพเชฏ เช›เซ‡ เชคเซ‡ เชตเชงเซ เชธเชพเชฐเซ€ เชฐเซ€เชคเซ‡ เชธเชฎเชœเชตเชพ เชฎเชพเชŸเซ‡ เช† เชŸเซ‡เช•เซเชจเซ‰เชฒเซ‰เชœเซ€เชฅเซ€ เช…เชœเชพเชฃ เชนเซ‹เชฏ เชคเซ‡เชตเชพ เชตเชพเชšเช• เชฎเชพเชŸเซ‡, เชนเซเช‚ เช•เซ‡เชŸเชฒเชพเช• เชชเซเชฐเชพเชฐเช‚เชญเชฟเช• เชฎเซเชฆเซเชฆเชพเช“เชจเซเช‚ เชตเชฐเซเชฃเชจ เช•เชฐเซ€เชถ.

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹ เช เชเช• เชฎเชซเชค เชชเซเชฒเซ‡เชŸเชซเซ‹เชฐเซเชฎ เช›เซ‡ เชœเซ‡เชจเซ‹ เช‰เชชเชฏเซ‹เช— เชชเชพเชฏเชฅเซ‹เชจเชฎเชพเช‚ ETL (เชเช•เซเชธเซเชŸเซเชฐเซ‡เช•เซเชŸ-เชŸเซเชฐเชพเชจเซเชธเชซเซ‹เชฐเซเชฎ-เชฒเซ‹เชกเชฟเช‚เช—) เชชเซเชฐเช•เซเชฐเชฟเชฏเชพเช“ เชฌเชจเชพเชตเชตเชพ, เชเช•เซเชเชฟเช•เซเชฏเซเชŸ เช•เชฐเชตเชพ เช…เชจเซ‡ เชฎเซ‹เชจเชฟเชŸเชฐ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡ เชฅเชพเชฏ เช›เซ‡. เชเชฐเชซเซเชฒเซ‹เชฎเชพเช‚ เชฎเซเช–เซเชฏ เช–เซเชฏเชพเชฒ เช เชจเชฟเชฐเซเชฆเซ‡เชถเชฟเชค เชเชธเชพเชฏเช•เซเชฒเชฟเช• เช—เซเชฐเชพเชซ เช›เซ‡, เชœเซเชฏเชพเช‚ เช—เซเชฐเชพเชซเชจเชพ เชถเชฟเชฐเซ‹เชฌเชฟเช‚เชฆเซเช“ เชšเซ‹เช•เซเช•เชธ เชชเซเชฐเช•เซเชฐเชฟเชฏเชพเช“ เช›เซ‡, เช…เชจเซ‡ เช—เซเชฐเชพเชซเชจเซ€ เช•เชฟเชจเชพเชฐเซ€เช“ เชจเชฟเชฏเช‚เชคเซเชฐเชฃ เช…เชฅเชตเชพ เชฎเชพเชนเชฟเชคเซ€เชจเซ‹ เชชเซเชฐเชตเชพเชน เช›เซ‡. เชชเซเชฐเช•เซเชฐเชฟเชฏเชพ เชซเช•เซเชค เช•เซ‹เชˆเชชเชฃ เชชเชพเชฏเชฅเซ‹เชจ เชซเช‚เช•เซเชถเชจเชจเซ‡ เช•เซ‰เชฒ เช•เชฐเซ€ เชถเช•เซ‡ เช›เซ‡, เช…เชฅเชตเชพ เชคเซ‡ เชตเชฐเซเช—เชจเชพ เชธเช‚เชฆเชฐเซเชญเชฎเชพเช‚ เช•เซเชฐเชฎเชฟเช• เชฐเซ€เชคเซ‡ เช˜เชฃเชพ เชซเช‚เช•เซเชถเชจเชจเซ‡ เช•เซ‰เชฒ เช•เชฐเชตเชพเชฅเซ€ เชตเชงเซ เชœเชŸเชฟเชฒ เชคเชฐเซเช• เชนเซ‹เชˆ เชถเช•เซ‡ เช›เซ‡. เชธเซŒเชฅเซ€ เชตเชงเซ เชตเชพเชฐเช‚เชตเชพเชฐเชจเซ€ เช•เชพเชฎเช—เซ€เชฐเซ€ เชฎเชพเชŸเซ‡, เชคเซเชฏเชพเช‚ เชชเชนเซ‡เชฒเซ‡เชฅเซ€ เชœ เช˜เชฃเชพ เชคเซˆเชฏเชพเชฐ เชตเชฟเช•เชพเชธ เช›เซ‡ เชœเซ‡เชจเซ‹ เช‰เชชเชฏเซ‹เช— เชชเซเชฐเช•เซเชฐเชฟเชฏเชพเช“ เชคเชฐเซ€เช•เซ‡ เชฅเชˆ เชถเช•เซ‡ เช›เซ‡. เช†เชตเชพ เชตเชฟเช•เชพเชธเชฎเชพเช‚ เชถเชพเชฎเซ‡เชฒ เช›เซ‡:

  • เช“เชชเชฐเซ‡เชŸเชฐเซ‹ - เชเช• เชœเช—เซเชฏเชพเชเชฅเซ€ เชฌเซ€เชœเชพ เชธเซเชฅเชพเชจเซ‡ เชกเซ‡เชŸเชพ เชŸเซเชฐเชพเชจเซเชธเชซเชฐ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡, เช‰เชฆเชพเชนเชฐเชฃ เชคเชฐเซ€เช•เซ‡, เชกเซ‡เชŸเชพเชฌเซ‡เช เชŸเซ‡เชฌเชฒเชฎเชพเช‚เชฅเซ€ เชกเซ‡เชŸเชพ เชตเซ‡เชฐเชนเชพเช‰เชธเชฎเชพเช‚;
  • เชธเซ‡เชจเซเชธเชฐเซเชธ - เชšเซ‹เช•เซเช•เชธ เช˜เชŸเชจเชพเชจเซ€ เช˜เชŸเชจเชพเชจเซ€ เชฐเชพเชน เชœเซ‹เชตเชพ เชฎเชพเชŸเซ‡ เช…เชจเซ‡ เช—เซเชฐเชพเชซเชจเชพ เช…เชจเซเช—เชพเชฎเซ€ เชถเชฟเชฐเซ‹เชฌเชฟเช‚เชฆเซเช“ เชชเชฐ เชจเชฟเชฏเช‚เชคเซเชฐเชฃเชจเชพ เชชเซเชฐเชตเชพเชนเชจเซ‡ เชฆเชฟเชถเชพเชฎเชพเชจ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡;
  • เชนเซเช•เซเชธ - เชจเซ€เชšเชฒเชพ-เชธเซเชคเชฐเชจเซ€ เช•เชพเชฎเช—เซ€เชฐเซ€ เชฎเชพเชŸเซ‡, เช‰เชฆเชพเชนเชฐเชฃ เชคเชฐเซ€เช•เซ‡, เชกเซ‡เชŸเชพเชฌเซ‡เช เช•เซ‹เชทเซเชŸเช•เชฎเชพเช‚เชฅเซ€ เชกเซ‡เชŸเชพ เชฎเซ‡เชณเชตเชตเชพ เชฎเชพเชŸเซ‡ (เชตเชฟเชงเชพเชจเซ‹เชฎเชพเช‚ เชตเชชเชฐเชพเชฏ เช›เซ‡);
  • เช…เชจเซ‡ เชคเซ‡เชฅเซ€ เชชเชฐ

เช† เชฒเซ‡เช–เชฎเชพเช‚ เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹เชจเซเช‚ เชตเชฟเช—เชคเชตเชพเชฐ เชตเชฐเซเชฃเชจ เช•เชฐเชตเซเช‚ เช…เชฏเซ‹เช—เซเชฏ เชฐเชนเซ‡เชถเซ‡. เชธเช‚เช•เซเชทเชฟเชชเซเชค เชชเชฐเชฟเชšเชฏ เชœเซ‹เชˆ เชถเช•เชพเชฏ เช›เซ‡ เช…เชนเซ€เช‚ เช…เชฅเชตเชพ เช…เชนเซ€เช‚.

เชกเซ‡เชŸเชพ เชฎเซ‡เชณเชตเชตเชพ เชฎเชพเชŸเซ‡ เชนเซ‚เช•

เชธเซŒ เชชเซเชฐเชฅเชฎ, เชธเชฎเชธเซเชฏเชพ เชนเชฒ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡, เช†เชชเชฃเซ‡ เชเช• เชนเซ‚เช• เชฒเช–เชตเชพเชจเซ€ เชœเชฐเซ‚เชฐ เช›เซ‡ เชœเซ‡เชจเซ€ เชธเชพเชฅเซ‡ เช†เชชเชฃเซ‡ เช•เชฐเซ€ เชถเช•เซ€เช:

  • เชˆเชฎเซ‡เชฒ เชธเชพเชฅเซ‡ เช•เชจเซ‡เช•เซเชŸ เช•เชฐเซ‹
  • เชฏเซ‹เช—เซเชฏ เชชเชคเซเชฐ เชถเซ‹เชงเซ‹
  • เชชเชคเซเชฐเชฎเชพเช‚เชฅเซ€ เชกเซ‡เชŸเชพ เชฎเซ‡เชณเชตเซ‹.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook ะดะปั ะฟะพะปัƒั‡ะตะฝะธั ะดะฐะฝะฝั‹ั… ั ัะปะตะบั‚ั€ะพะฝะฝะพะน ะฟะพั‡ั‚ั‹

           :param imap_conn_id:       ะ˜ะดะตะฝั‚ะธั„ะธะบะฐั‚ะพั€ ะฟะพะดะบะปัŽั‡ะตะฝะธั ะบ ะฟะพั‡ั‚ะต
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            ะŸะพะดะบะปัŽั‡ะฐะตะผัั ะบ ะฟะพั‡ั‚ะต
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            ะœะตั‚ะพะด ะดะปั ะฟะพะปัƒั‡ะตะฝะธั ะธะดะตะฝั‚ะธั„ะธะบะฐั‚ะพั€ะฐ ะฟะพัะปะตะดะฝะตะณะพ ะฟะธััŒะผะฐ, 
            ัƒะดะพะฒะปะตั‚ะฒะพั€ะฐััŽั‰ะตะณะพ ัƒัะปะพะฒะธัะผ ะฟะพะธัะบะฐ

            :param check_seen:      ะžั‚ะผะตั‡ะฐั‚ัŒ ะฟะพัะปะตะดะฝะตะต ะฟะธััŒะผะพ ะบะฐะบ ะฟั€ะพั‡ะธั‚ะฐะฝะฝะพะต
            :type check_seen:       bool
            :param box:             ะะฐะธะผะตะฝะพะฒะฐะฝะธั ัั‰ะธะบะฐ
            :type box:              string
            :param condition:       ะฃัะปะพะฒะธั ะฟะพะธัะบะฐ ะฟะธัะตะผ
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("ะ’ ัั‰ะธะบะต ะฝะฐะนะดะตะฝั‹ ัะปะตะดัƒัŽั‰ะธะต ะฟะธััŒะผะฐ: " + str(mail_ids))

        if not mail_ids:
            logging.info("ะะต ะฝะฐะนะดะตะฝะพ ะฝะพะฒั‹ั… ะฟะธัะตะผ")
            return None

        mail_id = mail_ids[0]

        # ะตัะปะธ ั‚ะฐะบะธั… ะฟะธัะตะผ ะฝะตัะบะพะปัŒะบะพ
        if len(mail_ids) > 1:
            # ะพั‚ะผะตั‡ะฐะตะผ ะพัั‚ะฐะปัŒะฝั‹ะต ะฟั€ะพั‡ะธั‚ะฐะฝะฝั‹ะผะธ
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # ะฒะพะทะฒั€ะฐั‰ะฐะตะผ ะฟะพัะปะตะดะฝะตะต
            mail_id = mail_ids[-1]

        # ะฝัƒะถะฝะพ ะปะธ ะพั‚ะผะตั‚ะธั‚ัŒ ะฟะพัะปะตะดะฝะตะต ะฟั€ะพั‡ะธั‚ะฐะฝะฝั‹ะผ
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

เชคเชฐเซเช• เช† เช›เซ‡: เช…เชฎเซ‡ เช•เชจเซ‡เช•เซเชŸ เช•เชฐเซ€เช เช›เซ€เช, เช›เซ‡เชฒเซเชฒเซ‹ เชธเซŒเชฅเซ€ เชธเซเชธเช‚เช—เชค เช…เช•เซเชทเชฐ เชถเซ‹เชงเซ€เช เช›เซ€เช, เชœเซ‹ เชคเซเชฏเชพเช‚ เช…เชจเซเชฏ เชนเซ‹เชฏ, เชคเซ‹ เช…เชฎเซ‡ เชคเซ‡เชจเซ‡ เช…เชตเช—เชฃเซ€เช เช›เซ€เช. เช† เชซเช‚เช•เซเชถเชจเชจเซ‹ เช‰เชชเชฏเซ‹เช— เชฅเชพเชฏ เช›เซ‡, เช•เชพเชฐเชฃ เช•เซ‡ เชชเช›เซ€เชจเชพ เช…เช•เซเชทเชฐเซ‹เชฎเชพเช‚ เชชเชนเซ‡เชฒเชพเชจเชพ เชฌเชงเชพ เชกเซ‡เชŸเชพ เชนเซ‹เชฏ เช›เซ‡. เชœเซ‹ เช† เช•เชฟเชธเซเชธเซ‹ เชจ เชนเซ‹เชฏ เชคเซ‹, เชชเช›เซ€ เชคเชฎเซ‡ เชฌเชงเชพ เช…เช•เซเชทเชฐเซ‹เชจเซ€ เชเชฐเซ‡ เชชเชฐเชค เช•เชฐเซ€ เชถเช•เซ‹ เช›เซ‹, เช…เชฅเชตเชพ เชชเซเชฐเชฅเชฎ เชเช• เชชเชฐ เชชเซเชฐเช•เซเชฐเชฟเชฏเชพ เช•เชฐเซ€ เชถเช•เซ‹ เช›เซ‹, เช…เชจเซ‡ เชฌเชพเช•เซ€เชจเชพ เช†เช—เชณเชจเชพ เชชเชพเชธ เชชเชฐ. เชธเชพเชฎเชพเชจเซเชฏ เชฐเซ€เชคเซ‡, เชฌเชงเซเช‚, เชนเช‚เชฎเซ‡เชถเชจเซ€ เชœเซ‡เชฎ, เช•เชพเชฐเซเชฏ เชชเชฐ เช†เชงเชพเชฐ เชฐเชพเช–เซ‡ เช›เซ‡.

เช…เชฎเซ‡ เชนเซ‚เช•เชฎเชพเช‚ เชฌเซ‡ เชธเชนเชพเชฏเช• เช•เชพเชฐเซเชฏเซ‹ เช‰เชฎเซ‡เชฐเซ€เช เช›เซ€เช: เชซเชพเช‡เชฒ เชกเชพเช‰เชจเชฒเซ‹เชก เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡ เช…เชจเซ‡ เช‡เชฎเซ‡เช‡เชฒเชฎเชพเช‚เชฅเซ€ เชฒเชฟเช‚เช•เชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เชจเซ‡ เชซเชพเช‡เชฒ เชกเชพเช‰เชจเชฒเซ‹เชก เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡. เชฎเชพเชฐเซเช— เชฆเซเชตเชพเชฐเชพ, เชคเซ‡เช“ เช“เชชเชฐเซ‡เชŸเชฐเชฎเชพเช‚ เช–เชธเซ‡เชกเซ€ เชถเช•เชพเชฏ เช›เซ‡, เชคเซ‡ เช† เช•เชพเชฐเซเชฏเช•เซเชทเชฎเชคเชพเชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเชตเชพเชจเซ€ เช†เชตเชฐเซเชคเชจ เชชเชฐ เช†เชงเชพเชฐเชฟเชค เช›เซ‡. เชนเซ‚เช•เชฎเชพเช‚ เชฌเซ€เชœเซเช‚ เชถเซเช‚ เช‰เชฎเซ‡เชฐเชตเซเช‚, เชซเชฐเซ€เชฅเซ€, เช•เชพเชฐเซเชฏ เชชเชฐ เช†เชงเชพเชฐ เชฐเชพเช–เซ‡ เช›เซ‡: เชœเซ‹ เชชเชคเซเชฐเชฎเชพเช‚ เชซเชพเช‡เชฒเซ‹ เชคเชฐเชค เชœ เชชเซเชฐเชพเชชเซเชค เชฅเชพเชฏ, เชคเซ‹ เชชเช›เซ€ เชคเชฎเซ‡ เชชเชคเซเชฐเชฎเชพเช‚ เชœเซ‹เชกเชพเชฃเซ‹ เชกเชพเช‰เชจเชฒเซ‹เชก เช•เชฐเซ€ เชถเช•เซ‹ เช›เซ‹, เชœเซ‹ เชชเชคเซเชฐเชฎเชพเช‚ เชกเซ‡เชŸเชพ เชชเซเชฐเชพเชชเซเชค เชฅเชพเชฏ เช›เซ‡, เชคเซ‹ เชคเชฎเชพเชฐเซ‡ เชชเชคเซเชฐเชจเซเช‚ เชตเชฟเชถเซเชฒเซ‡เชทเชฃ เช•เชฐเชตเชพเชจเซ€ เชœเชฐเซ‚เชฐ เช›เซ‡, เชตเช—เซ‡เชฐเซ‡ เชฎเชพเชฐเชพ เช•เชฟเชธเซเชธเชพเชฎเชพเช‚, เชชเชคเซเชฐ เช†เชฐเซเช•เชพเช‡เชตเชจเซ€ เชเช• เชฒเชฟเช‚เช• เชธเชพเชฅเซ‡ เช†เชตเซ‡ เช›เซ‡, เชœเซ‡ เชฎเชพเชฐเซ‡ เชšเซ‹เช•เซเช•เชธ เชœเช—เซเชฏเชพเช เชฎเซ‚เช•เชตเชพเชจเซ€ เช…เชจเซ‡ เช†เช—เชณเชจเซ€ เชชเซเชฐเช•เซเชฐเชฟเชฏเชพเชจเซ€ เชชเซเชฐเช•เซเชฐเชฟเชฏเชพ เชถเชฐเซ‚ เช•เชฐเชตเชพเชจเซ€ เชœเชฐเซ‚เชฐ เช›เซ‡.

    def download_from_url(self, url, path, chunk_size=128):
        """
            ะœะตั‚ะพะด ะดะปั ัะบะฐั‡ะธะฒะฐะฝะธั ั„ะฐะนะปะฐ

            :param url:              ะะดั€ะตั ะทะฐะณั€ัƒะทะบะธ
            :type url:               string
            :param path:             ะšัƒะดะฐ ะฟะพะปะพะถะธั‚ัŒ ั„ะฐะนะป
            :type path:              string
            :param chunk_size:       ะŸะพ ัะบะพะปัŒะบะพ ะฑะฐะนั‚ะพะฒ ะฟะธัะฐั‚ัŒ
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            ะœะตั‚ะพะด ะดะปั ัะบะฐั‡ะธะฒะฐะฝะธั ั„ะฐะนะปะฐ ะฟะพ ััั‹ะปะบะต ะธะท ะฟะธััŒะผะฐ

            :param mail_id:         ะ˜ะดะตะฝั‚ะธั„ะธะบะฐั‚ะพั€ ะฟะธััŒะผะฐ
            :type mail_id:          string
            :param path:            ะšัƒะดะฐ ะฟะพะปะพะถะธั‚ัŒ ั„ะฐะนะป
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

เช•เซ‹เชก เชธเชฐเชณ เช›เซ‡, เชคเซ‡เชฅเซ€ เชคเซ‡เชจเซ‡ เชญเชพเช—เซเชฏเซ‡ เชœ เชตเชงเซ เชธเชฎเชœเซ‚เชคเซ€เชจเซ€ เชœเชฐเซ‚เชฐ เช›เซ‡. เชนเซเช‚ เชคเชฎเชจเซ‡ เชฎเชพเชคเซเชฐ เชœเชพเชฆเซเชˆ เชฐเซ‡เช–เชพ imap_conn_id เชตเชฟเชถเซ‡ เช•เชนเซ€เชถ. เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹ เช•เชจเซ‡เช•เซเชถเชจ เชชเชฐเชฟเชฎเชพเชฃเซ‹ (เชฒเซ‹เช—เชฟเชจ, เชชเชพเชธเชตเชฐเซเชก, เชธเชฐเชจเชพเชฎเซเช‚ เช…เชจเซ‡ เช…เชจเซเชฏ เชชเชฐเชฟเชฎเชพเชฃเซ‹) เชธเซเชŸเซ‹เชฐ เช•เชฐเซ‡ เช›เซ‡ เชœเซ‡ เชธเซเชŸเซเชฐเชฟเช‚เช— เช“เชณเช–เช•เชฐเซเชคเชพ เชฆเซเชตเชพเชฐเชพ เชเช•เซเชธเซ‡เชธ เช•เชฐเซ€ เชถเช•เชพเชฏ เช›เซ‡. เชฆเซƒเชทเซเชŸเชฟเชจเซ€ เชฐเซ€เชคเซ‡, เช•เชจเซ‡เช•เซเชถเชจ เชฎเซ‡เชจเซ‡เชœเชฎเซ‡เชจเซเชŸ เช†เชจเชพ เชœเซ‡เชตเซเช‚ เชฒเชพเช—เซ‡ เช›เซ‡

เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹เชฎเชพเช‚ เชˆเชฎเซ‡เชฒเชฎเชพเช‚เชฅเซ€ เชกเซ‡เชŸเชพ เชฎเซ‡เชณเชตเชตเชพ เชฎเชพเชŸเซ‡ ETL เชชเซเชฐเช•เซเชฐเชฟเชฏเชพ

เชกเซ‡เชŸเชพเชจเซ€ เชฐเชพเชน เชœเซ‹เชตเชพ เชฎเชพเชŸเซ‡ เชธเซ‡เชจเซเชธเชฐ

เช…เชฎเซ‡ เชชเชนเซ‡เชฒเซ‡เชฅเซ€ เชœ เชœเชพเชฃเซ€เช เช›เซ€เช เช•เซ‡ เชฎเซ‡เช‡เชฒเชฎเชพเช‚เชฅเซ€ เชกเซ‡เชŸเชพ เช•เซ‡เชตเซ€ เชฐเซ€เชคเซ‡ เช•เชจเซ‡เช•เซเชŸ เช•เชฐเชตเซ‹ เช…เชจเซ‡ เชชเซเชฐเชพเชชเซเชค เช•เชฐเชตเซ‹, เช…เชฎเซ‡ เชนเชตเซ‡ เชคเซ‡เชฎเชจเซ€ เชฐเชพเชน เชœเซ‹เชตเชพ เชฎเชพเชŸเซ‡ เชธเซ‡เชจเซเชธเชฐ เชฒเช–เซ€ เชถเช•เซ€เช เช›เซ€เช. เชฎเชพเชฐเชพ เช•เชฟเชธเซเชธเชพเชฎเชพเช‚, เชคเซ‡ เชคเชฐเชค เชœ เช“เชชเชฐเซ‡เชŸเชฐ เชฒเช–เชตเชพเชจเซเช‚ เช•เชพเชฎ เช•เชฐเชคเซเช‚ เชจเชฅเซ€ เชœเซ‡ เชกเซ‡เชŸเชพ เชชเชฐ เชชเซเชฐเช•เซเชฐเชฟเชฏเชพ เช•เชฐเชถเซ‡, เชœเซ‹ เช•เซ‹เชˆ เชนเซ‹เชฏ, เช•เชพเชฐเชฃ เช•เซ‡ เช…เชจเซเชฏ เชชเซเชฐเช•เซเชฐเชฟเชฏเชพเช“ เชฎเซ‡เช‡เชฒเชฎเชพเช‚เชฅเซ€ เชชเซเชฐเชพเชชเซเชค เชกเซ‡เชŸเชพเชจเชพ เช†เชงเชพเชฐเซ‡ เช•เชพเชฐเซเชฏ เช•เชฐเซ‡ เช›เซ‡, เชœเซ‡เชฎเชพเช‚ เช…เชจเซเชฏ เชธเซเชฐเซ‹เชคเซ‹ (API, เชŸเซ‡เชฒเชฟเชซเซ‹เชจเซ€) เชฎเชพเช‚เชฅเซ€ เชธเช‚เชฌเช‚เชงเชฟเชค เชกเซ‡เชŸเชพ เชฒเซ‡ เช›เซ‡. , เชตเซ‡เชฌ เชฎเซ‡เชŸเซเชฐเชฟเช•เซเชธ, เชตเช—เซ‡เชฐเซ‡.) เชตเช—เซ‡เชฐเซ‡). เชนเซเช‚ เชคเชฎเชจเซ‡ เชเช• เช‰เชฆเชพเชนเชฐเชฃ เช†เชชเซ€เชถ. CRM เชธเชฟเชธเซเชŸเชฎเชฎเชพเช‚ เชเช• เชจเชตเซ‹ เชตเชชเชฐเชพเชถเช•เชฐเซเชคเชพ เชฆเซ‡เช–เชพเชฏเซ‹ เช›เซ‡, เช…เชจเซ‡ เช…เชฎเซ‡ เชนเชœเซ เชชเชฃ เชคเซ‡เชจเชพ UUID เชตเชฟเชถเซ‡ เชœเชพเชฃเชคเชพ เชจเชฅเซ€. เชชเช›เซ€, เชœเซเชฏเชพเชฐเซ‡ SIP เชŸเซ‡เชฒเชฟเชซเซ‹เชจเซ€เชฎเชพเช‚เชฅเซ€ เชกเซ‡เชŸเชพ เชชเซเชฐเชพเชชเซเชค เช•เชฐเชตเชพเชจเซ‹ เชชเซเชฐเชฏเชพเชธ เช•เชฐเซ€เช เช›เซ€เช, เชคเซเชฏเชพเชฐเซ‡ เช…เชฎเซ‡ เชคเซ‡เชจเชพ UUID เชธเชพเชฅเซ‡ เชœเซ‹เชกเชพเชฏเซ‡เชฒเชพ เช•เซ‰เชฒเซเชธ เชชเซเชฐเชพเชชเซเชค เช•เชฐเซ€เชถเซเช‚, เชชเชฐเช‚เชคเซ เช…เชฎเซ‡ เชคเซ‡เชจเซ‡ เชธเชพเชšเชตเซ€ เช…เชจเซ‡ เชคเซ‡เชจเซ‹ เชฏเซ‹เช—เซเชฏ เชฐเซ€เชคเซ‡ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€ เชถเช•เซ€เชถเซเช‚ เชจเชนเซ€เช‚. เช†เชตเซ€ เชฌเชพเชฌเชคเซ‹เชฎเชพเช‚, เชกเซ‡เชŸเชพเชจเซ€ เช…เชตเชฒเช‚เชฌเชจเชจเซ‡ เชงเซเชฏเชพเชจเชฎเชพเช‚ เชฐเชพเช–เชตเซ€ เชœเชฐเซ‚เชฐเซ€ เช›เซ‡, เช–เชพเชธ เช•เชฐเซ€เชจเซ‡ เชœเซ‹ เชคเซ‡ เชตเชฟเชตเชฟเชง เชธเซเชฐเซ‹เชคเซ‹เชฎเชพเช‚เชฅเซ€ เชนเซ‹เชฏ. เช†, เช…เชฒเชฌเชคเซเชค, เชกเซ‡เชŸเชพเชจเซ€ เช…เช–เช‚เชกเชฟเชคเชคเชพ เชœเชพเชณเชตเชตเชพ เชฎเชพเชŸเซ‡ เช…เชชเซ‚เชฐเชคเชพ เชชเช—เชฒเชพเช‚ เช›เซ‡, เชชเชฐเช‚เชคเซ เช•เซ‡เชŸเชฒเชพเช• เช•เชฟเชธเซเชธเชพเช“เชฎเชพเช‚ เชคเซ‡ เชœเชฐเซ‚เชฐเซ€ เช›เซ‡. เชนเชพ, เช…เชจเซ‡ เชธเช‚เชธเชพเชงเชจเซ‹เชจเซ‡ เช•เชฌเชœเซ‡ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡ เชจเชฟเชทเซเช•เซเชฐเชฟเชฏเชคเชพ เชชเชฃ เช…เชคเชพเชฐเซเช•เชฟเช• เช›เซ‡.

เช†เชฎ, เชœเซ‹ เชฎเซ‡เชฒเชฎเชพเช‚ เชจเชตเซ€ เชฎเชพเชนเชฟเชคเซ€ เชนเชถเซ‡ เชคเซ‹ เช…เชฎเชพเชฐเซเช‚ เชธเซ‡เชจเซเชธเชฐ เช—เซเชฐเชพเชซเชจเชพ เช…เชจเซเช—เชพเชฎเซ€ เชถเชฟเชฐเซ‹เชฌเชฟเช‚เชฆเซเช“ เชถเชฐเซ‚ เช•เชฐเชถเซ‡, เช…เชจเซ‡ เช…เช—เชพเช‰เชจเซ€ เชฎเชพเชนเชฟเชคเซ€เชจเซ‡ เช…เชชเซเชฐเชธเซเชคเซเชค เชคเชฐเซ€เช•เซ‡ เชชเชฃ เชšเชฟเชนเซเชจเชฟเชค เช•เชฐเชถเซ‡.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

เช…เชฎเซ‡ เชกเซ‡เชŸเชพ เชชเซเชฐเชพเชชเซเชค เช•เชฐเซ€เช เช›เซ€เช เช…เชจเซ‡ เชคเซ‡เชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เช เช›เซ€เช

เชกเซ‡เชŸเชพ เชชเซเชฐเชพเชชเซเชค เช•เชฐเชตเชพ เช…เชจเซ‡ เชชเซเชฐเช•เซเชฐเชฟเชฏเชพ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡, เชคเชฎเซ‡ เชเช• เช…เชฒเช— เช“เชชเชฐเซ‡เชŸเชฐ เชฒเช–เซ€ เชถเช•เซ‹ เช›เซ‹, เชคเชฎเซ‡ เชคเซˆเชฏเชพเชฐ เช•เชฐเซ‡เชฒเชพเชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€ เชถเช•เซ‹ เช›เซ‹. เช•เชพเชฐเชฃ เช•เซ‡ เชคเชฐเซเช• เชนเชœเซ เชชเชฃ เชคเซเชšเซเช› เช›เซ‡ - เชชเชคเซเชฐเชฎเชพเช‚เชฅเซ€ เชกเซ‡เชŸเชพ เชฒเซ‡เชตเชพ เชฎเชพเชŸเซ‡, เช‰เชฆเชพเชนเชฐเชฃ เชคเชฐเซ€เช•เซ‡, เชนเซเช‚ เชฎเชพเชจเช• PythonOperator เชธเซ‚เชšเชตเซ‡ เช›เซ‡

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# ะกั‚ะฐะฝะดะฐั€ั‚ะฝะพะต ะบะพะฝั„ะธะณัƒั€ะธั€ะพะฒะฐะฝะธะต ะณั€ะฐั„ะฐ
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# ะžะฟั€ะตะดะตะปัะตะผ ัะตะฝัะพั€
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# ะคัƒะฝะบั†ะธั ะดะปั ะฟะพะปัƒั‡ะตะฝะธั ะดะฐะฝะฝั‹ั… ะธะท ะฟะธััŒะผะฐ
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# ะžะฟะธัะฐะฝะธะต ะพัั‚ะฐะปัŒะฝั‹ั… ะฒะตั€ัˆะธะฝ ะณั€ะฐั„ะฐ
...

# ะ—ะฐะดะฐะตะผ ัะฒัะทัŒ ะฝะฐ ะณั€ะฐั„ะต
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ะžะฟะธัะฐะฝะธะต ะพัั‚ะฐะปัŒะฝั‹ั… ะฟะพั‚ะพะบะพะฒ ัƒะฟั€ะฐะฒะปะตะฝะธั

เชฎเชพเชฐเซเช— เชฆเซเชตเชพเชฐเชพ, เชœเซ‹ เชคเชฎเชพเชฐเซ€ เช•เซ‹เชฐเซเชชเซ‹เชฐเซ‡เชŸ เชฎเซ‡เช‡เชฒ เชชเชฃ mail.ru เชชเชฐ เช›เซ‡, เชคเซ‹ เชชเช›เซ€ เชคเชฎเซ‡ เชตเชฟเชทเชฏ, เชชเซเชฐเซ‡เชทเช•, เชตเช—เซ‡เชฐเซ‡ เชฆเซเชตเชพเชฐเชพ เชชเชคเซเชฐเซ‹ เชถเซ‹เชงเชตเชพ เชฎเชพเชŸเซ‡ เชธเชฎเชฐเซเชฅ เชนเชถเซ‹ เชจเชนเซ€เช‚. เชชเชพเช›เชพ 2016 เชฎเชพเช‚, เชคเซ‡เช“เช เชคเซ‡เชจเซ‡ เชฐเชœเซ‚ เช•เชฐเชตเชพเชจเซเช‚ เชตเชšเชจ เช†เชชเซเชฏเซเช‚ เชนเชคเซเช‚, เชชเชฐเช‚เชคเซ เชฆเซ‡เช–เซ€เชคเซ€ เชฐเซ€เชคเซ‡ เชคเซ‡เชฎเชจเซ‹ เชตเชฟเชšเชพเชฐ เชฌเชฆเชฒเชพเชˆ เช—เชฏเซ‹ เชนเชคเซ‹. เชฎเซ‡เช‚ เชœเชฐเซ‚เชฐเซ€ เช…เช•เซเชทเชฐเซ‹ เชฎเชพเชŸเซ‡ เช…เชฒเช— เชซเซ‹เชฒเซเชกเชฐ เชฌเชจเชพเชตเซ€เชจเซ‡ เช…เชจเซ‡ เชฎเซ‡เชˆเชฒ เชตเซ‡เชฌ เชˆเชจเซเชŸเชฐเชซเซ‡เชธเชฎเชพเช‚ เชœเชฐเซ‚เชฐเซ€ เช…เช•เซเชทเชฐเซ‹ เชฎเชพเชŸเซ‡ เชซเชฟเชฒเซเชŸเชฐ เชธเซ‡เชŸ เช•เชฐเซ€เชจเซ‡ เช† เชธเชฎเชธเซเชฏเชพ เชนเชฒ เช•เชฐเซ€ เช›เซ‡. เช†เชฎ, เชถเซ‹เชง เชฎเชพเชŸเซ‡ เชซเช•เซเชค เชœเชฐเซ‚เชฐเซ€ เช…เช•เซเชทเชฐเซ‹ เช…เชจเซ‡ เชถเชฐเชคเซ‹, เชฎเชพเชฐเชพ เช•เชฟเชธเซเชธเชพเชฎเชพเช‚, เชซเช•เซเชค (เช…เชฆเซเชฐเชถเซเชฏ) เช† เชซเซ‹เชฒเซเชกเชฐเชฎเชพเช‚ เชชเซเชฐเชตเซ‡เชถ เช•เชฐเซ‹.

เชธเชพเชฐเชพเช‚เชถ เช†เชชเชคเชพ, เช…เชฎเชพเชฐเซ€ เชชเชพเชธเซ‡ เชจเซ€เชšเซ‡เชจเซ‹ เช•เซเชฐเชฎ เช›เซ‡: เช…เชฎเซ‡ เชคเชชเชพเชธ เช•เชฐเซ€เช เช›เซ€เช เช•เซ‡ เชคเซเชฏเชพเช‚ เชจเชตเชพ เช…เช•เซเชทเชฐเซ‹ เช›เซ‡ เช•เซ‡ เชœเซ‡ เชถเชฐเชคเซ‹เชจเซ‡ เชชเซ‚เชฐเซเชฃ เช•เชฐเซ‡ เช›เซ‡, เชœเซ‹ เชคเซเชฏเชพเช‚ เชนเซ‹เชฏ, เชคเซ‹ เช…เชฎเซ‡ เช›เซ‡เชฒเซเชฒเชพ เช…เช•เซเชทเชฐเชจเซ€ เชฒเชฟเช‚เช•เชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เชจเซ‡ เช†เชฐเซเช•เชพเช‡เชต เชกเชพเช‰เชจเชฒเซ‹เชก เช•เชฐเซ€เช เช›เซ€เช.
เช›เซ‡เชฒเซเชฒเชพ เชฌเชฟเช‚เชฆเซเช“ เชนเซ‡เช เชณ, เชคเซ‡ เช…เชตเช—เชฃเชตเชพเชฎเชพเช‚ เช†เชตเซ‡ เช›เซ‡ เช•เซ‡ เช† เช†เชฐเซเช•เชพเช‡เชตเชจเซ‡ เช…เชจเชชเซ‡เช• เช•เชฐเชตเชพเชฎเชพเช‚ เช†เชตเชถเซ‡, เช†เชฐเซเช•เชพเช‡เชตเชฎเชพเช‚เชฅเซ€ เชกเซ‡เชŸเชพ เชธเชพเชซ เช…เชจเซ‡ เชชเซเชฐเช•เซเชฐเชฟเชฏเชพ เช•เชฐเชตเชพเชฎเชพเช‚ เช†เชตเชถเซ‡, เช…เชจเซ‡ เชชเชฐเชฟเชฃเชพเชฎเซ‡, เช†เช–เซ€ เชตเชธเซเชคเซ ETL เชชเซเชฐเช•เซเชฐเชฟเชฏเชพเชจเซ€ เชชเชพเช‡เชชเชฒเชพเช‡เชจเชฎเชพเช‚ เช†เช—เชณ เชตเชงเชถเซ‡, เชชเชฐเช‚เชคเซ เช† เชชเชนเซ‡เชฒเชพเชฅเซ€ เชœ เช†เช—เชณ เช›เซ‡. เชฒเซ‡เช–เชจเซ‹ เช…เชตเช•เชพเชถ. เชœเซ‹ เชคเซ‡ เชฐเชธเชชเซเชฐเชฆ เช…เชจเซ‡ เช‰เชชเชฏเซ‹เช—เซ€ เชฌเชจเซเชฏเซเช‚, เชคเซ‹ เชนเซเช‚ เชฐเชพเชœเซ€เช–เซเชถเซ€เชฅเซ€ เช…เชชเชพเชšเซ‡ เชเชฐเชซเซเชฒเซ‹ เชฎเชพเชŸเซ‡ ETL เชธเซ‹เชฒเซเชฏเซเชถเชจเซเชธ เช…เชจเซ‡ เชคเซ‡เชฎเชจเชพ เชญเชพเช—เซ‹เชจเซเช‚ เชตเชฐเซเชฃเชจ เช•เชฐเชตเชพเชจเซเช‚ เชšเชพเชฒเซ เชฐเชพเช–เซ€เชถ.

เชธเซ‹เชฐเซเชธ: www.habr.com

เชเช• เชŸเชฟเชชเซเชชเชฃเซ€ เช‰เชฎเซ‡เชฐเซ‹