Apache Airflow์˜ ์ด๋ฉ”์ผ์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์˜ค๊ธฐ ์œ„ํ•œ ETL ํ”„๋กœ์„ธ์Šค

Apache Airflow์˜ ์ด๋ฉ”์ผ์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์˜ค๊ธฐ ์œ„ํ•œ ETL ํ”„๋กœ์„ธ์Šค

๊ธฐ์ˆ ์ด ์•„๋ฌด๋ฆฌ ๋ฐœ์ „ํ•ด๋„ ์‹œ๋Œ€์— ๋’ค๋–จ์–ด์ง„ ์ ‘๊ทผ ๋ฐฉ์‹์€ ๋Š˜ ๋’ค์ฒ˜์ง„๋‹ค. ์ด๋Š” ์ˆœ์กฐ๋กœ์šด ์ „ํ™˜, ์ธ์  ์š”์ธ, ๊ธฐ์ˆ ์  ์š”๊ตฌ ๋˜๋Š” ๊ธฐํƒ€ ์š”์ธ ๋•Œ๋ฌธ์ผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ๋ถ„์•ผ์—์„œ ๋ฐ์ดํ„ฐ ์†Œ์Šค๋Š” ์ด ๋ถ€๋ถ„์—์„œ ๊ฐ€์žฅ ๋งŽ์ด ๋“œ๋Ÿฌ๋‚œ๋‹ค. ์šฐ๋ฆฌ๊ฐ€ ์ด๊ฒƒ์„ ์ œ๊ฑฐํ•˜๋Š” ๊ฒƒ์„ ์•„๋ฌด๋ฆฌ ๊ฟˆ๊พธ๋”๋ผ๋„ ์ง€๊ธˆ๊นŒ์ง€ ๋ฐ์ดํ„ฐ์˜ ์ผ๋ถ€๋Š” ๋” ์˜ค๋ž˜๋œ ํ˜•์‹์€ ๋งํ•  ๊ฒƒ๋„ ์—†๊ณ  ์ธ์Šคํ„ดํŠธ ๋ฉ”์‹ ์ €์™€ ์ด๋ฉ”์ผ๋กœ ์ „์†ก๋ฉ๋‹ˆ๋‹ค. ์ด๋ฉ”์ผ์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์˜ฌ ์ˆ˜ ์žˆ๋Š” ๋ฐฉ๋ฒ•์„ ์„ค๋ช…ํ•˜๋Š” Apache Airflow ์˜ต์…˜ ์ค‘ ํ•˜๋‚˜๋ฅผ ๋ถ„ํ•ดํ•˜๋„๋ก ์ดˆ๋Œ€ํ•ฉ๋‹ˆ๋‹ค.

์„ ์‚ฌ ์‹œ๋Œ€

