๊ธฐ์ ์ด ์๋ฌด๋ฆฌ ๋ฐ์ ํด๋ ์๋์ ๋ค๋จ์ด์ง ์ ๊ทผ ๋ฐฉ์์ ๋ ๋ค์ฒ์ง๋ค. ์ด๋ ์์กฐ๋ก์ด ์ ํ, ์ธ์ ์์ธ, ๊ธฐ์ ์ ์๊ตฌ ๋๋ ๊ธฐํ ์์ธ ๋๋ฌธ์ผ ์ ์์ต๋๋ค. ๋ฐ์ดํฐ ์ฒ๋ฆฌ ๋ถ์ผ์์ ๋ฐ์ดํฐ ์์ค๋ ์ด ๋ถ๋ถ์์ ๊ฐ์ฅ ๋ง์ด ๋๋ฌ๋๋ค. ์ฐ๋ฆฌ๊ฐ ์ด๊ฒ์ ์ ๊ฑฐํ๋ ๊ฒ์ ์๋ฌด๋ฆฌ ๊ฟ๊พธ๋๋ผ๋ ์ง๊ธ๊น์ง ๋ฐ์ดํฐ์ ์ผ๋ถ๋ ๋ ์ค๋๋ ํ์์ ๋งํ ๊ฒ๋ ์๊ณ ์ธ์คํดํธ ๋ฉ์ ์ ์ ์ด๋ฉ์ผ๋ก ์ ์ก๋ฉ๋๋ค. ์ด๋ฉ์ผ์์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ฌ ์ ์๋ ๋ฐฉ๋ฒ์ ์ค๋ช ํ๋ Apache Airflow ์ต์ ์ค ํ๋๋ฅผ ๋ถํดํ๋๋ก ์ด๋ํฉ๋๋ค.
์ ์ฌ ์๋
๋์ธ ์ปค๋ฎค๋์ผ์ด์ ์์ ํ์ฌ ๊ฐ ์ํธ ์์ฉ ํ์ค์ ์ด๋ฅด๊ธฐ๊น์ง ๋ง์ ๋ฐ์ดํฐ๊ฐ ์ฌ์ ํ ์ ์ ๋ฉ์ผ์ ํตํด ์ ์ก๋ฉ๋๋ค. ๋ฐ์ดํฐ๋ฅผ ์ป๊ธฐ ์ํ ์ธํฐํ์ด์ค๋ฅผ ์์ฑํ๊ฑฐ๋ ์ด ์ ๋ณด๋ฅผ ๋ ํธ๋ฆฌํ ์์ค์ ์ ๋ ฅํ ์ฌ๋๋ค์ ์ฌ๋ฌด์ค์ ๋ฐฐ์นํ ์ ์๋ค๋ฉด ์ข์ง๋ง ์ข ์ข ์ด๊ฒ์ ๋จ์ํ ๋ถ๊ฐ๋ฅํ ์๋ ์์ต๋๋ค. ๋ด๊ฐ ์ง๋ฉดํ ๊ตฌ์ฒด์ ์ธ ์์ ์ ์ ๋ช ๋์ CRM ์์คํ ์ ๋ฐ์ดํฐ ์จ์ดํ์ฐ์ค์ ์ฐ๊ฒฐํ ๋ค์ OLAP ์์คํ ์ ์ฐ๊ฒฐํ๋ ๊ฒ์ด์์ต๋๋ค. ์ญ์ฌ์ ์ผ๋ก ์ฐ๋ฆฌ ํ์ฌ์ ๊ฒฝ์ฐ ํน์ ๋น์ฆ๋์ค ์์ญ์์์ด ์์คํ ์ ์ฌ์ฉํ๋ ๊ฒ์ด ํธ๋ฆฌํ์ต๋๋ค. ๋ฐ๋ผ์ ๋ชจ๋๊ฐ ์ด ํ์ฌ ์์คํ ์ ๋ฐ์ดํฐ๋ก๋ ์๋ํ ์ ์๊ธฐ๋ฅผ ์ํ์ต๋๋ค. ๋ฌผ๋ก ๋จผ์ ๊ณต๊ฐ API์์ ๋ฐ์ดํฐ๋ฅผ ์ป์ ์ ์๋ ๊ฐ๋ฅ์ฑ์ ๋ํด ์ฐ๊ตฌํ์ต๋๋ค. ๋ถํํ๋ API๋ ํ์ํ ๋ชจ๋ ๋ฐ์ดํฐ๋ฅผ ์ป๋ ๊ฒ์ ๋ค๋ฃจ์ง ์์๊ณ , ๊ฐ๋จํ ๋งํด์ ์ฌ๋ฌ ๋ฉด์์ ๋น๋ค์ด์ ธ ์์๊ณ ๊ธฐ์ ์ง์์ ๋ ํฌ๊ด์ ์ธ ๊ธฐ๋ฅ์ ์ ๊ณตํ๊ธฐ๋ฅผ ์ํ์ง ์์๊ฑฐ๋ ๋ง๋ ์ ์์์ต๋๋ค. ๊ทธ๋ฌ๋์ด ์์คํ ์ ๋๋ฝ ๋ ๋ฐ์ดํฐ๋ฅผ ์์นด์ด๋ธ ์ธ๋ก๋ ๋งํฌ ํํ๋ก ๋ฉ์ผ๋ก ์ฃผ๊ธฐ์ ์ผ๋ก๋ฐ์ ์์๋ ๊ธฐํ๋ฅผ ์ ๊ณตํ์ต๋๋ค.
๋น์ฆ๋์ค๊ฐ ์ด๋ฉ์ผ์ด๋ ์ธ์คํดํธ ๋ฉ์ ์ ์์ ๋ฐ์ดํฐ๋ฅผ ์์งํ๊ธฐ๋ฅผ ์ํ๋ ๊ฒฝ์ฐ๋ ์ด๊ฒ์ด ์ ์ผํ ๊ฒฝ์ฐ๊ฐ ์๋๋ผ๋ ์ ์ ์ ์ํด์ผ ํฉ๋๋ค. ๊ทธ๋ฌ๋ ์ด ๊ฒฝ์ฐ ์ด๋ฌํ ๋ฐฉ์์ผ๋ก๋ง ๋ฐ์ดํฐ์ ์ผ๋ถ๋ฅผ ์ ๊ณตํ๋ ์ XNUMX์ ํ์ฌ์ ์ํฅ์ ๋ฏธ์น ์ ์์ต๋๋ค.
์ํ์น ์์ด ํ๋ก์ฐ
ETL ํ๋ก์ธ์ค๋ฅผ ๊ตฌ์ถํ๊ธฐ ์ํด Apache Airflow๋ฅผ ๊ฐ์ฅ ์์ฃผ ์ฌ์ฉํฉ๋๋ค. ์ด ๊ธฐ์ ์ ์ต์ํ์ง ์์ ๋ ์๊ฐ ์ํฉ๊ณผ ์ผ๋ฐ์ ์ผ๋ก ์ด๋ป๊ฒ ๋ณด์ด๋์ง ๋ ์ ์ดํดํ ์ ์๋๋ก ๋ช ๊ฐ์ง ์๊ฐ๋ฅผ ์ค๋ช ํ๊ฒ ์ต๋๋ค.
Apache Airflow๋ Python์์ ETL(Extract-Transform-Loading) ํ๋ก์ธ์ค๋ฅผ ๋น๋, ์คํ ๋ฐ ๋ชจ๋ํฐ๋งํ๋ ๋ฐ ์ฌ์ฉ๋๋ ๋ฌด๋ฃ ํ๋ซํผ์ ๋๋ค. Airflow์ ์ฃผ์ ๊ฐ๋ ์ ๊ทธ๋ํ์ ์ ์ ์ด ํน์ ํ๋ก์ธ์ค์ด๊ณ ๊ทธ๋ํ์ ๊ฐ์ฅ์๋ฆฌ๊ฐ ์ ์ด ๋๋ ์ ๋ณด์ ํ๋ฆ์ธ ๋ฐฉํฅ์ฑ ๋น์ํ ๊ทธ๋ํ์ ๋๋ค. ํ๋ก์ธ์ค๋ ๋จ์ํ Python ํจ์๋ฅผ ํธ์ถํ๊ฑฐ๋ ํด๋์ค ์ปจํ ์คํธ์์ ์ฌ๋ฌ ํจ์๋ฅผ ์์ฐจ์ ์ผ๋ก ํธ์ถํ๋ ๋ ๋ณต์กํ ๋ ผ๋ฆฌ๋ฅผ ๊ฐ์ง ์ ์์ต๋๋ค. ๊ฐ์ฅ ๋น๋ฒํ ์์ ์ ๊ฒฝ์ฐ ํ๋ก์ธ์ค๋ก ์ฌ์ฉํ ์ ์๋ ๊ธฐ์ฑ ๊ฐ๋ฐ์ด ์ด๋ฏธ ๋ง์ด ์์ต๋๋ค. ์ด๋ฌํ ๊ฐ๋ฐ์๋ ๋ค์์ด ํฌํจ๋ฉ๋๋ค.
- ์ฐ์ฐ์ - ์๋ฅผ ๋ค์ด ๋ฐ์ดํฐ๋ฒ ์ด์ค ํ ์ด๋ธ์์ ๋ฐ์ดํฐ ์จ์ดํ์ฐ์ค๋ก ๋ฐ์ดํฐ๋ฅผ ํ ๊ณณ์์ ๋ค๋ฅธ ๊ณณ์ผ๋ก ์ ์กํฉ๋๋ค.
- ์ผ์ - ํน์ ์ด๋ฒคํธ์ ๋ฐ์์ ๊ธฐ๋ค๋ฆฌ๊ณ ์ ์ด ํ๋ฆ์ ๊ทธ๋ํ์ ํ์ ๊ผญ์ง์ ์ผ๋ก ๋ณด๋ ๋๋ค.
- ํํฌ - ์๋ฅผ ๋ค์ด ๋ฐ์ดํฐ๋ฒ ์ด์ค ํ ์ด๋ธ์์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ค๊ธฐ ์ํ ํ์ ์์ค ์์ ์ฉ(๋ฌธ์์ ์ฌ์ฉ๋จ)
- ๊ธฐํ
์ด ๊ธฐ์ฌ์์ Apache Airflow๋ฅผ ์์ธํ ์ค๋ช
ํ๋ ๊ฒ์ ๋ถ์ ์ ํฉ๋๋ค. ๊ฐ๋ตํ ์๊ฐ๋ฅผ ๋ณผ ์ ์์ต๋๋ค.
๋ฐ์ดํฐ๋ฅผ ์ป๊ธฐ ์ํ ํํฌ
์ฐ์ , ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๋ ค๋ฉด ๋ค์๊ณผ ๊ฐ์ ์์ ์ ์ํํ ์ ์๋ ํํฌ๋ฅผ ์์ฑํด์ผ ํฉ๋๋ค.
- ์ด๋ฉ์ผ์ ์ฐ๊ฒฐ
- ์ฌ๋ฐ๋ฅธ ๋ฌธ์ ์ฐพ๊ธฐ
- ํธ์ง์์ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ต๋๋ค.
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging
class IMAPHook(BaseHook):
def __init__(self, imap_conn_id):
"""
IMAP hook ะดะปั ะฟะพะปััะตะฝะธั ะดะฐะฝะฝัั
ั ัะปะตะบััะพะฝะฝะพะน ะฟะพััั
:param imap_conn_id: ะะดะตะฝัะธัะธะบะฐัะพั ะฟะพะดะบะปััะตะฝะธั ะบ ะฟะพััะต
:type imap_conn_id: string
"""
self.connection = self.get_connection(imap_conn_id)
self.mail = None
def authenticate(self):
"""
ะะพะดะบะปััะฐะตะผัั ะบ ะฟะพััะต
"""
mail = imaplib.IMAP4_SSL(self.connection.host)
response, detail = mail.login(user=self.connection.login, password=self.connection.password)
if response != "OK":
raise AirflowException("Sign in failed")
else:
self.mail = mail
def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
"""
ะะตัะพะด ะดะปั ะฟะพะปััะตะฝะธั ะธะดะตะฝัะธัะธะบะฐัะพัะฐ ะฟะพัะปะตะดะฝะตะณะพ ะฟะธััะผะฐ,
ัะดะพะฒะปะตัะฒะพัะฐัััะตะณะพ ััะปะพะฒะธัะผ ะฟะพะธัะบะฐ
:param check_seen: ะัะผะตัะฐัั ะฟะพัะปะตะดะฝะตะต ะฟะธััะผะพ ะบะฐะบ ะฟัะพัะธัะฐะฝะฝะพะต
:type check_seen: bool
:param box: ะะฐะธะผะตะฝะพะฒะฐะฝะธั ััะธะบะฐ
:type box: string
:param condition: ะฃัะปะพะฒะธั ะฟะพะธัะบะฐ ะฟะธัะตะผ
:type condition: string
"""
self.authenticate()
self.mail.select(mailbox=box)
response, data = self.mail.search(None, condition)
mail_ids = data[0].split()
logging.info("ะ ััะธะบะต ะฝะฐะนะดะตะฝั ัะปะตะดัััะธะต ะฟะธััะผะฐ: " + str(mail_ids))
if not mail_ids:
logging.info("ะะต ะฝะฐะนะดะตะฝะพ ะฝะพะฒัั
ะฟะธัะตะผ")
return None
mail_id = mail_ids[0]
# ะตัะปะธ ัะฐะบะธั
ะฟะธัะตะผ ะฝะตัะบะพะปัะบะพ
if len(mail_ids) > 1:
# ะพัะผะตัะฐะตะผ ะพััะฐะปัะฝัะต ะฟัะพัะธัะฐะฝะฝัะผะธ
for id in mail_ids:
self.mail.store(id, "+FLAGS", "\Seen")
# ะฒะพะทะฒัะฐัะฐะตะผ ะฟะพัะปะตะดะฝะตะต
mail_id = mail_ids[-1]
# ะฝัะถะฝะพ ะปะธ ะพัะผะตัะธัั ะฟะพัะปะตะดะฝะตะต ะฟัะพัะธัะฐะฝะฝัะผ
if not check_seen:
self.mail.store(mail_id, "-FLAGS", "\Seen")
return mail_id
๋ ผ๋ฆฌ๋ ๋ค์๊ณผ ๊ฐ์ต๋๋ค. ์ฐ๊ฒฐํ๊ณ ๊ฐ์ฅ ๊ด๋ จ์ฑ์ด ๋์ ๋ง์ง๋ง ๋ฌธ์๋ฅผ ์ฐพ๊ณ ๋ค๋ฅธ ๋ฌธ์๊ฐ ์์ผ๋ฉด ๋ฌด์ํฉ๋๋ค. ์ด ๊ธฐ๋ฅ์ ์ดํ ๋ฌธ์๊ฐ ์ด์ ๋ฌธ์์ ๋ชจ๋ ๋ฐ์ดํฐ๋ฅผ ํฌํจํ๊ธฐ ๋๋ฌธ์ ์ฌ์ฉ๋ฉ๋๋ค. ๊ทธ๋ ์ง ์์ ๊ฒฝ์ฐ ๋ชจ๋ ๋ฌธ์์ ๋ฐฐ์ด์ ๋ฐํํ๊ฑฐ๋ ์ฒซ ๋ฒ์งธ ๋ฌธ์๋ฅผ ์ฒ๋ฆฌํ๊ณ ๋๋จธ์ง๋ ๋ค์ ํจ์ค์์ ์ฒ๋ฆฌํ ์ ์์ต๋๋ค. ์ผ๋ฐ์ ์ผ๋ก ํญ์ ๊ทธ๋ ๋ฏ์ด ๋ชจ๋ ๊ฒ์ ์์ ์ ๋ฐ๋ผ ๋ค๋ฆ ๋๋ค.
ํํฌ์ ๋ ๊ฐ์ง ๋ณด์กฐ ๊ธฐ๋ฅ์ ์ถ๊ฐํฉ๋๋ค: ํ์ผ์ ๋ค์ด๋ก๋ํ๊ณ ์ด๋ฉ์ผ์ ๋งํฌ๋ฅผ ์ฌ์ฉํ์ฌ ํ์ผ์ ๋ค์ด๋ก๋ํ๋ ๊ฒ์ ๋๋ค. ๊ทธ๊ฑด ๊ทธ๋ ๊ณ , ๊ทธ๋ค์ ์ด์์์๊ฒ ์ด๋ํ ์ ์์ผ๋ฉฐ์ด ๊ธฐ๋ฅ์ ์ฌ์ฉํ๋ ๋น๋์ ๋ฐ๋ผ ๋ค๋ฆ ๋๋ค. ๋ค์ ํํฌ์ ์ถ๊ฐํ ํญ๋ชฉ์ ์์ ์ ๋ฐ๋ผ ๋ค๋ฆ ๋๋ค. ํ์ผ์ด ํธ์ง๋ก ์ฆ์ ์์ ๋๋ฉด ํธ์ง์ ์ฒจ๋ถ ํ์ผ์ ๋ค์ด๋ก๋ํ ์ ์๊ณ , ๋ฐ์ดํฐ๊ฐ ํธ์ง๋ก ์์ ๋๋ฉด ํธ์ง๋ฅผ ๊ตฌ๋ฌธ ๋ถ์ํด์ผ ํฉ๋๋ค. ๋ฑ. ๋ด ๊ฒฝ์ฐ ํธ์ง์๋ ์์นด์ด๋ธ์ ๋ํ ํ๋์ ๋งํฌ๊ฐ ํจ๊ป ์ ๊ณต๋๋ฉฐ ํน์ ์์น์ ๋ฃ๊ณ ์ถ๊ฐ ์ฒ๋ฆฌ ํ๋ก์ธ์ค๋ฅผ ์์ํด์ผ ํฉ๋๋ค.
def download_from_url(self, url, path, chunk_size=128):
"""
ะะตัะพะด ะดะปั ัะบะฐัะธะฒะฐะฝะธั ัะฐะนะปะฐ
:param url: ะะดัะตั ะทะฐะณััะทะบะธ
:type url: string
:param path: ะัะดะฐ ะฟะพะปะพะถะธัั ัะฐะนะป
:type path: string
:param chunk_size: ะะพ ัะบะพะปัะบะพ ะฑะฐะนัะพะฒ ะฟะธัะฐัั
:type chunk_size: int
"""
r = requests.get(url, stream=True)
with open(path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def download_mail_href_attachment(self, mail_id, path):
"""
ะะตัะพะด ะดะปั ัะบะฐัะธะฒะฐะฝะธั ัะฐะนะปะฐ ะฟะพ ัััะปะบะต ะธะท ะฟะธััะผะฐ
:param mail_id: ะะดะตะฝัะธัะธะบะฐัะพั ะฟะธััะผะฐ
:type mail_id: string
:param path: ะัะดะฐ ะฟะพะปะพะถะธัั ัะฐะนะป
:type path: string
"""
response, data = self.mail.fetch(mail_id, "(RFC822)")
raw_email = data[0][1]
raw_soup = raw_email.decode().replace("r", "").replace("n", "")
parse_soup = BeautifulSoup(raw_soup, "html.parser")
link_text = ""
for a in parse_soup.find_all("a", href=True, text=True):
link_text = a["href"]
self.download_from_url(link_text, path)
์ฝ๋๋ ๊ฐ๋จํ๋ฏ๋ก ์ถ๊ฐ ์ค๋ช ์ด ๊ฑฐ์ ํ์ํ์ง ์์ต๋๋ค. ๋งค์ง๋ผ์ธ imap_conn_id์ ๋ํด์๋ง ๋ง์๋๋ฆฌ๊ฒ ์ต๋๋ค. Apache Airflow๋ ๋ฌธ์์ด ์๋ณ์๋ก ์ก์ธ์คํ ์ ์๋ ์ฐ๊ฒฐ ๋งค๊ฐ๋ณ์(๋ก๊ทธ์ธ, ๋น๋ฐ๋ฒํธ, ์ฃผ์ ๋ฐ ๊ธฐํ ๋งค๊ฐ๋ณ์)๋ฅผ ์ ์ฅํฉ๋๋ค. ์๊ฐ์ ์ผ๋ก ์ฐ๊ฒฐ ๊ด๋ฆฌ๋ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
๋ฐ์ดํฐ๋ฅผ ๊ธฐ๋ค๋ฆฌ๋ ์ผ์
๋ฉ์ผ์์ ๋ฐ์ดํฐ๋ฅผ ์ฐ๊ฒฐํ๊ณ ์์ ํ๋ ๋ฐฉ๋ฒ์ ์ด๋ฏธ ์๊ณ ์์ผ๋ฏ๋ก ์ด์ ๋ฐ์ดํฐ๋ฅผ ๊ธฐ๋ค๋ฆฌ๋ ์ผ์๋ฅผ ์์ฑํ ์ ์์ต๋๋ค. ๋ด ๊ฒฝ์ฐ์๋ ๋ค๋ฅธ ์์ค(API, ์ ํ ํต์ )์์ ๊ด๋ จ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ค๋ ํ๋ก์ธ์ค๋ฅผ ํฌํจํ์ฌ ๋ฉ์ผ์์ ๋ฐ์ ๋ฐ์ดํฐ๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ๋ค๋ฅธ ํ๋ก์ธ์ค๊ฐ ์๋ํ๊ธฐ ๋๋ฌธ์ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ ์ฐ์ฐ์๋ฅผ ์ฆ์ โโ์์ฑํ์ง ๋ชปํ์ต๋๋ค. , ์น ๋ฉํธ๋ฆญ ๋ฑ) ๋ฑ). ์๋ฅผ ๋ค์ด ๋ณด๊ฒ ์ต๋๋ค. CRM ์์คํ ์ ์๋ก์ด ์ฌ์ฉ์๊ฐ ๋ํ ๋ฌ์ผ๋ฉฐ ์ฌ์ ํ ๊ทธ์ UUID์ ๋ํด ์์ง ๋ชปํฉ๋๋ค. ๊ทธ๋ฐ ๋ค์ SIP ์ ํ ํต์ ์์ ๋ฐ์ดํฐ๋ฅผ ์์ ํ๋ ค๊ณ ํ๋ฉด UUID์ ์ฐ๊ฒฐ๋ ์ ํ๋ฅผ ๋ฐ์ง๋ง ์ฌ๋ฐ๋ฅด๊ฒ ์ ์ฅํ๊ณ ์ฌ์ฉํ ์ ์์ต๋๋ค. ์ด๋ฌํ ๋ฌธ์ ์์ ๋ฐ์ดํฐ์ ์์กด์ฑ์ ์ผ๋์ ๋๋ ๊ฒ์ด ์ค์ํฉ๋๋ค. ํนํ ๋ฐ์ดํฐ ์์ค๊ฐ ์๋ก ๋ค๋ฅธ ๊ฒฝ์ฐ์๋ ๋์ฑ ๊ทธ๋ ์ต๋๋ค. ๋ฌผ๋ก ์ด๋ฌํ ์กฐ์น๋ ๋ฐ์ดํฐ ๋ฌด๊ฒฐ์ฑ์ ์ ์งํ๊ธฐ์๋ ๋ถ์ถฉ๋ถํ์ง๋ง ๊ฒฝ์ฐ์ ๋ฐ๋ผ ํ์ํฉ๋๋ค. ์, ์์์ ์ฐจ์งํ๊ธฐ ์ํด ์ ํด ์ํ๋ฅผ ์ ์งํ๋ ๊ฒ๋ ๋นํฉ๋ฆฌ์ ์ ๋๋ค.
๋ฐ๋ผ์ ์ผ์๋ ๋ฉ์ผ์ ์๋ก์ด ์ ๋ณด๊ฐ ์๋ ๊ฒฝ์ฐ ๊ทธ๋ํ์ ํ์ ์ ์ ์ ์์ํ๊ณ ์ด์ ์ ๋ณด๋ฅผ ๊ด๋ จ์ด ์๋ ๊ฒ์ผ๋ก ํ์ํฉ๋๋ค.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook
class MailSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.check_seen = check_seen
self.box = box
self.condition = condition
def poke(self, context):
conn = IMAPHook(self.conn_id)
mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
if mail_id is None:
return False
else:
return True
์ฐ๋ฆฌ๋ ๋ฐ์ดํฐ๋ฅผ ์์ ํ๊ณ ์ฌ์ฉํฉ๋๋ค
๋ฐ์ดํฐ๋ฅผ ์์ ํ๊ณ ์ฒ๋ฆฌํ๊ธฐ ์ํด ๋ณ๋์ ์ฐ์ฐ์๋ฅผ ์์ฑํ ์ ์์ผ๋ฉฐ ๊ธฐ์ฑํ์ ์ฌ์ฉํ ์ ์์ต๋๋ค. ์ง๊ธ๊น์ง ๋ ผ๋ฆฌ๋ ์ฌ์ํ๊ธฐ ๋๋ฌธ์ ์๋ฅผ ๋ค์ด ๋ฌธ์์์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ค๋ ค๋ฉด ํ์ค PythonOperator๋ฅผ ์ ์ํฉ๋๋ค.
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook
start_date = datetime(2020, 4, 4)
# ะกัะฐะฝะดะฐััะฝะพะต ะบะพะฝัะธะณััะธัะพะฒะฐะฝะธะต ะณัะฐัะฐ
args = {
"owner": "example",
"start_date": start_date,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retry_delay": timedelta(minutes=15),
"provide_context": False,
}
dag = DAG(
dag_id="test_etl",
default_args=args,
schedule_interval="@hourly",
)
# ะะฟัะตะดะตะปัะตะผ ัะตะฝัะพั
mail_check_sensor = MailSensor(
task_id="check_new_emails",
poke_interval=10,
conn_id="mail_conn_id",
timeout=10,
soft_fail=True,
box="my_box",
dag=dag,
mode="poke",
)
# ะคัะฝะบัะธั ะดะปั ะฟะพะปััะตะฝะธั ะดะฐะฝะฝัั
ะธะท ะฟะธััะผะฐ
def prepare_mail():
imap_hook = IMAPHook("mail_conn_id")
mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
if mail_id is None:
raise AirflowException("Empty mailbox")
conn.download_mail_href_attachment(mail_id, "./path.zip")
prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
# ะะฟะธัะฐะฝะธะต ะพััะฐะปัะฝัั
ะฒะตััะธะฝ ะณัะฐัะฐ
...
# ะะฐะดะฐะตะผ ัะฒัะทั ะฝะฐ ะณัะฐัะต
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ะะฟะธัะฐะฝะธะต ะพััะฐะปัะฝัั
ะฟะพัะพะบะพะฒ ัะฟัะฐะฒะปะตะฝะธั
๊ทธ๊ฑด ๊ทธ๋ ๊ณ , ํ์ฌ ๋ฉ์ผ์ด mail.ru์๋ ์์ผ๋ฉด ์ ๋ชฉ, ๋ฐ์ ์ ๋ฑ์ผ๋ก ํธ์ง๋ฅผ ๊ฒ์ํ ์ ์์ต๋๋ค. 2016๋ ์ ๋์ ํ๊ฒ ๋ค๊ณ ์ฝ์ํ์ง๋ง ๋ง์์ด ๋ฐ๋ ๊ฒ ๊ฐ๋ค. ํ์ํ ๊ธ์์ ๋ํ ๋ณ๋์ ํด๋๋ฅผ ๋ง๋ค๊ณ ๋ฉ์ผ ์น ์ธํฐํ์ด์ค์์ ํ์ํ ๊ธ์์ ๋ํ ํํฐ๋ฅผ ์ค์ ํ์ฌ ์ด ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ์ต๋๋ค. ๋ฐ๋ผ์ ๊ฒ์์ ํ์ํ ๋ฌธ์์ ์กฐ๊ฑด, ์ ๊ฒฝ์ฐ์๋ ๋จ์ํ (UNSEEN) ์ด ํด๋์ ๋ค์ด๊ฐ๋๋ค.
์์ฝํ๋ฉด ๋ค์๊ณผ ๊ฐ์ ์์๊ฐ ์์ต๋๋ค. ์กฐ๊ฑด์ ์ถฉ์กฑํ๋ ์ ๋ฌธ์๊ฐ ์๋์ง ํ์ธํ๊ณ ์๋ ๊ฒฝ์ฐ ๋ง์ง๋ง ๋ฌธ์์ ๋งํฌ๋ฅผ ์ฌ์ฉํ์ฌ ์์นด์ด๋ธ๋ฅผ ๋ค์ด๋ก๋ํฉ๋๋ค.
๋ง์ง๋ง ์ ์๋์๋ ์ด ์์นด์ด๋ธ๊ฐ ์์ถ ํด์ ๋๊ณ ์์นด์ด๋ธ์ ๋ฐ์ดํฐ๊ฐ ์ง์์ง๊ณ ์ฒ๋ฆฌ๋๋ฉฐ ๊ฒฐ๊ณผ์ ์ผ๋ก ๋ชจ๋ ๊ฒ์ด ETL ํ๋ก์ธ์ค์ ํ์ดํ๋ผ์ธ์ผ๋ก ์ด๋ํ๋ค๋ ๊ฒ์ด ์๋ต๋์ด ์์ง๋ง ์ด๊ฒ์ ์ด๋ฏธ ๋์ด์ฐ์ต๋๋ค. ๊ธฐ์ฌ์ ๋ฒ์. ํฅ๋ฏธ๋กญ๊ณ ์ ์ฉํ ๊ฒ์ผ๋ก ํ๋ช
๋๋ฉด ETL ์๋ฃจ์
๊ณผ Apache Airflow์ฉ ๋ถํ์ ๋ํด ๊ธฐ๊บผ์ด ๊ณ์ ์ค๋ช
ํ๊ฒ ์ต๋๋ค.
์ถ์ฒ : habr.com