เบเปเปเบงเปเบฒเปเบเบเปเบเปเบฅเบเบตเบเบฐเบเบฑเบเบเบฐเบเบฒเบซเบผเบฒเบเบเบฒเบเปเบ, เบงเบดเบเบตเบเบฒเบเบเบตเปเบฅเปเบฒเบชเบฐเปเบซเบกเบกเบฑเบเบเบฐเบเบดเบเบเบฒเบกเบเบฒเบเบเบฑเบเบเบฐเบเบฒ. เบเบตเปเบญเบฒเบเบเบฐเปเบเบฑเบเบเปเบญเบเบเบฒเบเบซเบฑเบเบเปเบฝเบเบเบตเปเบฅเบฝเบเบเปเบฒเบ, เบเบฑเบเปเบเบเบญเบเบกเบฐเบเบธเบ, เบเบงเบฒเบกเบเปเบญเบเบเบฒเบเบเปเบฒเบเปเบเบฑเบเปเบเปเบฅเบขเบต, เบซเบผเบทเบชเบดเปเบเบญเบทเปเบ. เปเบเบเบปเบเปเบเบเบเบฒเบเบเบธเบเปเบเปเบเบเปเปเบกเบนเบ, เปเบซเบผเปเบเบเปเปเบกเบนเบเปเบกเปเบเปเบเบตเบเปเบเบตเบเบซเบผเบฒเบเบเบตเปเบชเบธเบเปเบเบชเปเบงเบเบเบตเป. เบเปเปเบงเปเบฒเบเบงเบเปเบฎเบปเบฒเบเบฑเบเบขเบฒเบเบเปเบฒเบเบฑเบเบชเบดเปเบเบเบตเปเบซเบผเบฒเบเบเบฒเบเปเบ, เปเบเปเบกเบฒเบฎเบญเบเบเบฑเบเบเบธเบเบฑเบเบเปเปเบกเบนเบเบชเปเบงเบเบซเบเบถเปเบเปเบกเปเบเบเบทเบเบชเบปเปเบเปเบเปเบเบเบปเบงเบชเบปเปเบเบเปเปเบเบงเบฒเบกเปเบฅเบฐเบญเบตเปเบกเบง, เบเปเปเปเบเปเบเปเบฒเบงเปเบเบดเบเบฎเบนเบเปเบเบเปเบเบปเปเบฒเปเบเปเบซเบผเบฒเบ. เบเปเบฒเบเบฐเปเบเบปเปเบฒเบเปเปเบเบทเปเบญเปเบเบตเบเบเปเบฒเบเปเบซเปเบเบญเบเบซเบเบถเปเบเปเบเบเบฒเบเปเบฅเบทเบญเบเบชเปเบฒเบฅเบฑเบ Apache Airflow, เบชเบฐเปเบเบเบงเบดเบเบตเบเบตเปเบเปเบฒเบเบชเบฒเบกเบฒเบเปเบญเบปเบฒเบเปเปเบกเบนเบเบเบฒเบเบญเบตเปเบกเบง.
เบเบฐเบงเบฑเบเบชเบฒเบ
เบเปเปเบกเบนเบเบเปเบฒเบเบงเบเบซเบฅเบฒเบเบเบฑเบเบเบทเบเปเบญเบเบเปเบฒเบเบเบฒเบเบญเบตเปเบกเบฅเป, เบเบฒเบเบเบฒเบเบชเบทเปเบชเบฒเบเบฅเบฐเบซเบงเปเบฒเบเบเบธเบเบเบปเบเบเบฑเบเบกเบฒเบเบเบฐเบเบฒเบเบเบฒเบเบเบปเบงเบเบฑเบเบฅเบฐเบซเบงเปเบฒเบเบเปเบฅเบดเบชเบฑเบ. เบกเบฑเบเปเบเบฑเบเบเบฒเบเบเบตเบเปเบฒเบกเบฑเบเปเบเบฑเบเปเบเปเบเปเบเบตเปเบเบฐเบเบฝเบเบเบฒเบเปเบเปเบเบญเบเปเบเบทเปเบญเปเบซเปเปเบเปเบเปเปเบกเบนเบเบซเบผเบทเปเบญเบปเบฒเบเบปเบเบขเบนเปเปเบเบซเปเบญเบเบเบฒเบเบเบตเปเบเบฐเปเบญเบปเบฒเบเปเปเบกเบนเบเบเบตเปเปเบเบปเปเบฒเปเบเปเบเปเบซเบผเปเบเบเบตเปเบชเบฐเบเบงเบเบเบงเปเบฒ, เปเบเปเปเบฅเบทเปเบญเบเปเบเบตเปเบญเบฒเบเบเบฐเบเปเปเปเบเบฑเบเปเบเปเบเป. เบงเบฝเบเบเบฒเบเบชเบฐเปเบเบฒเบฐเบเบตเปเบเปเบฒเบเบฐเปเบเบปเปเบฒเบเบฐเปเบเบตเบเปเบกเปเบเบเบฒเบเปเบเบทเปเบญเบกเบเปเปเบฅเบฐเบเบปเบ CRM เบเบตเปเบกเบตเบเบทเปเบชเบฝเบเบเบฑเบเบเบฑเบเบเปเปเบกเบนเบ, เปเบฅเบฐเบซเบผเบฑเบเบเบฒเบเบเบฑเปเบเบเบฑเบเบฅเบฐเบเบปเบ OLAP. เบกเบฑเบเปเบเบตเบเบเบถเปเบเปเบเบเบฐเบซเบงเบฑเบเบชเบฒเบเบงเปเบฒเบชเปเบฒเบฅเบฑเบเบเปเบฅเบดเบชเบฑเบเบเบญเบเบเบงเบเปเบฎเบปเบฒเบเบฒเบเบเปเบฒเปเบเปเบฅเบฐเบเบปเบเบเบตเปเปเบกเปเบเบชเบฐเบเบงเบเปเบเบเบทเปเบเบเบตเปเบเบธเบฅเบฐเบเบดเบเปเบเบเบชเบฐเปเบเบฒเบฐ. เปเบเบฒเบฐเบชเบฐเบเบฑเปเบ, เบเบธเบเบเบปเบเบเปเปเบเปเบญเบเบเบฒเบเบชเบฒเบกเบฒเบเบเปเบฒเปเบเบตเบเบเบฒเบเบเบฑเบเบเปเปเบกเบนเบเบเบฒเบเบฅเบฐเบเบปเบเบเบฒเบเบชเปเบงเบเบเบตเบชเบฒเบกเบเบตเปเปเบเบฑเปเบเบเบฝเบงเบเบฑเบ. เบเปเบญเบเบญเบทเปเบ เปเบปเบ, เปเบเปเบเบญเบ, เบเบงเบฒเบกเปเบเบฑเบเปเบเปเบเปเบเบญเบเบเบฒเบเปเบเปเบฎเบฑเบเบเปเปเบกเบนเบเบเบฒเบ API เปเบเบตเบเปเบเปเบเบทเบเบชเบถเบเบชเบฒ. เปเบเปเบซเบเปเบฒเปเบชเบเบเบฒเบ, API เบเปเปเปเบเปเบเบงเบกเปเบญเบปเบฒเบเบฒเบเปเบเปเบฎเบฑเบเบเปเปเบกเบนเบเบเบตเปเบเปเบฒเปเบเบฑเบเบเบฑเบเบซเบกเบปเบ, เปเบฅเบฐ, เปเบเบเปเบฒเบชเบฑเบเบเบตเปเบเปเบฒเบเบเบฒเบ, เบกเบฑเบเบขเบนเปเปเบเบซเบผเบฒเบเบเบฒเบเบเบตเปเบซเบเบฒเบเบเบฒเบ, เปเบฅเบฐเบเบฒเบเบชเบฐเบซเบเบฑเบเบชเบฐเบซเบเบนเบเบเปเบฒเบเบงเบดเบเบฒเบเบฒเบเบเปเปเบเปเบญเบเบเบฒเบเบซเบผเบทเบเปเปเบชเบฒเบกเบฒเบเบเบญเบเบชเบฐเบซเบเบญเบเปเบเบดเปเบเบซเบเบถเปเบเปเบเบทเปเบญเปเบซเปเบกเบตเบซเบเปเบฒเบเบตเปเบเบตเปเบชเบปเบกเบเบนเบเปเบเบเบซเบผเบฒเบเบเบถเปเบ. เปเบเปเบฅเบฐเบเบปเบเบเบตเปเปเบซเปเปเบญเบเบฒเบเบเบตเปเบเบฐเปเบเปเบฎเบฑเบเบเปเปเบกเบนเบเบเบตเปเบเบฒเบเบซเบฒเบเปเบเปเบเบฑเบเปเบฅเบเบฐเบเบฒเบเปเบเบชเบฐเบเบตเปเบเบฎเบนเบเปเบเบเบเบญเบเบเบฒเบเปเบเบทเปเบญเบกเบเปเปเบชเปเบฒเบฅเบฑเบเบเบฒเบ unloading เบฎเบงเบเบฎเบงเบก.
เบกเบฑเบเบเบงเบเบเบฐเบชเบฑเบเปเบเบเบงเปเบฒเบเบตเปเบเปเปเปเบกเปเบเบเปเบฅเบฐเบเบตเบเบฝเบงเบเบตเปเบเบธเบฅเบฐเบเบดเบเบเปเบญเบเบเบฒเบเปเบเบฑเบเบเปเบฒเบเปเปเบกเบนเบเบเบฒเบเบญเบตเปเบกเบฅเปเบซเบผเบทเบเบนเปเบชเบปเปเบเบเปเปเบเบงเบฒเบกเบเบฑเบเบเบต. เบขเปเบฒเบเปเบเบเปเบเบฒเบก, เปเบเบเปเบฅเบฐเบเบตเบเบตเป, เบเบงเบเปเบฎเบปเบฒเบเปเปเบชเบฒเบกเบฒเบเบกเบตเบญเบดเบเบเบดเบเบปเบเบเปเปเบเปเบฅเบดเบชเบฑเบเบเบฒเบเบชเปเบงเบเบเบตเบชเบฒเบกเบเบตเปเบชเบฐเบซเบเบญเบเบชเปเบงเบเบซเบเบถเปเบเบเบญเบเบเปเปเบกเบนเบเบเบฝเบเปเบเปเปเบเบงเบดเบเบตเบเบฒเบเบเบตเป.
apache airflow
เปเบเบทเปเบญเบชเปเบฒเบเบเบฐเบเบงเบเบเบฒเบ ETL, เบเบงเบเปเบฎเบปเบฒเบชเปเบงเบเบซเบผเบฒเบเปเบกเปเบเปเบเป Apache Airflow. เปเบเบทเปเบญเปเบซเปเบเบนเปเบญเปเบฒเบเบเบตเปเบเปเปเบเบธเปเบเปเบเบตเบเบเบฑเบเปเบเบเปเบเปเบฅเบเบตเบเบตเปเปเบเบปเปเบฒเปเบเบเบตเบเบงเปเบฒเบงเปเบฒเบกเบฑเบเบกเบตเบฅเบฑเบเบชเบฐเบเบฐเปเบเบงเปเบเปเบเบชเบฐเบเบฒเบเบเบฒเบเปเบฅเบฐเปเบเบเบเบปเปเบงเปเบ, เบเปเบญเบเบเบฐเบญเบฐเบเบดเบเบฒเบเบชเบญเบเบชเบฒเบกเบเปเปเปเบเบฐเบเปเบฒ.
Apache Airflow เปเบเบฑเบเปเบเบฅเบฐเบเบฐเบเบญเบกเบเบฃเบตเบเบตเปเบเบทเบเบเปเบฒเปเบเปเปเบเบทเปเบญเบชเปเบฒเบ, เบเบฐเบเบดเบเบฑเบเปเบฅเบฐเบเบดเบเบเบฒเบกเบเบฐเบเบงเบเบเบฒเบ ETL (Extract-Transform-Loading) เปเบ Python. เปเบเบงเบเบงเบฒเบกเบเบดเบเบเบปเปเบเบเปเปเบ Airflow เปเบกเปเบเบเบฒเบ acyclic เบเบตเป, เบเปเบญเบเบเบตเปเบเบธเบเบเบญเบเปเบชเบฑเปเบเบชเบฐเปเบเบเปเบกเปเบเบเบฐเบเบงเบเบเบฒเบเบชเบฐเปเบเบฒเบฐ, เปเบฅเบฐเบเบญเบเบเบญเบเบเบฒเบเปเบกเปเบเบเบฒเบเปเบซเบผเปเบเบปเปเบฒเบเบญเบเบเบฒเบเบเบงเบเบเบธเบกเบซเบผเบทเบเปเปเบกเบนเบ. เบเบฐเบเบงเบเบเบฒเบเบชเบฒเบกเบฒเบเปเบญเบตเปเบเบเบฑเบเบเบฑเบ Python เปเบเป, เบซเบผเบทเบกเบฑเบเบชเบฒเบกเบฒเบเบกเบตเปเบซเบเบเบปเบเบเบตเปเบชเบฑเบเบชเบปเบเบซเบผเบฒเบเบเบฒเบเบเบฒเบเปเบญเบตเปเบเบซเบผเบฒเบเบซเบเปเบฒเบเบตเปเบเบฒเบกเบฅเปเบฒเบเบฑเบเปเบเบชเบฐเบเบฒเบเบเบฒเบเบเบญเบเบซเปเบญเบเบฎเบฝเบ. เบชเปเบฒเบฅเบฑเบเบเบฒเบเบเบฐเบเบดเบเบฑเบเบเบฒเบเปเบฅเบทเปเบญเบเปเบเบตเปเบชเบธเบ, เบกเบตเบเบฒเบเบเบฑเบเบเบฐเบเบฒเบเบตเปเบเบฝเบกเบเปเบญเบกเบซเบผเบฒเบเปเบฅเปเบงเบเบตเปเบชเบฒเบกเบฒเบเบเบทเบเบเปเบฒเปเบเปเปเบเบฑเบเบเบฐเบเบงเบเบเบฒเบ. เบเบฒเบเบเบฑเบเบเบฐเบเบฒเบเบฑเปเบเบเปเบฒเบงเบเบฐเบเบญเบเบกเบต:
- operators - เบชเปเบฒเบฅเบฑเบเบเบฒเบเปเบญเบเบเปเปเบกเบนเบเบเบฒเบเบเปเบญเบเบซเบเบถเปเบเปเบเบซเบฒเบเปเบญเบเบญเบทเปเบ, เบเบปเบงเบขเปเบฒเบ, เบเบฒเบเบเบฒเบเบฐเบฅเบฒเบเบเบฒเบเบเปเปเบกเบนเบเปเบเบซเบฒเบเบฑเบเบเปเปเบกเบนเบ;
- เปเบเบฑเบเปเบเบต - เบชเปเบฒเบฅเบฑเบเบเบฒเบเบฅเปเบเปเบฒเบเบฒเบเบเบฐเบเบปเบเบเบปเบงเบเบญเบเปเบซเบเบเบฒเบเบชเบฐเปเบเบฒเบฐเปเบเบซเบเบถเปเบเปเบฅเบฐเบเบตเปเบเปเบฒเบเบฒเบเปเบซเบผเปเบเบปเปเบฒเบเบญเบเบเบฒเบเบเบงเบเบเบธเบกเปเบเบชเบนเปเบเบธเบเปเบเปเปเบกเบฒเบเบญเบเบเบฒเบ;
- hooks - เบชเปเบฒเบฅเบฑเบเบเบฒเบเบเบฐเบเบดเบเบฑเบเบฅเบฐเบเบฑเบเบเปเปเบฒ, เบชเปเบฒเบฅเบฑเบเบเบฒเบเบเบปเบเบเบปเบงเบขเปเบฒเบ, เปเบเบทเปเบญเปเบซเปเปเบเปเบฎเบฑเบเบเปเปเบกเบนเบเบเบฒเบเบเบฒเบเบฐเบฅเบฒเบเบเบฒเบเบเปเปเบกเบนเบ (เปเบเปเปเบเบเปเบฒเบเบฐเปเบซเบผเบเบเบฒเบ);
- เปเบฅเบฐเบญเบทเปเบเป.
เบกเบฑเบเบเบฐเบเปเปเปเบซเบกเบฒเบฐเบชเบปเบกเบเบตเปเบเบฐเบญเบฐเบเบดเบเบฒเบ Apache Airflow เปเบเบฅเบฒเบเบฅเบฐเบญเบฝเบเปเบเบเบปเบเบเบงเบฒเบกเบเบตเป. เบเบฒเบเปเบเบฐเบเปเบฒเบชเบฑเปเบเปเบชเบฒเบกเบฒเบเปเบเบดเปเบเปเบเป
Hook เบชเปเบฒเบฅเบฑเบเบเบฒเบเบฎเบฑเบเบเปเปเบกเบนเบ
เบเปเบญเบเบญเบทเปเบ เปเบปเบ, เปเบเบทเปเบญเปเบเปเปเบเบเบฑเบเบซเบฒ, เบเบงเบเปเบฎเบปเบฒเบเปเบญเบเบเบฝเบ hook เบเบตเปเบเบงเบเปเบฎเบปเบฒเบชเบฒเบกเบฒเบ:
- เปเบเบทเปเบญเบกเบเปเปเบเบฑเบเบญเบตเปเบกเบง
- เบเบญเบเบซเบฒเบเบปเบงเบญเบฑเบเบชเบญเบเบเบตเปเบเบทเบเบเปเบญเบ
- เปเบเปโเบฎเบฑเบโเบเปเปโเบกเบนเบโเบเบฒเบโเบเบปเบโเบซเบกเบฒเบโเบชเบฐโเบเบฑเบโ.
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging
class IMAPHook(BaseHook):
def __init__(self, imap_conn_id):
"""
IMAP hook ะดะปั ะฟะพะปััะตะฝะธั ะดะฐะฝะฝัั
ั ัะปะตะบััะพะฝะฝะพะน ะฟะพััั
:param imap_conn_id: ะะดะตะฝัะธัะธะบะฐัะพั ะฟะพะดะบะปััะตะฝะธั ะบ ะฟะพััะต
:type imap_conn_id: string
"""
self.connection = self.get_connection(imap_conn_id)
self.mail = None
def authenticate(self):
"""
ะะพะดะบะปััะฐะตะผัั ะบ ะฟะพััะต
"""
mail = imaplib.IMAP4_SSL(self.connection.host)
response, detail = mail.login(user=self.connection.login, password=self.connection.password)
if response != "OK":
raise AirflowException("Sign in failed")
else:
self.mail = mail
def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
"""
ะะตัะพะด ะดะปั ะฟะพะปััะตะฝะธั ะธะดะตะฝัะธัะธะบะฐัะพัะฐ ะฟะพัะปะตะดะฝะตะณะพ ะฟะธััะผะฐ,
ัะดะพะฒะปะตัะฒะพัะฐัััะตะณะพ ััะปะพะฒะธัะผ ะฟะพะธัะบะฐ
:param check_seen: ะัะผะตัะฐัั ะฟะพัะปะตะดะฝะตะต ะฟะธััะผะพ ะบะฐะบ ะฟัะพัะธัะฐะฝะฝะพะต
:type check_seen: bool
:param box: ะะฐะธะผะตะฝะพะฒะฐะฝะธั ััะธะบะฐ
:type box: string
:param condition: ะฃัะปะพะฒะธั ะฟะพะธัะบะฐ ะฟะธัะตะผ
:type condition: string
"""
self.authenticate()
self.mail.select(mailbox=box)
response, data = self.mail.search(None, condition)
mail_ids = data[0].split()
logging.info("ะ ััะธะบะต ะฝะฐะนะดะตะฝั ัะปะตะดัััะธะต ะฟะธััะผะฐ: " + str(mail_ids))
if not mail_ids:
logging.info("ะะต ะฝะฐะนะดะตะฝะพ ะฝะพะฒัั
ะฟะธัะตะผ")
return None
mail_id = mail_ids[0]
# ะตัะปะธ ัะฐะบะธั
ะฟะธัะตะผ ะฝะตัะบะพะปัะบะพ
if len(mail_ids) > 1:
# ะพัะผะตัะฐะตะผ ะพััะฐะปัะฝัะต ะฟัะพัะธัะฐะฝะฝัะผะธ
for id in mail_ids:
self.mail.store(id, "+FLAGS", "\Seen")
# ะฒะพะทะฒัะฐัะฐะตะผ ะฟะพัะปะตะดะฝะตะต
mail_id = mail_ids[-1]
# ะฝัะถะฝะพ ะปะธ ะพัะผะตัะธัั ะฟะพัะปะตะดะฝะตะต ะฟัะพัะธัะฐะฝะฝัะผ
if not check_seen:
self.mail.store(mail_id, "-FLAGS", "\Seen")
return mail_id
เปเบซเบเบเบปเบเปเบกเปเบเบเบตเป: เบเบงเบเปเบฎเบปเบฒเปเบเบทเปเบญเบกเบเปเป, เบเบญเบเบซเบฒเบเบปเบงเบญเบฑเบเบชเบญเบเบเบตเปเบเปเบฝเบงเบเปเบญเบเบเบตเปเบชเบธเบ, เบเปเบฒเบกเบตเบเบปเบเบญเบทเปเบ, เบเบงเบเปเบฎเบปเบฒเบเปเปเบชเบปเบเปเบเบเบงเบเบกเบฑเบ. เบเบฑเบเบเบฑเบเบเบตเปเบเบทเบเบเปเบฒเปเบเป, เปเบเบฒเบฐเบงเปเบฒเบเบปเบงเบญเบฑเบเบชเบญเบเบเปเปเบกเบฒเบกเบตเบเปเปเบกเบนเบเบเบฑเบเบซเบกเบปเบเบเบญเบเบเบปเบงเบซเบเบฑเบเบชเบทเบเปเบญเบเบซเบเปเบฒ. เบเปเบฒเบเบตเปเบเปเปเปเบกเปเบเบเปเบฅเบฐเบเบต, เบเปเบฒเบเบชเบฒเบกเบฒเบเบชเบปเปเบเบเบทเบ array เบเบญเบเบเบปเบงเบญเบฑเบเบชเบญเบเบเบฑเบเบซเบกเบปเบ, เบซเบผเบทเบเบธเบเปเบเปเบเบเบปเบงเบญเบฑเบเบชเบญเบเบเปเบฒเบญเบดเบ, เปเบฅเบฐเบชเปเบงเบเบเบตเปเปเบซเบผเบทเบญเปเบ pass เบเปเปเปเบ. เปเบเบเบเบปเปเบงเปเบ, เบเบธเบเบชเบดเปเบเบเบธเบเบขเปเบฒเบ, เบเบฒเบกเบชเบฐเปเบซเบกเบต, เปเบกเปเบเบเบถเปเบเบเบฑเบเบงเบฝเบเบเบฒเบ.
เบเบงเบเปเบฎเบปเบฒเปเบเบตเปเบกเบชเบญเบเบซเบเปเบฒเบเบตเปเบเปเบงเบเบชเปเบฒเบฅเบฑเบ hook: เบชเปเบฒเบฅเบฑเบเบเบฒเบเบเบฒเบงเปเบซเบผเบเปเบเบฅเปเปเบฅเบฐเบชเปเบฒเบฅเบฑเบเบเบฒเบเบเบฒเบงเปเบซเบผเบเปเบเบฅเปเปเบเบเปเบเปเบเบฒเบเปเบเบทเปเบญเบกเบเปเปเบเบฒเบเบญเบตเปเบกเบฅเป. เปเบเบเบงเบดเบเบตเบเบฒเบเบเบฒเบ, เบเบงเบเปเบเบปเบฒเบชเบฒเบกเบฒเบเบเบทเบเบเปเบฒเบเปเบเบเบฑเบเบเบนเปเบเบฐเบเบดเบเบฑเบเบเบฒเบ, เบกเบฑเบเบเบถเปเบเบเบฑเบเบเบงเบฒเบกเบเบตเปเบเบญเบเบเบฒเบเบเปเบฒเปเบเปเบซเบเปเบฒเบเบตเปเบเบตเป. เบชเบดเปเบเบญเบทเปเบเบเบตเปเบเบฐเปเบเบตเปเบกเปเบชเป hook, เบญเบตเบเปเบเบทเปเบญเบซเบเบถเปเบ, เปเบกเปเบเบเบถเปเบเบเบฑเบเบงเบฝเบเบเบฒเบ: เบเปเบฒเปเบเบฅเปเปเบเปเบฎเบฑเบเบเบฑเบเบเบตเปเบเบเบปเบเบซเบกเบฒเบ, เบซเบผเบฑเบเบเบฒเบเบเบฑเปเบเบเปเบฒเบเบชเบฒเบกเบฒเบเบเบฒเบงเปเบซเบฅเบเปเบเบฅเปเปเบเบเบเบฑเบเบเบปเบเบซเบกเบฒเบ, เบเปเบฒเบเปเปเบกเบนเบเปเบเปเบฎเบฑเบเปเบเบเบปเบเบซเบกเบฒเบ, เบซเบผเบฑเบเบเบฒเบเบเบฑเปเบเบเปเบฒเบเบเปเบฒเปเบเบฑเบเบเปเบญเบเปเบเบเบเบปเบเบซเบกเบฒเบ, เปเบฅเบฐเบญเบทเปเบเป เปเบเบเปเบฅเบฐเบเบตเบเบญเบเบเปเบญเบ, เบเบปเบเบซเบกเบฒเบเบกเบฒเบเปเบญเบกเบเบฑเบเบเบฒเบเปเบเบทเปเบญเบกเบเปเปเบซเบเบถเปเบเปเบเบซเบฒเบเปเบญเบเปเบเบฑเบเบกเปเบฝเบ, เปเบเบดเปเบเบเปเบญเบเบเปเบฒเปเบเบฑเบเบเปเบญเบเปเบชเปเบขเบนเปเปเบเบชเบฐเบเบฒเบเบเบตเปเบเบตเปเปเบเปเบเบญเบเปเบฅเบฐเปเบฅเบตเปเบกเบเบปเปเบเบเบฐเบเบงเบเบเบฒเบเบเบธเบเปเบเปเบเบเบทเปเบกเบญเบตเบ.
def download_from_url(self, url, path, chunk_size=128):
"""
ะะตัะพะด ะดะปั ัะบะฐัะธะฒะฐะฝะธั ัะฐะนะปะฐ
:param url: ะะดัะตั ะทะฐะณััะทะบะธ
:type url: string
:param path: ะัะดะฐ ะฟะพะปะพะถะธัั ัะฐะนะป
:type path: string
:param chunk_size: ะะพ ัะบะพะปัะบะพ ะฑะฐะนัะพะฒ ะฟะธัะฐัั
:type chunk_size: int
"""
r = requests.get(url, stream=True)
with open(path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def download_mail_href_attachment(self, mail_id, path):
"""
ะะตัะพะด ะดะปั ัะบะฐัะธะฒะฐะฝะธั ัะฐะนะปะฐ ะฟะพ ัััะปะบะต ะธะท ะฟะธััะผะฐ
:param mail_id: ะะดะตะฝัะธัะธะบะฐัะพั ะฟะธััะผะฐ
:type mail_id: string
:param path: ะัะดะฐ ะฟะพะปะพะถะธัั ัะฐะนะป
:type path: string
"""
response, data = self.mail.fetch(mail_id, "(RFC822)")
raw_email = data[0][1]
raw_soup = raw_email.decode().replace("r", "").replace("n", "")
parse_soup = BeautifulSoup(raw_soup, "html.parser")
link_text = ""
for a in parse_soup.find_all("a", href=True, text=True):
link_text = a["href"]
self.download_from_url(link_text, path)
เบฅเบฐเบซเบฑเบเปเบกเปเบเบเปเบฒเบเบเบฒเบ, เบชเบฐเบเบฑเปเบเบกเบฑเบเบเปเปเบเปเบญเบเบเปเบญเบเบเบฒเบเบเปเบฒเบญเบฐเบเบดเบเบฒเบเบเบทเปเบกเบญเบตเบ. เบเปเบฒเบเบฐเปเบเบปเปเบฒเบเบฝเบเปเบเปเบเบฐเบเบญเบเบเปเบฒเบเบเปเบฝเบงเบเบฑเบเปเบชเบฑเปเบ magic imap_conn_id. Apache Airflow เปเบเบฑเบเบฎเบฑเบเบชเบฒเบเบปเบงเบเปเบฒเบเบปเบเบเบฒเบเปเบเบทเปเบญเบกเบเปเป (เปเบเบปเปเบฒเบชเบนเปเบฅเบฐเบเบปเบ, เบฅเบฐเบซเบฑเบเบเปเบฒเบ, เบเบตเปเบขเบนเป, เปเบฅเบฐเบเบปเบงเบเปเบฒเบเบปเบเบเบฒเบเบญเบทเปเบเป) เบเบตเปเบชเบฒเบกเบฒเบเปเบเบปเปเบฒเปเบเบดเบเปเบเปเปเบเบเบเบปเบงเบฅเบฐเบเบธเบชเบฐเบเบฃเบดเบ. เบชเบฒเบเบเบฒ, เบเบฒเบเบเบฑเบเบเบฒเบเบเบฒเบเปเบเบทเปเบญเบกเบเปเปเปเบเบดเปเบเบเบทเบงเปเบฒเบเบตเป
เปเบเบฑเบเปเบเบตเบฅเปเบเปเบฒเบเปเปเบกเบนเบ
เปเบเบทเปเบญเบเบเบฒเบเบเบงเบเปเบฎเบปเบฒเบฎเบนเปเบงเบดเบเบตเปเบเบทเปเบญเบกเบเปเป เปเบฅเบฐเบฎเบฑเบเบเปเปเบกเบนเบเบเบฒเบเปเบกเบฅเปเบฅเปเบง, เบเบญเบเบเบตเปเบเบงเบเปเบฎเบปเบฒเบชเบฒเบกเบฒเบเบเบฝเบเปเบเบฑเบเปเบเบตเปเบเบทเปเบญเบฅเปเบเปเบฒเบเบงเบเบกเบฑเบเปเบเป. เปเบเบเปเบฅเบฐเบเบตเบเบญเบเบเปเบญเบ, เบกเบฑเบเบเปเปเปเบเปเปเบฎเบฑเบเบงเบฝเบเปเบเบเบฒเบเบเบฝเบเบเบปเบงเบเบฐเบเบดเบเบฑเบเบเบฒเบเบเบฑเบเบเบตเบเบตเปเบเบฐเบเบฐเบกเบงเบเบเบปเบเบเปเปเบกเบนเบ, เบเปเบฒเบกเบต, เปเบเบฒเบฐเบงเปเบฒเบเบฐเบเบงเบเบเบฒเบเบญเบทเปเบเปเปเบฎเบฑเบเบงเบฝเบเปเบเบเบญเบตเบเปเบชเปเบเปเปเบกเบนเบเบเบตเปเปเบเปเบฎเบฑเบเบเบฒเบเปเบกเบฅ, เบฅเบงเบกเบเบฑเบเบเปเปเบกเบนเบเบเบตเปเปเบญเบปเบฒเบเปเปเบกเบนเบเบเบตเปเบเปเบฝเบงเบเปเบญเบเบเบฒเบเปเบซเบผเปเบเบญเบทเปเบเป (API, เปเบเบฅเบฐเบชเบฑเบ. , web metrics, etc.) เปเบฅเบฐเบญเบทเปเบเป). เบเปเบญเบเบเบฐเบเบปเบเบเบปเบงเบขเปเบฒเบเปเบซเปเปเบเบปเปเบฒ. เบเบนเปเปเบเปเปเบซเบกเปเปเบเปเบเบฒเบเบปเบเบขเบนเปเปเบเบฅเบฐเบเบปเบ CRM, เปเบฅเบฐเบเบงเบเปเบฎเบปเบฒเบเบฑเบเบเปเปเบฎเบนเปเบเปเบฝเบงเบเบฑเบ UUID เบเบญเบเบฅเบฒเบง. เบเบฒเบเบเบฑเปเบ, เปเบกเบทเปเบญเบเบฐเบเบฒเบเบฒเบกเบฎเบฑเบเบเปเปเบกเบนเบเบเบฒเบเปเบเบฅเบฐเบชเบฑเบ SIP, เบเบงเบเปเบฎเบปเบฒเบเบฐเบฎเบฑเบเบชเบฒเบเบเบตเปเปเบเบทเปเบญเบกเบเปเปเบเบฑเบ UUID เบเบญเบเบกเบฑเบ, เปเบเปเบเบงเบเปเบฎเบปเบฒเบเปเปเบชเบฒเบกเบฒเบเบเบฑเบเบเบถเบ เปเบฅเบฐเบเบณเปเบเปเปเบเปเบขเปเบฒเบเบเบทเบเบเปเบญเบ. เปเบเปเบฅเบทเปเบญเบเบเบฑเปเบเบเปเบฒเบง, เบกเบฑเบเปเบเบฑเบเบชเบดเปเบเบชเปเบฒเบเบฑเบเบเบตเปเบเบฐเบฎเบฑเบเบชเบฒเบเบงเบฒเบกเปเบเบดเปเบเบเปเปเบเบเบญเบเบเปเปเบกเบนเบ, เปเบเบเบชเบฐเปเบเบฒเบฐเบเปเบฒเบเบงเบเปเบเบปเบฒเบกเบฒเบเบฒเบเปเบซเบผเปเบเบเปเบฒเบเป. เปเบซเบผเบปเปเบฒเบเบตเปเปเบกเปเบ, เปเบเปเบเบญเบ, เบกเบฒเบเบเบฐเบเบฒเบเบเปเปเบเบฝเบเบเปเปเบเบทเปเบญเบฎเบฑเบเบชเบฒเบเบงเบฒเบกเบชเบปเบกเบเบนเบเบเบญเบเบเปเปเบกเบนเบ, เปเบเปเปเบเบเบฒเบเบเปเบฅเบฐเบเบตเบกเบฑเบเปเบเบฑเบเบชเบดเปเบเบเปเบฒเปเบเบฑเบ. เปเบกเปเบเปเบฅเปเบง, เปเบฅเบฐ idling เปเบเบทเปเบญเบเบญเบเบเบญเบเบเบฑเบเบเบฐเบเบฒเบเบญเบเปเบกเปเบ irrational.
เบเบฑเปเบเบเบฑเปเบ, เปเบเบฑเบเปเบเบตเบเบญเบเบเบงเบเปเบฎเบปเบฒเบเบฐเปเบเบตเบเบเบปเบงเบเบฒเบกเปเบเบงเบเบฑเปเบเบเปเปเบกเบฒเบเบญเบเบเบฒเบเบเปเบฒเบกเบตเบเปเปเบกเบนเบเบชเบปเบเบขเบนเปเปเบเปเบกเบฅ, เปเบฅเบฐเบเบฑเบเบซเบกเบฒเบเบเปเปเบกเบนเบเบเบตเปเบเปเบฒเบเบกเบฒเบงเปเบฒเบเปเปเบเปเบฝเบงเบเปเบญเบ.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook
class MailSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.check_seen = check_seen
self.box = box
self.condition = condition
def poke(self, context):
conn = IMAPHook(self.conn_id)
mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
if mail_id is None:
return False
else:
return True
เบเบงเบเปเบฎเบปเบฒเบฎเบฑเบ เปเบฅเบฐเบเบณเปเบเปเบเปเปเบกเบนเบ
เปเบเบทเปเบญเบฎเบฑเบเปเบฅเบฐเบเบฐเบกเบงเบเบเบปเบเบเปเปเบกเบนเบ, เบเปเบฒเบเบชเบฒเบกเบฒเบเบเบฝเบเบเบปเบงเบเบฐเบเบดเบเบฑเบเบเบฒเบเปเบเบเบเปเบฒเบเบซเบฒเบ, เบเปเบฒเบเบชเบฒเบกเบฒเบเบเปเบฒเปเบเปเปเบเบทเปเบญเบเบเบตเปเบเบฝเบกเบเปเบญเบก. เบเบฑเบเบเบฑเปเบเปเบเปเปเบเบดเบเบเบญเบเบเบฑเปเบเปเบซเบเบเบปเบเปเบกเปเบเปเบเบฑเบเปเบฅเบทเปเบญเบเปเบฅเบฑเบเปเบเปเบญเบเป - เปเบเบทเปเบญเปเบญเบปเบฒเบเปเปเบกเบนเบเบเบฒเบเบเบปเบเบซเบกเบฒเบ, เบเบปเบงเบขเปเบฒเบ, เบเปเบญเบเปเบเบฐเบเปเบฒ PythonOperator เบกเบฒเบเบเบฐเบเบฒเบ.
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook
start_date = datetime(2020, 4, 4)
# ะกัะฐะฝะดะฐััะฝะพะต ะบะพะฝัะธะณััะธัะพะฒะฐะฝะธะต ะณัะฐัะฐ
args = {
"owner": "example",
"start_date": start_date,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retry_delay": timedelta(minutes=15),
"provide_context": False,
}
dag = DAG(
dag_id="test_etl",
default_args=args,
schedule_interval="@hourly",
)
# ะะฟัะตะดะตะปัะตะผ ัะตะฝัะพั
mail_check_sensor = MailSensor(
task_id="check_new_emails",
poke_interval=10,
conn_id="mail_conn_id",
timeout=10,
soft_fail=True,
box="my_box",
dag=dag,
mode="poke",
)
# ะคัะฝะบัะธั ะดะปั ะฟะพะปััะตะฝะธั ะดะฐะฝะฝัั
ะธะท ะฟะธััะผะฐ
def prepare_mail():
imap_hook = IMAPHook("mail_conn_id")
mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
if mail_id is None:
raise AirflowException("Empty mailbox")
conn.download_mail_href_attachment(mail_id, "./path.zip")
prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
# ะะฟะธัะฐะฝะธะต ะพััะฐะปัะฝัั
ะฒะตััะธะฝ ะณัะฐัะฐ
...
# ะะฐะดะฐะตะผ ัะฒัะทั ะฝะฐ ะณัะฐัะต
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ะะฟะธัะฐะฝะธะต ะพััะฐะปัะฝัั
ะฟะพัะพะบะพะฒ ัะฟัะฐะฒะปะตะฝะธั
เปเบเบเบงเบดเบเบตเบเบฒเบเบเบฒเบ, เบเปเบฒเบเบปเบเบซเบกเบฒเบเบเบญเบเบเปเบฅเบดเบชเบฑเบเบเบญเบเบเปเบฒเบเบขเบนเปเปเบ mail.ru, เบซเบผเบฑเบเบเบฒเบเบเบฑเปเบเบเปเบฒเบเบเบฐเบเปเปเบชเบฒเบกเบฒเบเบเบญเบเบซเบฒเบเบปเบเบซเบกเบฒเบเปเบเบเบซเบปเบงเบเปเป, เบเบนเปเบชเบปเปเบ, เปเบฅเบฐเบญเบทเปเบเป. เบเบฑเบเปเบเปเบเบเบต 2016, เบเบงเบเปเบเบปเบฒเปเบเบปเปเบฒเบชเบฑเบเบเบฒเบงเปเบฒเบเบฐเปเบเบฐเบเปเบฒเบกเบฑเบ, เปเบเปเบเบฒเบเบปเบเบเบทเปเบเบงเปเบฒเบกเบตเบเบฒเบเบเปเบฝเบเปเบเบเบเบดเบเปเบเบเบญเบเปเบเบปเบฒเปเบเบปเปเบฒ. เบเปเบฒเบเบฐเปเบเบปเปเบฒเปเบเปเปเบเปเปเบเบเบฑเบเบซเบฒเบเบตเปเปเบเบเบเบฒเบเบชเปเบฒเบเปเบเปเบเบตเปเบเบเบเปเบฒเบเบซเบฒเบเบชเปเบฒเบฅเบฑเบเบเบปเบงเบญเบฑเบเบชเบญเบเบเบตเปเบเปเบฒเปเบเบฑเบเปเบฅเบฐเบเบฒเบเบเบฑเปเบเบเปเบฒเบเบปเบงเบเบญเบเบชเปเบฒเบฅเบฑเบเบเบปเบงเบญเบฑเบเบชเบญเบเบเบตเปเบเปเบฒเปเบเบฑเบเปเบเบเบฒเบเปเบเปเบเบญเบเปเบงเบฑเบเปเบกเบฅ. เบเบฑเปเบเบเบฑเปเบ, เบเบฝเบเปเบเปเบเบปเบงเบญเบฑเบเบชเบญเบเปเบฅเบฐเปเบเบทเปเบญเบเปเบเบเบตเปเบเปเบฒเปเบเบฑเบเบชเปเบฒเบฅเบฑเบเบเบฒเบเบเบปเปเบเบซเบฒ, เปเบเบเปเบฅเบฐเบเบตเบเบญเบเบเปเบญเบ, เบเบฝเบเปเบเป (UNSEEN) เปเบเบปเปเบฒเปเบเปเบเปเบเบเปเบเบตเบเบตเป.
เบชเบฐเบซเบผเบธเบ, เบเบงเบเปเบฎเบปเบฒเบกเบตเบฅเปเบฒเบเบฑเบเบเปเปเปเบเบเบตเป: เบเบงเบเปเบฎเบปเบฒเบเบงเบเปเบเบดเปเบเบงเปเบฒเบกเบตเบเบปเบงเบญเบฑเบเบชเบญเบเปเบซเบกเปเบเบตเปเบเบปเบเบเบฑเบเปเบเบทเปเบญเบเปเบ, เบเปเบฒเบงเปเบฒเบกเบต, เบซเบผเบฑเบเบเบฒเบเบเบฑเปเบเบเบงเบเปเบฎเบปเบฒเบเบฒเบงเปเบซเบฅเบเบเปเบญเบเปเบเบฑเบเบกเปเบฝเบเปเบเบเปเบเปเบเบฒเบเปเบเบทเปเบญเบกเบเปเปเบเบฒเบเบเบปเบเบซเบกเบฒเบเบชเบฐเบเบฑเบเบชเบธเบเบเปเบฒเบ.
เบเบฒเบเปเบเปเบเบธเบเบชเบธเบเบเปเบฒเบ, เบกเบฑเบเบเบทเบเบเบปเบเปเบงเบฑเปเบเบงเปเบฒเปเบเปเบกเบเบฑเบเปเบเบฑเบเบเบตเปเบเบฐเบเบทเบเบเบญเบเบญเบญเบ, เบเปเปเบกเบนเบเบเบฒเบเบเปเบญเบเปเบเบฑเบเบกเปเบฝเบเบเบฐเบเบทเบเบฅเบถเบเบฅเปเบฒเบเปเบฅเบฐเบเปเบฒเปเบเบตเบเบเบฒเบ, เปเบฅเบฐเบเบฑเปเบเบเบฑเปเบ, เบชเบดเปเบเบเบฑเบเบซเบกเบปเบเบเบฐเปเบเบเบทเปเบกเบญเบตเบเปเบเบเปเปเบเบญเบเบเบฐเบเบงเบเบเบฒเบ ETL, เปเบเปเบเบตเปเปเบกเปเบเปเบเบตเบเบเบงเปเบฒเบเบตเป. เบเบญเบเปเบเบเบเบญเบเบเบปเบเบเบงเบฒเบก. เบเปเบฒเบกเบฑเบเบญเบญเบเบกเบฒเบเบตเปเบซเบเปเบฒเบชเบปเบเปเบเปเบฅเบฐเปเบเบฑเบเบเบฐเปเบซเบเบ, เบซเบผเบฑเบเบเบฒเบเบเบฑเปเบเบเปเบญเบเบเบดเบเบเบตเบเบตเปเบเบฐเบญเบฐเบเบดเบเบฒเบเบงเบดเบเบตเปเบเปเปเบ ETL เปเบฅเบฐเบชเปเบงเบเบเบญเบเบเบงเบเปเบเบปเบฒเบชเปเบฒเบฅเบฑเบ Apache Airflow.
เปเบซเบผเปเบเบเปเปเบกเบนเบ: www.habr.com