๋Œ€์ธ ์ปค๋ฎค๋‹ˆ์ผ€์ด์…˜์—์„œ ํšŒ์‚ฌ ๊ฐ„ ์ƒํ˜ธ ์ž‘์šฉ ํ‘œ์ค€์— ์ด๋ฅด๊ธฐ๊นŒ์ง€ ๋งŽ์€ ๋ฐ์ดํ„ฐ๊ฐ€ ์—ฌ์ „ํžˆ ์ „์ž ๋ฉ”์ผ์„ ํ†ตํ•ด ์ „์†ก๋ฉ๋‹ˆ๋‹ค. ๋ฐ์ดํ„ฐ๋ฅผ ์–ป๊ธฐ ์œ„ํ•œ ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ์ž‘์„ฑํ•˜๊ฑฐ๋‚˜ ์ด ์ •๋ณด๋ฅผ ๋” ํŽธ๋ฆฌํ•œ ์†Œ์Šค์— ์ž…๋ ฅํ•  ์‚ฌ๋žŒ๋“ค์„ ์‚ฌ๋ฌด์‹ค์— ๋ฐฐ์น˜ํ•  ์ˆ˜ ์žˆ๋‹ค๋ฉด ์ข‹์ง€๋งŒ ์ข…์ข… ์ด๊ฒƒ์€ ๋‹จ์ˆœํžˆ ๋ถˆ๊ฐ€๋Šฅํ•  ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค. ๋‚ด๊ฐ€ ์ง๋ฉดํ•œ ๊ตฌ์ฒด์ ์ธ ์ž‘์—…์€ ์•…๋ช… ๋†’์€ CRM ์‹œ์Šคํ…œ์„ ๋ฐ์ดํ„ฐ ์›จ์–ดํ•˜์šฐ์Šค์— ์—ฐ๊ฒฐํ•œ ๋‹ค์Œ OLAP ์‹œ์Šคํ…œ์— ์—ฐ๊ฒฐํ•˜๋Š” ๊ฒƒ์ด์—ˆ์Šต๋‹ˆ๋‹ค. ์—ญ์‚ฌ์ ์œผ๋กœ ์šฐ๋ฆฌ ํšŒ์‚ฌ์˜ ๊ฒฝ์šฐ ํŠน์ • ๋น„์ฆˆ๋‹ˆ์Šค ์˜์—ญ์—์„œ์ด ์‹œ์Šคํ…œ์„ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ํŽธ๋ฆฌํ–ˆ์Šต๋‹ˆ๋‹ค. ๋”ฐ๋ผ์„œ ๋ชจ๋‘๊ฐ€ ์ด ํƒ€์‚ฌ ์‹œ์Šคํ…œ์˜ ๋ฐ์ดํ„ฐ๋กœ๋„ ์ž‘๋™ํ•  ์ˆ˜ ์žˆ๊ธฐ๋ฅผ ์›ํ–ˆ์Šต๋‹ˆ๋‹ค. ๋ฌผ๋ก  ๋จผ์ € ๊ณต๊ฐœ API์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์–ป์„ ์ˆ˜ ์žˆ๋Š” ๊ฐ€๋Šฅ์„ฑ์— ๋Œ€ํ•ด ์—ฐ๊ตฌํ–ˆ์Šต๋‹ˆ๋‹ค. ๋ถˆํ–‰ํžˆ๋„ API๋Š” ํ•„์š”ํ•œ ๋ชจ๋“  ๋ฐ์ดํ„ฐ๋ฅผ ์–ป๋Š” ๊ฒƒ์„ ๋‹ค๋ฃจ์ง€ ์•Š์•˜๊ณ , ๊ฐ„๋‹จํžˆ ๋งํ•ด์„œ ์—ฌ๋Ÿฌ ๋ฉด์—์„œ ๋น„๋šค์–ด์ ธ ์žˆ์—ˆ๊ณ  ๊ธฐ์ˆ  ์ง€์›์€ ๋” ํฌ๊ด„์ ์ธ ๊ธฐ๋Šฅ์„ ์ œ๊ณตํ•˜๊ธฐ๋ฅผ ์›ํ•˜์ง€ ์•Š์•˜๊ฑฐ๋‚˜ ๋งŒ๋‚  ์ˆ˜ ์—†์—ˆ์Šต๋‹ˆ๋‹ค. ๊ทธ๋Ÿฌ๋‚˜์ด ์‹œ์Šคํ…œ์€ ๋ˆ„๋ฝ ๋œ ๋ฐ์ดํ„ฐ๋ฅผ ์•„์นด์ด๋ธŒ ์–ธ๋กœ๋“œ ๋งํฌ ํ˜•ํƒœ๋กœ ๋ฉ”์ผ๋กœ ์ฃผ๊ธฐ์ ์œผ๋กœ๋ฐ›์„ ์ˆ˜์žˆ๋Š” ๊ธฐํšŒ๋ฅผ ์ œ๊ณตํ–ˆ์Šต๋‹ˆ๋‹ค.

๋น„์ฆˆ๋‹ˆ์Šค๊ฐ€ ์ด๋ฉ”์ผ์ด๋‚˜ ์ธ์Šคํ„ดํŠธ ๋ฉ”์‹ ์ €์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์ง‘ํ•˜๊ธฐ๋ฅผ ์›ํ–ˆ๋˜ ๊ฒฝ์šฐ๋Š” ์ด๊ฒƒ์ด ์œ ์ผํ•œ ๊ฒฝ์šฐ๊ฐ€ ์•„๋‹ˆ๋ผ๋Š” ์ ์— ์œ ์˜ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. ๊ทธ๋Ÿฌ๋‚˜ ์ด ๊ฒฝ์šฐ ์ด๋Ÿฌํ•œ ๋ฐฉ์‹์œผ๋กœ๋งŒ ๋ฐ์ดํ„ฐ์˜ ์ผ๋ถ€๋ฅผ ์ œ๊ณตํ•˜๋Š” ์ œXNUMX์ž ํšŒ์‚ฌ์— ์˜ํ–ฅ์„ ๋ฏธ์น  ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค.

์•„ํŒŒ์น˜ ์—์–ด ํ”Œ๋กœ์šฐ

