ETL process for getting data from email in Apache Airflow


ืœื ืžืฉื ื” ื›ืžื” ื”ื˜ื›ื ื•ืœื•ื’ื™ื” ืžืชืคืชื—ืช, ืฉื•ืจื” ืฉืœ ื’ื™ืฉื•ืช ืžื™ื•ืฉื ื•ืช ืชืžื™ื“ ื ื’ืจืจืช ืื—ืจื™ ื”ืคื™ืชื•ื—. ื–ื” ืขืฉื•ื™ ืœื ื‘ื•ืข ืžืžืขื‘ืจ ื—ืœืง, ื’ื•ืจืžื™ื ืื ื•ืฉื™ื™ื, ืฆืจื›ื™ื ื˜ื›ื ื•ืœื•ื’ื™ื™ื ืื• ืžืฉื”ื• ืื—ืจ. ื‘ืชื—ื•ื ืขื™ื‘ื•ื“ ื”ื ืชื•ื ื™ื, ืžืงื•ืจื•ืช ื”ื ืชื•ื ื™ื ื”ื ื”ื—ื•ืฉืคื ื™ื™ื ื‘ื™ื•ืชืจ ื‘ื—ืœืง ื–ื”. ืœื ืžืฉื ื” ื›ืžื” ืื ื—ื ื• ื—ื•ืœืžื™ื ืœื”ื™ืคื˜ืจ ืžื–ื”, ืื‘ืœ ืขื“ ื›ื” ื—ืœืง ืžื”ื ืชื•ื ื™ื ื ืฉืœื—ื™ื ื‘ื”ื•ื“ืขื•ืช ืžื™ื™ื“ื™ื•ืช ื•ื‘ืžื™ื™ืœื™ื, ืฉืœื ืœื“ื‘ืจ ืขืœ ืคื•ืจืžื˜ื™ื ืืจื›ืื™ื™ื ื™ื•ืชืจ. ืื ื™ ืžื–ืžื™ืŸ ืื•ืชืš ืœืคืจืง ืืช ืื—ืช ื”ืืคืฉืจื•ื™ื•ืช ืขื‘ื•ืจ Apache Airflow, ื•ืžืžื—ื™ืฉ ื›ื™ืฆื“ ื ื™ืชืŸ ืœืงื—ืช ื ืชื•ื ื™ื ืžืžื™ื™ืœื™ื.

Background

A lot of information is still transmitted by email, from interpersonal communication to standards for interaction between companies. It is fine if you can write an interface for getting the data, or put people in an office to enter this information into more convenient sources, but often that is simply not possible. The specific task I faced was connecting a certain well-known CRM system to the data warehouse, and from there to an OLAP system. Historically, this system had been convenient for our company in one particular line of business, so everyone really wanted to be able to work with the data from this third-party system as well. The first thing we looked at, of course, was getting the data through the public API. Unfortunately, the API did not cover all the data we needed and, to put it simply, was clumsy in many respects, and technical support would not or could not meet us halfway and provide more complete functionality. However, the system did offer a way to periodically receive the missing data by email, as a link for downloading an archive.

It should be noted that this was not the only case in which the business wanted to collect data from emails or instant messages. In this case, however, we had no influence over the third-party company, which provided part of the data only this way.

Apache Airflow

To build ETL processes we mostly use Apache Airflow. So that readers unfamiliar with this technology can better understand how it looks, both in this context and in general, I will give a short introduction.

Apache Airflow is a free platform for building, running, and monitoring ETL (Extract-Transform-Load) processes in Python. The central concept in Airflow is the directed acyclic graph (DAG): its vertices are specific processes (tasks), and its edges are the flow of control or data. A process can simply call some Python function, or it can have more complex logic, such as calling several functions in sequence within a class. For the most common operations there are already many ready-made building blocks that can be used as processes. These include:

  • operators - for moving data from one place to another, for example from a database table to a data warehouse;
  • sensors - for waiting until a certain event occurs and then passing control to the next vertices of the graph;
  • hooks - for lower-level operations, for example fetching data from a database table (hooks are used inside operators);
  • and so on.
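To make the DAG idea concrete without Airflow itself, here is a minimal sketch using Python's standard graphlib module (Python 3.9+). The task names unpack_archive and load_to_warehouse are hypothetical, loosely modeled on the pipeline in this article. Airflow builds and schedules its own graph; this only illustrates why the absence of cycles guarantees that a valid execution order exists.

```python
from graphlib import CycleError, TopologicalSorter  # Python 3.9+

# Each key is a vertex (task); its value is the set of upstream tasks,
# i.e. the edges along which control flows into it. Names are hypothetical.
pipeline = {
    "check_new_emails": set(),
    "prepare_mail_data": {"check_new_emails"},
    "unpack_archive": {"prepare_mail_data"},
    "load_to_warehouse": {"unpack_archive"},
}


def execution_order(graph):
    """Return the tasks in an order that respects every edge.

    Raises graphlib.CycleError if the graph has a cycle, because then
    no such order exists.
    """
    return list(TopologicalSorter(graph).static_order())


print(execution_order(pipeline))

try:
    execution_order({"a": {"b"}, "b": {"a"}})  # a -> b -> a
except CycleError:
    print("a cyclic graph has no valid execution order")
```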

It would be impractical to describe Apache Airflow in detail in this article. Brief introductions can be found here or here.

A hook for getting data

ืงื•ื“ื ื›ืœ, ื›ื“ื™ ืœืคืชื•ืจ ืืช ื”ื‘ืขื™ื”, ืื ื—ื ื• ืฆืจื™ื›ื™ื ืœื›ืชื•ื‘ ื•ื• ืฉืื™ืชื• ื ื•ื›ืœ:

  • connect to the mailbox;
  • find the right email;
  • get the data from the email.

import imaplib
import logging

import requests
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from bs4 import BeautifulSoup

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook for getting data from email

           :param imap_conn_id:       ID of the Airflow connection to the mailbox
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """
            Connect and log in to the mailbox
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Get the ID of the latest email that matches the search conditions

            :param check_seen:      If False, the latest email is left unread
            :type check_seen:       bool
            :param box:             Mailbox (folder) name
            :type box:              string
            :param condition:       IMAP search criteria
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("Emails found in the mailbox: %s", mail_ids)

        if not mail_ids:
            logging.info("No new emails found")
            return None

        mail_id = mail_ids[0]

        # if several emails match
        if len(mail_ids) > 1:
            # mark all of them as read so the older ones are ignored next time
            for other_id in mail_ids:
                self.mail.store(other_id, "+FLAGS", r"\Seen")

            # and return the latest one
            mail_id = mail_ids[-1]

        # optionally make sure the latest email stays unread
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", r"\Seen")

        return mail_id

The logic is as follows: we connect, find the latest and most relevant email, and if there are other matching emails, we ignore them. This works here because later emails contain all the data of the earlier ones. If that is not the case, you can return an array of all the emails, or process the first one now and the rest on subsequent runs. In general, as always, it all depends on the task.
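The selection strategies above can be sketched as a small pure function over the raw result of imaplib's IMAP4.search(), whose data part is a list such as [b"3 7 12"] of space-separated message numbers. The function and strategy names are hypothetical, and the sketch assumes the numbers come back in ascending order, which is typical for message sequence numbers, so the last one is the most recently delivered email.

```python
def pick_mail_ids(search_data, strategy="last"):
    """Choose which message ids to process from an imaplib search result.

    search_data is the second element returned by IMAP4.search(),
    e.g. [b"3 7 12"]; the ids stay as byte strings, as imaplib expects.
    """
    mail_ids = search_data[0].split()
    if not mail_ids:
        return []
    if strategy == "last":   # the newest email supersedes the older ones
        return [mail_ids[-1]]
    if strategy == "all":    # every email carries its own data
        return mail_ids
    if strategy == "first":  # process the oldest now, the rest on later runs
        return [mail_ids[0]]
    raise ValueError(f"unknown strategy: {strategy!r}")


print(pick_mail_ids([b"3 7 12"]))
```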

ืื ื• ืžื•ืกื™ืคื™ื ืฉืชื™ ืคื•ื ืงืฆื™ื•ืช ืขื–ืจ ืœื”ื•ืง: ืœื”ื•ืจื“ืช ืงื•ื‘ืฅ ื•ืœื”ื•ืจื“ืช ืงื•ื‘ืฅ ื‘ืืžืฆืขื•ืช ืงื™ืฉื•ืจ ืžืžื™ื™ืœ. ืื’ื‘, ื ื™ืชืŸ ืœื”ืขื‘ื™ืจ ืื•ืชื ืœืžืคืขื™ืœ, ื–ื” ืชืœื•ื™ ื‘ืชื“ื™ืจื•ืช ื”ืฉื™ืžื•ืฉ ื‘ืคื•ื ืงืฆื™ื•ื ืœื™ื•ืช ื–ื•. ืžื” ืขื•ื“ ืœื”ื•ืกื™ืฃ ืœืงืจืก, ืฉื•ื‘, ืชืœื•ื™ ื‘ืžืฉื™ืžื”: ืื ืงื‘ืฆื™ื ืžืชืงื‘ืœื™ื ืžื™ื“ ื‘ืžื›ืชื‘, ืื– ืืชื” ื™ื›ื•ืœ ืœื”ื•ืจื™ื“ ืงื‘ืฆื™ื ืžืฆื•ืจืคื™ื ืœืžื›ืชื‘, ืื ื”ื ืชื•ื ื™ื ืžืชืงื‘ืœื™ื ื‘ืžื›ืชื‘, ืื– ืืชื” ืฆืจื™ืš ืœื ืชื— ืืช ื”ืžื›ืชื‘, ื•ื›ื• ' ื‘ืžืงืจื” ืฉืœื™, ื”ืžื›ืชื‘ ืžื’ื™ืข ืขื ืงื™ืฉื•ืจ ืื—ื“ ืœืืจื›ื™ื•ืŸ, ืื•ืชื• ืื ื™ ืฆืจื™ืš ืœืฉื™ื ื‘ืžืงื•ื ืžืกื•ื™ื ื•ืœื”ืชื—ื™ืœ ื‘ืชื”ืœื™ืš ื”ืขื™ื‘ื•ื“.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Download a file

            :param url:              Download URL
            :type url:               string
            :param path:             Where to save the file
            :type path:              string
            :param chunk_size:       How many bytes to write at a time
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        # fail the task on HTTP errors instead of saving an error page as the archive
        r.raise_for_status()
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Download a file via the link in an email

            :param mail_id:         Email ID
            :type mail_id:          string
            :param path:            Where to save the file
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        # strip line breaks from the raw message before parsing it as HTML
        raw_soup = raw_email.decode(errors="replace").replace("\r", "").replace("\n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        # if there are several links with text, the last one wins
        for a in parse_soup.find_all("a", href=True, string=True):
            link_text = a["href"]

        if not link_text:
            raise AirflowException("No download link found in the email")

        self.download_from_url(link_text, path)

The code is simple, so it hardly needs further explanation. I will only say a few words about the magic string imap_conn_id. Apache Airflow stores connection parameters (login, password, host, and other parameters), which can be accessed by a string identifier. In the web UI, connections are managed on the Admin > Connections page.
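Besides the web UI, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_ followed by the connection ID in upper case, holding a URI; it checks such variables before the metadata database. A minimal sketch, assuming the connection ID mail_conn_id used in the DAG later on; the host, login, and password are placeholders, and imap_conn_uri is a hypothetical helper. The login and password must be percent-encoded, which is easy to forget when the login is itself an email address.

```python
import os
from urllib.parse import quote


def imap_conn_uri(host, login, password, conn_type="imap"):
    """Build a connection URI of the form Airflow accepts in AIRFLOW_CONN_* variables.

    The login and password are percent-encoded so that characters such as
    '@' or ':' do not break the URI; Airflow decodes them when parsing it.
    """
    return f"{conn_type}://{quote(login, safe='')}:{quote(password, safe='')}@{host}"


# Placeholder credentials; the hook reads them back as
# connection.host, connection.login and connection.password.
os.environ["AIRFLOW_CONN_MAIL_CONN_ID"] = imap_conn_uri(
    "imap.example.com", "reports@example.com", "p@ss:word"
)
print(os.environ["AIRFLOW_CONN_MAIL_CONN_ID"])
```

Setting the variable through os.environ only affects the current process; in a real deployment it has to be present in the environment of the scheduler and the workers.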


A sensor to wait for data

Since we already know how to connect to the mailbox and get data from it, we can now write a sensor that waits for that data. In my case, it did not work to write an operator that would immediately process the data if it was there, because other processes depend on the data received by email, including ones that take related data from other sources (API, telephony, web metrics, etc.). Here is an example. A new user appears in the CRM system, and we do not yet know their UUID. Then, when we try to get data from SIP telephony, we will receive calls tied to that UUID but will not be able to store and use them correctly. In matters like these it is important to keep data dependencies in mind, especially when the data comes from different sources. Of course, such measures are not enough on their own to guarantee data integrity, but in some cases they are necessary. Besides, running idle and tying up resources for nothing is not rational either.
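The dependency problem in this example can be illustrated with a tiny sketch: if telephony data is loaded before the CRM data from the email has arrived, calls whose user UUID is not yet known cannot be linked to anyone. The record fields call_id and user_uuid and the function name are hypothetical.

```python
def split_calls_by_known_users(calls, known_uuids):
    """Separate calls that can be linked to a known CRM user from orphaned ones."""
    linked, orphaned = [], []
    for call in calls:
        if call["user_uuid"] in known_uuids:
            linked.append(call)
        else:
            # the user exists in the CRM but has not reached the warehouse yet
            orphaned.append(call)
    return linked, orphaned


calls = [
    {"call_id": 1, "user_uuid": "u1"},
    {"call_id": 2, "user_uuid": "u2"},  # a user we have not received yet
]
linked, orphaned = split_calls_by_known_users(calls, known_uuids={"u1"})
print(len(linked), len(orphaned))
```

Gating the downstream tasks on the email sensor is one way to keep the orphaned list empty: the telephony load runs only after the CRM data it depends on is in place.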

So our sensor will trigger the subsequent vertices of the graph if there is fresh information in the mailbox, and will also mark the previous information as no longer relevant.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="INBOX", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        # True lets the downstream tasks run; False makes the sensor poke again later
        return mail_id is not None
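MailSensor only implements poke(); the waiting loop itself comes from BaseSensorOperator. The following is a simplified, standalone model of poke mode, not Airflow's actual code: poke() is called every poke_interval seconds until it returns True or more than timeout seconds have passed, and with soft_fail=True a timeout marks the task as skipped instead of failed. The function name run_sensor and the clock and sleep parameters are hypothetical; the last two only make the model testable.

```python
import time


def run_sensor(poke, poke_interval, timeout, soft_fail=False,
               clock=time.monotonic, sleep=time.sleep):
    """Simplified model of a poke-mode sensor.

    Returns "success" once poke() returns True, "skipped" on timeout when
    soft_fail is set, and raises TimeoutError on timeout otherwise.
    """
    started = clock()
    while not poke():
        if clock() - started > timeout:
            if soft_fail:
                return "skipped"
            raise TimeoutError("sensor timed out")
        sleep(poke_interval)
    return "success"


# An email "arrives" on the third check.
answers = iter([False, False, True])
print(run_sensor(lambda: next(answers), poke_interval=0, timeout=60))
```

With poke_interval=10, timeout=10 and soft_fail=True, as in the DAG below, the sensor checks the mailbox only a couple of times per run and is skipped rather than failed if no email has arrived, which suits an hourly schedule.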

ืื ื• ืžืงื‘ืœื™ื ื ืชื•ื ื™ื ื•ืžืฉืชืžืฉื™ื ื‘ื”ื

To get and process the data, you can write a separate operator or use ready-made ones. Since the logic so far is trivial (for example, just take the data from the email), I suggest the standard PythonOperator:

from datetime import datetime, timedelta

from airflow.exceptions import AirflowException
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Standard DAG configuration
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["example@example.com"],  # placeholder address
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Define the sensor
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    # keep the latest email unread so prepare_mail can still find it with (UNSEEN)
    check_seen=False,
    dag=dag,
    mode="poke",
)

# Function that gets the data from the email
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    imap_hook.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable=prepare_mail)

# Definitions of the remaining DAG vertices
...

# Wire up the graph
mail_check_sensor >> prepare_mail_data
prepare_mail_data >> ...
# Definitions of the remaining control flows

ืื’ื‘, ืื ื”ื“ื•ืืจ ื”ืืจื’ื•ื ื™ ืฉืœืš ื ืžืฆื ื’ื ื‘-mail.ru, ืื– ืœื ืชื•ื›ืœ ืœื—ืคืฉ ืžื›ืชื‘ื™ื ืœืคื™ ื ื•ืฉื, ืฉื•ืœื— ื•ื›ื•'. ืขื•ื“ ื‘-2016, ื”ื ื”ื‘ื˜ื™ื—ื• ืœื”ืฆื™ื’ ืืช ื–ื”, ืื‘ืœ ื›ื ืจืื” ืฉื™ื ื• ืืช ื“ืขืชื. ืคืชืจืชื™ ื‘ืขื™ื” ื–ื• ืขืœ ื™ื“ื™ ื™ืฆื™ืจืช ืชื™ืงื™ื™ื” ื ืคืจื“ืช ืœืื•ืชื™ื•ืช ื”ื ื—ื•ืฆื•ืช ื•ื”ื’ื“ืจืช ืžืกื ืŸ ืœืื•ืชื™ื•ืช ื”ื“ืจื•ืฉื•ืช ื‘ืžืžืฉืง ื”ืื™ื ื˜ืจื ื˜ ืฉืœ ื”ื“ื•ืืจ. ืœืคื™ื›ืš, ืจืง ื”ืžื›ืชื‘ื™ื ื•ื”ืชื ืื™ื ื”ื“ืจื•ืฉื™ื ืœื—ื™ืคื•ืฉ, ื‘ืžืงืจื” ืฉืœื™, ืคืฉื•ื˜ (UNSEEN) ื ื›ื ืกื™ื ืœืชื™ืงื™ื” ื–ื•.

To sum up, the sequence is: we check whether there are new emails matching the conditions, and if there are, we download the archive via the link in the latest one.
Behind the final ellipses I have left out that this archive is unpacked, the data from it is cleaned and processed, and the result then moves further down the ETL pipeline, but that is beyond the scope of this article. If you found this interesting and useful, I will be happy to keep describing ETL solutions and their components for Apache Airflow.

Source: www.habr.com