ETL ํ”„๋กœ์„ธ์Šค๋ฅผ ๊ตฌ์ถ•ํ•˜๊ธฐ ์œ„ํ•ด Apache Airflow๋ฅผ ๊ฐ€์žฅ ์ž์ฃผ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค. ์ด ๊ธฐ์ˆ ์— ์ต์ˆ™ํ•˜์ง€ ์•Š์€ ๋…์ž๊ฐ€ ์ƒํ™ฉ๊ณผ ์ผ๋ฐ˜์ ์œผ๋กœ ์–ด๋–ป๊ฒŒ ๋ณด์ด๋Š”์ง€ ๋” ์ž˜ ์ดํ•ดํ•  ์ˆ˜ ์žˆ๋„๋ก ๋ช‡ ๊ฐ€์ง€ ์†Œ๊ฐœ๋ฅผ ์„ค๋ช…ํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค.

Apache Airflow๋Š” Python์—์„œ ETL(Extract-Transform-Loading) ํ”„๋กœ์„ธ์Šค๋ฅผ ๋นŒ๋“œ, ์‹คํ–‰ ๋ฐ ๋ชจ๋‹ˆํ„ฐ๋งํ•˜๋Š” ๋ฐ ์‚ฌ์šฉ๋˜๋Š” ๋ฌด๋ฃŒ ํ”Œ๋žซํผ์ž…๋‹ˆ๋‹ค. Airflow์˜ ์ฃผ์š” ๊ฐœ๋…์€ ๊ทธ๋ž˜ํ”„์˜ ์ •์ ์ด ํŠน์ • ํ”„๋กœ์„ธ์Šค์ด๊ณ  ๊ทธ๋ž˜ํ”„์˜ ๊ฐ€์žฅ์ž๋ฆฌ๊ฐ€ ์ œ์–ด ๋˜๋Š” ์ •๋ณด์˜ ํ๋ฆ„์ธ ๋ฐฉํ–ฅ์„ฑ ๋น„์ˆœํ™˜ ๊ทธ๋ž˜ํ”„์ž…๋‹ˆ๋‹ค. ํ”„๋กœ์„ธ์Šค๋Š” ๋‹จ์ˆœํžˆ Python ํ•จ์ˆ˜๋ฅผ ํ˜ธ์ถœํ•˜๊ฑฐ๋‚˜ ํด๋ž˜์Šค ์ปจํ…์ŠคํŠธ์—์„œ ์—ฌ๋Ÿฌ ํ•จ์ˆ˜๋ฅผ ์ˆœ์ฐจ์ ์œผ๋กœ ํ˜ธ์ถœํ•˜๋Š” ๋” ๋ณต์žกํ•œ ๋…ผ๋ฆฌ๋ฅผ ๊ฐ€์งˆ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๊ฐ€์žฅ ๋นˆ๋ฒˆํ•œ ์ž‘์—…์˜ ๊ฒฝ์šฐ ํ”„๋กœ์„ธ์Šค๋กœ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋Š” ๊ธฐ์„ฑ ๊ฐœ๋ฐœ์ด ์ด๋ฏธ ๋งŽ์ด ์žˆ์Šต๋‹ˆ๋‹ค. ์ด๋Ÿฌํ•œ ๊ฐœ๋ฐœ์—๋Š” ๋‹ค์Œ์ด ํฌํ•จ๋ฉ๋‹ˆ๋‹ค.

  • ์—ฐ์‚ฐ์ž - ์˜ˆ๋ฅผ ๋“ค์–ด ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ํ…Œ์ด๋ธ”์—์„œ ๋ฐ์ดํ„ฐ ์›จ์–ดํ•˜์šฐ์Šค๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ํ•œ ๊ณณ์—์„œ ๋‹ค๋ฅธ ๊ณณ์œผ๋กœ ์ „์†กํ•ฉ๋‹ˆ๋‹ค.
  • ์„ผ์„œ - ํŠน์ • ์ด๋ฒคํŠธ์˜ ๋ฐœ์ƒ์„ ๊ธฐ๋‹ค๋ฆฌ๊ณ  ์ œ์–ด ํ๋ฆ„์„ ๊ทธ๋ž˜ํ”„์˜ ํ›„์† ๊ผญ์ง€์ ์œผ๋กœ ๋ณด๋ƒ…๋‹ˆ๋‹ค.
  • ํ›„ํฌ - ์˜ˆ๋ฅผ ๋“ค์–ด ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ํ…Œ์ด๋ธ”์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์˜ค๊ธฐ ์œ„ํ•œ ํ•˜์œ„ ์ˆ˜์ค€ ์ž‘์—…์šฉ(๋ฌธ์—์„œ ์‚ฌ์šฉ๋จ)
  • ๊ธฐํƒ€

์ด ๊ธฐ์‚ฌ์—์„œ Apache Airflow๋ฅผ ์ž์„ธํžˆ ์„ค๋ช…ํ•˜๋Š” ๊ฒƒ์€ ๋ถ€์ ์ ˆํ•ฉ๋‹ˆ๋‹ค. ๊ฐ„๋žตํ•œ ์†Œ๊ฐœ๋ฅผ ๋ณผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์—ฌ๊ธฐ์— ๋˜๋Š” ์—ฌ๊ธฐ์—.

๋ฐ์ดํ„ฐ๋ฅผ ์–ป๊ธฐ ์œ„ํ•œ ํ›„ํฌ

์šฐ์„ , ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•˜๋ ค๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™์€ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ๋Š” ํ›„ํฌ๋ฅผ ์ž‘์„ฑํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

  • ์ด๋ฉ”์ผ์— ์—ฐ๊ฒฐ
  • ์˜ฌ๋ฐ”๋ฅธ ๋ฌธ์ž ์ฐพ๊ธฐ
  • ํŽธ์ง€์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ›์Šต๋‹ˆ๋‹ค.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook ะดะปั ะฟะพะปัƒั‡ะตะฝะธั ะดะฐะฝะฝั‹ั… ั ัะปะตะบั‚ั€ะพะฝะฝะพะน ะฟะพั‡ั‚ั‹

           :param imap_conn_id:       ะ˜ะดะตะฝั‚ะธั„ะธะบะฐั‚ะพั€ ะฟะพะดะบะปัŽั‡ะตะฝะธั ะบ ะฟะพั‡ั‚ะต
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            ะŸะพะดะบะปัŽั‡ะฐะตะผัั ะบ ะฟะพั‡ั‚ะต
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            ะœะตั‚ะพะด ะดะปั ะฟะพะปัƒั‡ะตะฝะธั ะธะดะตะฝั‚ะธั„ะธะบะฐั‚ะพั€ะฐ ะฟะพัะปะตะดะฝะตะณะพ ะฟะธััŒะผะฐ, 
            ัƒะดะพะฒะปะตั‚ะฒะพั€ะฐััŽั‰ะตะณะพ ัƒัะปะพะฒะธัะผ ะฟะพะธัะบะฐ

            :param check_seen:      ะžั‚ะผะตั‡ะฐั‚ัŒ ะฟะพัะปะตะดะฝะตะต ะฟะธััŒะผะพ ะบะฐะบ ะฟั€ะพั‡ะธั‚ะฐะฝะฝะพะต
            :type check_seen:       bool
            :param box:             ะะฐะธะผะตะฝะพะฒะฐะฝะธั ัั‰ะธะบะฐ
            :type box:              string
            :param condition:       ะฃัะปะพะฒะธั ะฟะพะธัะบะฐ ะฟะธัะตะผ
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("ะ’ ัั‰ะธะบะต ะฝะฐะนะดะตะฝั‹ ัะปะตะดัƒัŽั‰ะธะต ะฟะธััŒะผะฐ: " + str(mail_ids))

        if not mail_ids:
            logging.info("ะะต ะฝะฐะนะดะตะฝะพ ะฝะพะฒั‹ั… ะฟะธัะตะผ")
            return None

        mail_id = mail_ids[0]

        # ะตัะปะธ ั‚ะฐะบะธั… ะฟะธัะตะผ ะฝะตัะบะพะปัŒะบะพ
        if len(mail_ids) > 1:
            # ะพั‚ะผะตั‡ะฐะตะผ ะพัั‚ะฐะปัŒะฝั‹ะต ะฟั€ะพั‡ะธั‚ะฐะฝะฝั‹ะผะธ
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # ะฒะพะทะฒั€ะฐั‰ะฐะตะผ ะฟะพัะปะตะดะฝะตะต
            mail_id = mail_ids[-1]

        # ะฝัƒะถะฝะพ ะปะธ ะพั‚ะผะตั‚ะธั‚ัŒ ะฟะพัะปะตะดะฝะตะต ะฟั€ะพั‡ะธั‚ะฐะฝะฝั‹ะผ
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

๋…ผ๋ฆฌ๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค. ์—ฐ๊ฒฐํ•˜๊ณ  ๊ฐ€์žฅ ๊ด€๋ จ์„ฑ์ด ๋†’์€ ๋งˆ์ง€๋ง‰ ๋ฌธ์ž๋ฅผ ์ฐพ๊ณ  ๋‹ค๋ฅธ ๋ฌธ์ž๊ฐ€ ์žˆ์œผ๋ฉด ๋ฌด์‹œํ•ฉ๋‹ˆ๋‹ค. ์ด ๊ธฐ๋Šฅ์€ ์ดํ›„ ๋ฌธ์ž๊ฐ€ ์ด์ „ ๋ฌธ์ž์˜ ๋ชจ๋“  ๋ฐ์ดํ„ฐ๋ฅผ ํฌํ•จํ•˜๊ธฐ ๋•Œ๋ฌธ์— ์‚ฌ์šฉ๋ฉ๋‹ˆ๋‹ค. ๊ทธ๋ ‡์ง€ ์•Š์€ ๊ฒฝ์šฐ ๋ชจ๋“  ๋ฌธ์ž์˜ ๋ฐฐ์—ด์„ ๋ฐ˜ํ™˜ํ•˜๊ฑฐ๋‚˜ ์ฒซ ๋ฒˆ์งธ ๋ฌธ์ž๋ฅผ ์ฒ˜๋ฆฌํ•˜๊ณ  ๋‚˜๋จธ์ง€๋Š” ๋‹ค์Œ ํŒจ์Šค์—์„œ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ผ๋ฐ˜์ ์œผ๋กœ ํ•ญ์ƒ ๊ทธ๋ ‡๋“ฏ์ด ๋ชจ๋“  ๊ฒƒ์€ ์ž‘์—…์— ๋”ฐ๋ผ ๋‹ค๋ฆ…๋‹ˆ๋‹ค.

ํ›„ํฌ์— ๋‘ ๊ฐ€์ง€ ๋ณด์กฐ ๊ธฐ๋Šฅ์„ ์ถ”๊ฐ€ํ•ฉ๋‹ˆ๋‹ค: ํŒŒ์ผ์„ ๋‹ค์šด๋กœ๋“œํ•˜๊ณ  ์ด๋ฉ”์ผ์˜ ๋งํฌ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ํŒŒ์ผ์„ ๋‹ค์šด๋กœ๋“œํ•˜๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค. ๊ทธ๊ฑด ๊ทธ๋ ‡๊ณ , ๊ทธ๋“ค์€ ์šด์˜์ž์—๊ฒŒ ์ด๋™ํ•  ์ˆ˜ ์žˆ์œผ๋ฉฐ์ด ๊ธฐ๋Šฅ์„ ์‚ฌ์šฉํ•˜๋Š” ๋นˆ๋„์— ๋”ฐ๋ผ ๋‹ค๋ฆ…๋‹ˆ๋‹ค. ๋‹ค์‹œ ํ›„ํฌ์— ์ถ”๊ฐ€ํ•  ํ•ญ๋ชฉ์€ ์ž‘์—…์— ๋”ฐ๋ผ ๋‹ค๋ฆ…๋‹ˆ๋‹ค. ํŒŒ์ผ์ด ํŽธ์ง€๋กœ ์ฆ‰์‹œ ์ˆ˜์‹ ๋˜๋ฉด ํŽธ์ง€์— ์ฒจ๋ถ€ ํŒŒ์ผ์„ ๋‹ค์šด๋กœ๋“œํ•  ์ˆ˜ ์žˆ๊ณ , ๋ฐ์ดํ„ฐ๊ฐ€ ํŽธ์ง€๋กœ ์ˆ˜์‹ ๋˜๋ฉด ํŽธ์ง€๋ฅผ ๊ตฌ๋ฌธ ๋ถ„์„ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. ๋“ฑ. ๋‚ด ๊ฒฝ์šฐ ํŽธ์ง€์—๋Š” ์•„์นด์ด๋ธŒ์— ๋Œ€ํ•œ ํ•˜๋‚˜์˜ ๋งํฌ๊ฐ€ ํ•จ๊ป˜ ์ œ๊ณต๋˜๋ฉฐ ํŠน์ • ์œ„์น˜์— ๋„ฃ๊ณ  ์ถ”๊ฐ€ ์ฒ˜๋ฆฌ ํ”„๋กœ์„ธ์Šค๋ฅผ ์‹œ์ž‘ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

    def download_from_url(self, url, path, chunk_size=128):
        """
            ะœะตั‚ะพะด ะดะปั ัะบะฐั‡ะธะฒะฐะฝะธั ั„ะฐะนะปะฐ

            :param url:              ะะดั€ะตั ะทะฐะณั€ัƒะทะบะธ
            :type url:               string
            :param path:             ะšัƒะดะฐ ะฟะพะปะพะถะธั‚ัŒ ั„ะฐะนะป
            :type path:              string
            :param chunk_size:       ะŸะพ ัะบะพะปัŒะบะพ ะฑะฐะนั‚ะพะฒ ะฟะธัะฐั‚ัŒ
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            ะœะตั‚ะพะด ะดะปั ัะบะฐั‡ะธะฒะฐะฝะธั ั„ะฐะนะปะฐ ะฟะพ ััั‹ะปะบะต ะธะท ะฟะธััŒะผะฐ

            :param mail_id:         ะ˜ะดะตะฝั‚ะธั„ะธะบะฐั‚ะพั€ ะฟะธััŒะผะฐ
            :type mail_id:          string
            :param path:            ะšัƒะดะฐ ะฟะพะปะพะถะธั‚ัŒ ั„ะฐะนะป
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

์ฝ”๋“œ๋Š” ๊ฐ„๋‹จํ•˜๋ฏ€๋กœ ์ถ”๊ฐ€ ์„ค๋ช…์ด ๊ฑฐ์˜ ํ•„์š”ํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค. ๋งค์ง๋ผ์ธ imap_conn_id์— ๋Œ€ํ•ด์„œ๋งŒ ๋ง์”€๋“œ๋ฆฌ๊ฒ ์Šต๋‹ˆ๋‹ค. Apache Airflow๋Š” ๋ฌธ์ž์—ด ์‹๋ณ„์ž๋กœ ์•ก์„ธ์Šคํ•  ์ˆ˜ ์žˆ๋Š” ์—ฐ๊ฒฐ ๋งค๊ฐœ๋ณ€์ˆ˜(๋กœ๊ทธ์ธ, ๋น„๋ฐ€๋ฒˆํ˜ธ, ์ฃผ์†Œ ๋ฐ ๊ธฐํƒ€ ๋งค๊ฐœ๋ณ€์ˆ˜)๋ฅผ ์ €์žฅํ•ฉ๋‹ˆ๋‹ค. ์‹œ๊ฐ์ ์œผ๋กœ ์—ฐ๊ฒฐ ๊ด€๋ฆฌ๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค.

Apache Airflow์˜ ์ด๋ฉ”์ผ์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์˜ค๊ธฐ ์œ„ํ•œ ETL ํ”„๋กœ์„ธ์Šค

๋ฐ์ดํ„ฐ๋ฅผ ๊ธฐ๋‹ค๋ฆฌ๋Š” ์„ผ์„œ

๋ฉ”์ผ์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์—ฐ๊ฒฐํ•˜๊ณ  ์ˆ˜์‹ ํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ์ด๋ฏธ ์•Œ๊ณ  ์žˆ์œผ๋ฏ€๋กœ ์ด์ œ ๋ฐ์ดํ„ฐ๋ฅผ ๊ธฐ๋‹ค๋ฆฌ๋Š” ์„ผ์„œ๋ฅผ ์ž‘์„ฑํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๋‚ด ๊ฒฝ์šฐ์—๋Š” ๋‹ค๋ฅธ ์†Œ์Šค(API, ์ „ํ™” ํ†ต์‹ )์—์„œ ๊ด€๋ จ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์˜ค๋Š” ํ”„๋กœ์„ธ์Šค๋ฅผ ํฌํ•จํ•˜์—ฌ ๋ฉ”์ผ์—์„œ ๋ฐ›์€ ๋ฐ์ดํ„ฐ๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ๋‹ค๋ฅธ ํ”„๋กœ์„ธ์Šค๊ฐ€ ์ž‘๋™ํ•˜๊ธฐ ๋•Œ๋ฌธ์— ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•  ์—ฐ์‚ฐ์ž๋ฅผ ์ฆ‰์‹œ โ€‹โ€‹์ž‘์„ฑํ•˜์ง€ ๋ชปํ–ˆ์Šต๋‹ˆ๋‹ค. , ์›น ๋ฉ”ํŠธ๋ฆญ ๋“ฑ) ๋“ฑ). ์˜ˆ๋ฅผ ๋“ค์–ด ๋ณด๊ฒ ์Šต๋‹ˆ๋‹ค. CRM ์‹œ์Šคํ…œ์— ์ƒˆ๋กœ์šด ์‚ฌ์šฉ์ž๊ฐ€ ๋‚˜ํƒ€ ๋‚ฌ์œผ๋ฉฐ ์—ฌ์ „ํžˆ ๊ทธ์˜ UUID์— ๋Œ€ํ•ด ์•Œ์ง€ ๋ชปํ•ฉ๋‹ˆ๋‹ค. ๊ทธ๋Ÿฐ ๋‹ค์Œ SIP ์ „ํ™” ํ†ต์‹ ์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์‹ ํ•˜๋ ค๊ณ  ํ•˜๋ฉด UUID์— ์—ฐ๊ฒฐ๋œ ์ „ํ™”๋ฅผ ๋ฐ›์ง€๋งŒ ์˜ฌ๋ฐ”๋ฅด๊ฒŒ ์ €์žฅํ•˜๊ณ  ์‚ฌ์šฉํ•  ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค. ์ด๋Ÿฌํ•œ ๋ฌธ์ œ์—์„œ ๋ฐ์ดํ„ฐ์˜ ์˜์กด์„ฑ์„ ์—ผ๋‘์— ๋‘๋Š” ๊ฒƒ์ด ์ค‘์š”ํ•ฉ๋‹ˆ๋‹ค. ํŠนํžˆ ๋ฐ์ดํ„ฐ ์†Œ์Šค๊ฐ€ ์„œ๋กœ ๋‹ค๋ฅธ ๊ฒฝ์šฐ์—๋Š” ๋”์šฑ ๊ทธ๋ ‡์Šต๋‹ˆ๋‹ค. ๋ฌผ๋ก  ์ด๋Ÿฌํ•œ ์กฐ์น˜๋Š” ๋ฐ์ดํ„ฐ ๋ฌด๊ฒฐ์„ฑ์„ ์œ ์ง€ํ•˜๊ธฐ์—๋Š” ๋ถˆ์ถฉ๋ถ„ํ•˜์ง€๋งŒ ๊ฒฝ์šฐ์— ๋”ฐ๋ผ ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค. ์˜ˆ, ์ž์›์„ ์ฐจ์ง€ํ•˜๊ธฐ ์œ„ํ•ด ์œ ํœด ์ƒํƒœ๋ฅผ ์œ ์ง€ํ•˜๋Š” ๊ฒƒ๋„ ๋น„ํ•ฉ๋ฆฌ์ ์ž…๋‹ˆ๋‹ค.

๋”ฐ๋ผ์„œ ์„ผ์„œ๋Š” ๋ฉ”์ผ์— ์ƒˆ๋กœ์šด ์ •๋ณด๊ฐ€ ์žˆ๋Š” ๊ฒฝ์šฐ ๊ทธ๋ž˜ํ”„์˜ ํ›„์† ์ •์ ์„ ์‹œ์ž‘ํ•˜๊ณ  ์ด์ „ ์ •๋ณด๋ฅผ ๊ด€๋ จ์ด ์—†๋Š” ๊ฒƒ์œผ๋กœ ํ‘œ์‹œํ•ฉ๋‹ˆ๋‹ค.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

์šฐ๋ฆฌ๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์‹ ํ•˜๊ณ  ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค

๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์‹ ํ•˜๊ณ  ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•ด ๋ณ„๋„์˜ ์—ฐ์‚ฐ์ž๋ฅผ ์ž‘์„ฑํ•  ์ˆ˜ ์žˆ์œผ๋ฉฐ ๊ธฐ์„ฑํ’ˆ์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ง€๊ธˆ๊นŒ์ง€ ๋…ผ๋ฆฌ๋Š” ์‚ฌ์†Œํ•˜๊ธฐ ๋•Œ๋ฌธ์— ์˜ˆ๋ฅผ ๋“ค์–ด ๋ฌธ์ž์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์˜ค๋ ค๋ฉด ํ‘œ์ค€ PythonOperator๋ฅผ ์ œ์•ˆํ•ฉ๋‹ˆ๋‹ค.

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# ะกั‚ะฐะฝะดะฐั€ั‚ะฝะพะต ะบะพะฝั„ะธะณัƒั€ะธั€ะพะฒะฐะฝะธะต ะณั€ะฐั„ะฐ
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# ะžะฟั€ะตะดะตะปัะตะผ ัะตะฝัะพั€
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# ะคัƒะฝะบั†ะธั ะดะปั ะฟะพะปัƒั‡ะตะฝะธั ะดะฐะฝะฝั‹ั… ะธะท ะฟะธััŒะผะฐ
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# ะžะฟะธัะฐะฝะธะต ะพัั‚ะฐะปัŒะฝั‹ั… ะฒะตั€ัˆะธะฝ ะณั€ะฐั„ะฐ
...

# ะ—ะฐะดะฐะตะผ ัะฒัะทัŒ ะฝะฐ ะณั€ะฐั„ะต
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ะžะฟะธัะฐะฝะธะต ะพัั‚ะฐะปัŒะฝั‹ั… ะฟะพั‚ะพะบะพะฒ ัƒะฟั€ะฐะฒะปะตะฝะธั

๊ทธ๊ฑด ๊ทธ๋ ‡๊ณ , ํšŒ์‚ฌ ๋ฉ”์ผ์ด mail.ru์—๋„ ์žˆ์œผ๋ฉด ์ œ๋ชฉ, ๋ฐœ์‹ ์ž ๋“ฑ์œผ๋กœ ํŽธ์ง€๋ฅผ ๊ฒ€์ƒ‰ํ•  ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค. 2016๋…„์— ๋„์ž…ํ•˜๊ฒ ๋‹ค๊ณ  ์•ฝ์†ํ–ˆ์ง€๋งŒ ๋งˆ์Œ์ด ๋ฐ”๋€ ๊ฒƒ ๊ฐ™๋‹ค. ํ•„์š”ํ•œ ๊ธ€์ž์— ๋Œ€ํ•œ ๋ณ„๋„์˜ ํด๋”๋ฅผ ๋งŒ๋“ค๊ณ  ๋ฉ”์ผ ์›น ์ธํ„ฐํŽ˜์ด์Šค์—์„œ ํ•„์š”ํ•œ ๊ธ€์ž์— ๋Œ€ํ•œ ํ•„ํ„ฐ๋ฅผ ์„ค์ •ํ•˜์—ฌ ์ด ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ–ˆ์Šต๋‹ˆ๋‹ค. ๋”ฐ๋ผ์„œ ๊ฒ€์ƒ‰์— ํ•„์š”ํ•œ ๋ฌธ์ž์™€ ์กฐ๊ฑด, ์ œ ๊ฒฝ์šฐ์—๋Š” ๋‹จ์ˆœํžˆ (UNSEEN) ์ด ํด๋”์— ๋“ค์–ด๊ฐ‘๋‹ˆ๋‹ค.

์š”์•ฝํ•˜๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™์€ ์ˆœ์„œ๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค. ์กฐ๊ฑด์„ ์ถฉ์กฑํ•˜๋Š” ์ƒˆ ๋ฌธ์ž๊ฐ€ ์žˆ๋Š”์ง€ ํ™•์ธํ•˜๊ณ  ์žˆ๋Š” ๊ฒฝ์šฐ ๋งˆ์ง€๋ง‰ ๋ฌธ์ž์˜ ๋งํฌ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์•„์นด์ด๋ธŒ๋ฅผ ๋‹ค์šด๋กœ๋“œํ•ฉ๋‹ˆ๋‹ค.
๋งˆ์ง€๋ง‰ ์  ์•„๋ž˜์—๋Š” ์ด ์•„์นด์ด๋ธŒ๊ฐ€ ์••์ถ• ํ•ด์ œ๋˜๊ณ  ์•„์นด์ด๋ธŒ์˜ ๋ฐ์ดํ„ฐ๊ฐ€ ์ง€์›Œ์ง€๊ณ  ์ฒ˜๋ฆฌ๋˜๋ฉฐ ๊ฒฐ๊ณผ์ ์œผ๋กœ ๋ชจ๋“  ๊ฒƒ์ด ETL ํ”„๋กœ์„ธ์Šค์˜ ํŒŒ์ดํ”„๋ผ์ธ์œผ๋กœ ์ด๋™ํ•œ๋‹ค๋Š” ๊ฒƒ์ด ์ƒ๋žต๋˜์–ด ์žˆ์ง€๋งŒ ์ด๊ฒƒ์€ ์ด๋ฏธ ๋„˜์–ด์„ฐ์Šต๋‹ˆ๋‹ค. ๊ธฐ์‚ฌ์˜ ๋ฒ”์œ„. ํฅ๋ฏธ๋กญ๊ณ  ์œ ์šฉํ•œ ๊ฒƒ์œผ๋กœ ํŒ๋ช…๋˜๋ฉด ETL ์†”๋ฃจ์…˜๊ณผ Apache Airflow์šฉ ๋ถ€ํ’ˆ์— ๋Œ€ํ•ด ๊ธฐ๊บผ์ด ๊ณ„์† ์„ค๋ช…ํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค.

์ถœ์ฒ˜ : habr.com

์ฝ”๋ฉ˜ํŠธ๋ฅผ ์ถ”๊ฐ€