แ แแช แแ แฃแแแ แแแแแแแแ แแแก แขแแฅแแแแแแแ, แแแซแแแแแแฃแแ แแแแแแแแแแก แกแแ แแ แงแแแแแแแแก แฉแแแแ แฉแแแ แแแแแแแแ แแแแก แฃแแแ. แแก แจแแแซแแแแ แแแแแฌแแแฃแแ แแงแแก แแแฃแแ แแแแแกแแแแ, แแแแแแแแฃแ แ แคแแฅแขแแ แแแแ, แขแแฅแแแแแแแฃแ แ แกแแญแแ แแแแแแแ แแ แกแฎแแ แ แแแแ. แแแแแชแแแแ แแแแฃแจแแแแแแก แกแคแแ แแจแ, แแ แแแฌแแแจแ แงแแแแแแ แแแแแแแแแแแแ แแแแแชแแแแ แฌแงแแ แแแแ. แ แแช แแ แฃแแแ แแแชแแแแแแแแ แแแแก แแแแแแแ แแชแแแแแแแ, แแแแ แแ แฏแแ แฏแแ แแแแ แแแแแชแแแแแแก แแแฌแแแ แแแแแแแแแ แแงแแกแแแ แแแกแแแฏแแ แแแกแ แแ แแ.แฌแแ แแแแแจแ, แ แแ แแฆแแ แแคแแ แ แแแฅแแแ แฃแคแ แ แแ แฅแแฃแ แคแแ แแแขแแแจแ. แแ แแแแแขแแแแแแ แแแจแแแแ Apache Airflow-แแก แแ แ-แแ แแ แแแ แแแแขแ, แ แแแแแแช แแกแแฎแแแก, โโแแฃ แ แแแแ แจแแแแซแแแแ แแแแฆแแ แแแแแชแแแแแ แแแคแแกแขแแแแ.
แฌแแแแแกแขแแ แแ
แแแแ แ แแแแแชแแแ แแแแแ แแแแแแแก แแแแฅแขแ แแแฃแแ แคแแกแขแแ, แแแขแแ แแแ แกแแแแแฃแ แ แแแแฃแแแแแชแแแแแแแ แแแแแแแแแแก แจแแ แแก แฃแ แแแแ แแฅแแแแแแแก แกแขแแแแแ แขแแแแแแ. แแแ แแแ, แแฃ แจแแกแแซแแแแแแแ แแแแแชแแแแ แแแกแแแแแแแแแ แแแขแแ แคแแแกแแก แแแฌแแ แ แแ แแคแแกแจแ แแแแแแแแแแแก แแแแแแแกแแแ, แ แแแแแแแช แแ แแแคแแ แแแชแแแก แฃแคแ แ แแแกแแฎแแ แฎแแแแ แฌแงแแ แแแแจแ แจแแแขแแแแ, แแแแ แแ แฎแจแแ แแ แแก แจแแแซแแแแ แฃแแ แแแแ แจแแฃแซแแแแแแ แแงแแก. แแแแแ แแขแฃแแ แแแแชแแแ, แ แแแแแแช แแ แฌแแแแฌแงแแ, แแงแ แชแแแแแแ CRM แกแแกแขแแแแก แแแแแแจแแ แแแ แแแแแชแแแแ แกแแฌแงแแแแแ, แจแแแแแ แแ OLAP แกแแกแขแแแแกแแแ. แแกแ แแแฎแแ แแกแขแแ แแฃแแแ, แ แแ แฉแแแแ แแแแแแแแแกแแแแก แแ แกแแกแขแแแแก แแแแแงแแแแแ แแแกแแฎแแ แฎแแแแแ แแงแ แแแแแแกแแก แแแแแ แแขแฃแ แกแคแแ แแจแ. แแแแขแแ, แงแแแแแก แซแแแแแ แกแฃแ แแ แแ แแแกแแแ แแฎแแ แแก แกแแกแขแแแแก แแแแแชแแแแแแ แแฃแจแแแแแช. แฃแแแ แแแแแก แงแแแแแกแ, แ แ แแฅแแ แฃแแแ, แจแแกแฌแแแแแแ แแฅแแ แฆแแ API-แแแ แแแแแชแแแแแแก แแแแแแแแแก แจแแกแแซแแแแแแแ. แกแแแฌแฃแฎแแ แแ, API แแ แแแแชแแแแ แงแแแแ แกแแญแแ แ แแแแแชแแแแก แแแฆแแแแก แแ, แแแ แขแแแ แกแแขแงแแแแแ แ แแ แแแฅแแแ, แแก แแ แแแแแ แแแแแกแแแ แแกแแ แแชแแแ แ แแงแ แแ แขแแฅแแแแฃแ แ แแฎแแ แแแญแแ แ แแ แกแฃแ แแ แแ แแแ แฎแแแแแแแ แจแฃแ แแแแแ แฃแคแ แ แกแ แฃแแงแแคแแแ แคแฃแแฅแชแแแแแ แแแแก แฃแแ แฃแแแแแกแแงแแคแแ. แแแแ แแ แแก แกแแกแขแแแ แกแแจแฃแแแแแแก แแซแแแแแ แแแ แแแแฃแแแ แแแแฆแ แแแแแ แแฃแแ แแแแแชแแแแแ แคแแกแขแแ แแ แฅแแแแก แแแแแแขแแแ แแแแก แแแฃแแแก แกแแฎแแ.
แฃแแแ แแฆแแแแจแแแก, แ แแ แแก แแ แแงแ แแ แแแแแ แแ แจแแแแฎแแแแ, แ แแแแกแแช แแแแแแกแก แกแฃแ แแ แแแแแชแแแแแแก แจแแแ แแแแแ แแ.แฌแแ แแแแแแแแ แแ แแงแแกแแแ แ แแแกแแแฏแแ แแแแแแ. แแฃแแชแ, แแ แจแแแแฎแแแแแจแ, แฉแแแ แแแ แแแแแฎแแ แฎแแ แแแแแแแ แแแกแแแ แแฎแแ แแก แแแแแแแแแแ, แ แแแแแแช แแฎแแแแ แแ แแแแ แแฌแแแแก แแแแแชแแแแ แแแฌแแแก.
แแแแฉแแก แฐแแแ แแก แแแแแแ
ETL แแ แแชแแกแแแแก แจแแกแแฅแแแแแแ, แฉแแแ แงแแแแแแ แฎแจแแ แแ แแแงแแแแแ Apache Airflow-แก. แแแแกแแแแแก, แ แแ แแแแแฎแแแแแ, แ แแแแแแช แแ แแชแแแแก แแ แขแแฅแแแแแแแแก, แฃแแแ แแแแแแก, แ แแแแ แแแแแแงแฃแ แแแ แแก แแแแขแแฅแกแขแจแ แแ แแแแแแแ, แแ แแฆแแฌแแ แ แแแแแแแแ แจแแกแแแแแก.
Apache Airflow แแ แแก แฃแคแแกแ แแแแขแคแแ แแ, แ แแแแแแช แแแแแแงแแแแแ Python-แจแ ETL (Extract-Transform-Loading) แแ แแชแแกแแแแก แจแแกแแฅแแแแแแ, แจแแกแแกแ แฃแแแแแแ แแ แแแแแขแแ แแแแแกแแแแก. Airflow-แแก แแแแแแ แ แแแแชแแคแชแแ แแ แแก แแแแแ แแฃแแ แแชแแแแฃแ แ แแ แแคแแแ, แกแแแแช แแ แแคแแก แฌแแแ แแแแ แแ แแก แกแแแชแแคแแแฃแ แ แแ แแชแแกแแแ, แฎแแแ แแ แแคแแก แแแแแแแ แแ แแก แแแแขแ แแแแก แแ แแแคแแ แแแชแแแก แแแแแแ. แแ แแชแแกแก แจแแฃแซแแแ แฃแแ แแแแ แแแแแแซแแฎแแก แแแแแแแก แแแแแกแแแแ แ แคแฃแแฅแชแแ, แแ แจแแแซแแแแ แฐแฅแแแแแก แฃแคแ แ แ แแฃแแ แแแแแแ แแแแกแแก แแแแขแแฅแกแขแจแ แ แแแแแแแแ แคแฃแแฅแชแแแก แแแแแแแแแแ แฃแแ แแแแแซแแฎแแแแกแแแ. แงแแแแแแ แฎแจแแ แ แแแแ แแชแแแแแกแแแแก, แฃแแแ แแ แกแแแแแก แแ แแแแแ แแแ แแแแแแแแ แแแ, แ แแแแแแช แจแแแซแแแแ แแแแแงแแแแแฃแ แแฅแแแก แ แแแแ แช แแ แแชแแกแแแ. แแกแแแ แแแแแแแแแ แแแแชแแแก:
- แแแแ แแขแแ แแแ - แแแแแชแแแแแแก แแ แแ แแแแแแแแแ แแแแ แแแ แแแแแกแแขแแแแ, แแแแแแแแแ, แแแแแชแแแแ แแแแแก แชแฎแ แแแแแแ แแแแแชแแแแ แกแแฌแงแแแจแ;
- แกแแแกแแ แแแ - แแแ แแแแฃแแ แแแแแแแแก แแแแแแแแจแ แแ แแแแขแ แแแแก แแแแแแแก แแแแแ แแฃแแแแ แแ แแคแแก แจแแแแแแ แฌแแแ แแแแแ;
- hooks - แฅแแแแ แแแแแก แแแแ แแชแแแแแกแแแแก, แแแแแแแแแ, แแแแแชแแแแ แแแแแก แชแฎแ แแแแแแ แแแแแชแแแแแแก แแแกแแฆแแแแ (แแแแแแงแแแแแ แแแแชแฎแแแแแแแจแ);
- แ.แจ.
แแ แกแขแแขแแแจแ Apache Airflow แแแขแแแฃแ แแ แแฆแฌแแ แ แแ แแฅแแแแ แแแแแแจแแฌแแแแแ. แแแแแ แจแแกแแแแแ แจแแแแซแแแแ แแฎแแแแ
Hook แแแแแชแแแแแแก แแแกแแฆแแแแ
แฃแแแ แแแแแก แงแแแแแกแ, แแ แแแแแแแก แแแแแกแแญแ แแแแ, แฉแแแ แฃแแแ แแแแฌแแ แแ แแแแแแ, แ แแแแแแแช แจแแแแแซแแแ:
- แแแฃแแแแจแแ แแแ แแ.แฌแแ แแแก
- แแแแแแแ แกแฌแแ แ แแกแ
- แแแแฆแแ แแแแแชแแแแแ แฌแแ แแแแแแ.
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging
class IMAPHook(BaseHook):
def __init__(self, imap_conn_id):
"""
IMAP hook ะดะปั ะฟะพะปััะตะฝะธั ะดะฐะฝะฝัั
ั ัะปะตะบััะพะฝะฝะพะน ะฟะพััั
:param imap_conn_id: ะะดะตะฝัะธัะธะบะฐัะพั ะฟะพะดะบะปััะตะฝะธั ะบ ะฟะพััะต
:type imap_conn_id: string
"""
self.connection = self.get_connection(imap_conn_id)
self.mail = None
def authenticate(self):
"""
ะะพะดะบะปััะฐะตะผัั ะบ ะฟะพััะต
"""
mail = imaplib.IMAP4_SSL(self.connection.host)
response, detail = mail.login(user=self.connection.login, password=self.connection.password)
if response != "OK":
raise AirflowException("Sign in failed")
else:
self.mail = mail
def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
"""
ะะตัะพะด ะดะปั ะฟะพะปััะตะฝะธั ะธะดะตะฝัะธัะธะบะฐัะพัะฐ ะฟะพัะปะตะดะฝะตะณะพ ะฟะธััะผะฐ,
ัะดะพะฒะปะตัะฒะพัะฐัััะตะณะพ ััะปะพะฒะธัะผ ะฟะพะธัะบะฐ
:param check_seen: ะัะผะตัะฐัั ะฟะพัะปะตะดะฝะตะต ะฟะธััะผะพ ะบะฐะบ ะฟัะพัะธัะฐะฝะฝะพะต
:type check_seen: bool
:param box: ะะฐะธะผะตะฝะพะฒะฐะฝะธั ััะธะบะฐ
:type box: string
:param condition: ะฃัะปะพะฒะธั ะฟะพะธัะบะฐ ะฟะธัะตะผ
:type condition: string
"""
self.authenticate()
self.mail.select(mailbox=box)
response, data = self.mail.search(None, condition)
mail_ids = data[0].split()
logging.info("ะ ััะธะบะต ะฝะฐะนะดะตะฝั ัะปะตะดัััะธะต ะฟะธััะผะฐ: " + str(mail_ids))
if not mail_ids:
logging.info("ะะต ะฝะฐะนะดะตะฝะพ ะฝะพะฒัั
ะฟะธัะตะผ")
return None
mail_id = mail_ids[0]
# ะตัะปะธ ัะฐะบะธั
ะฟะธัะตะผ ะฝะตัะบะพะปัะบะพ
if len(mail_ids) > 1:
# ะพัะผะตัะฐะตะผ ะพััะฐะปัะฝัะต ะฟัะพัะธัะฐะฝะฝัะผะธ
for id in mail_ids:
self.mail.store(id, "+FLAGS", "\Seen")
# ะฒะพะทะฒัะฐัะฐะตะผ ะฟะพัะปะตะดะฝะตะต
mail_id = mail_ids[-1]
# ะฝัะถะฝะพ ะปะธ ะพัะผะตัะธัั ะฟะพัะปะตะดะฝะตะต ะฟัะพัะธัะฐะฝะฝัะผ
if not check_seen:
self.mail.store(mail_id, "-FLAGS", "\Seen")
return mail_id
แแแแแแ แแกแแแแ: แฉแแแ แแฃแแแแจแแ แแแแแ, แแแแฃแแแแ แแแแ แงแแแแแแ แแฅแขแฃแแแฃแ แแกแแก, แแฃ แแ แแก แกแฎแแ, แแแแแแแ แแแ แแแ. แแก แคแฃแแฅแชแแ แแแแแแงแแแแแ, แ แแแแแ แจแแแแแแแ แแกแแแแ แจแแแชแแแก แแแ แแแแแ แงแแแแ แแแแแชแแแก. แแฃ แแก แแกแ แแ แแ แแก, แแแจแแ แจแแแแซแแแแ แแแแแ แฃแแแ แงแแแแ แแกแแก แแแกแแแ, แแ แแแแแฃแจแแแแ แแแ แแแแ, แฎแแแ แแแแแ แฉแแแ แจแแแแแ แฃแฆแแแขแแฎแแแแ. แแแแแแแ, แงแแแแแคแแ แ, แ แแแแ แช แงแแแแแแแแก, แแแแชแแแแแแ แแแแแแแแแแฃแแ.
แฉแแแ แแแแแแแขแแแ แแ แแแแฎแแแ แ แคแฃแแฅแชแแแก: แคแแแแแก แฉแแแแกแแขแแแ แแแ แแ แคแแแแแก แแแแแแกแแฌแแ แแ แแแคแแกแขแแก แแแฃแแแก แแแแแงแแแแแแ. แกแฎแแแแ แจแแ แแก, แแแแ แแแแแขแแแ แจแแกแแซแแแแแแแ แแแแ แแขแแ แแ, แแก แแแแแแแแแแฃแแแ แแ แคแฃแแฅแชแแแก แแแแแงแแแแแแก แกแแฎแจแแ แแแ. แแแแแ แ แ แฃแแแ แแแแแแขแแ แแแฃแญแก, แแแแแ แแแแแแแแแแฃแแแ แแแแแแแแแแ: แแฃ แคแแแแแแ แแแแฆแแแ แแแฃแงแแแแแแแแ แฌแแ แแแจแ, แแแจแแ แจแแแแซแแแแ แฉแแแแขแแแ แแแ แฌแแ แแแแ แแแแแ แแแแ, แแฃ แแแแแชแแแแแ แแแแฆแแแ แฌแแ แแแจแ, แแแจแแ แแฅแแแ แฃแแแ แแแแแแแแแแแ แแกแ, แแ แ.แจ. แฉแแแก แจแแแแฎแแแแแจแ, แฌแแ แแแก แแแงแแแแ แแ แฅแแแแก แแ แแ แแแฃแแ, แ แแแแแแช แฃแแแ แแแแแงแแแ แแแ แแแแฃแ แแแแแแแก แแ แแแแแฌแงแ แจแแแแแแแ แแแแฃแจแแแแแแก แแ แแชแแกแ.
def download_from_url(self, url, path, chunk_size=128):
"""
ะะตัะพะด ะดะปั ัะบะฐัะธะฒะฐะฝะธั ัะฐะนะปะฐ
:param url: ะะดัะตั ะทะฐะณััะทะบะธ
:type url: string
:param path: ะัะดะฐ ะฟะพะปะพะถะธัั ัะฐะนะป
:type path: string
:param chunk_size: ะะพ ัะบะพะปัะบะพ ะฑะฐะนัะพะฒ ะฟะธัะฐัั
:type chunk_size: int
"""
r = requests.get(url, stream=True)
with open(path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def download_mail_href_attachment(self, mail_id, path):
"""
ะะตัะพะด ะดะปั ัะบะฐัะธะฒะฐะฝะธั ัะฐะนะปะฐ ะฟะพ ัััะปะบะต ะธะท ะฟะธััะผะฐ
:param mail_id: ะะดะตะฝัะธัะธะบะฐัะพั ะฟะธััะผะฐ
:type mail_id: string
:param path: ะัะดะฐ ะฟะพะปะพะถะธัั ัะฐะนะป
:type path: string
"""
response, data = self.mail.fetch(mail_id, "(RFC822)")
raw_email = data[0][1]
raw_soup = raw_email.decode().replace("r", "").replace("n", "")
parse_soup = BeautifulSoup(raw_soup, "html.parser")
link_text = ""
for a in parse_soup.find_all("a", href=True, text=True):
link_text = a["href"]
self.download_from_url(link_text, path)
แแแแ แแแ แขแแแแ, แแแแขแแ แแแก แแ แกแญแแ แแแแ แแแแแขแแแแแ แแฎแกแแ. แแ แฃแแ แแแแ แแแขแงแแแ แฏแแแแกแแฃแ แ แฎแแแแก imap_conn_id. Apache Airflow แแแแฎแแแก แแแแจแแ แแก แแแ แแแแขแ แแแก (แจแแกแแแ, แแแ แแแ, แแแกแแแแ แแ แแ แกแฎแแ แแแ แแแแขแ แแแ), แ แแแแแแแแช แฌแแแแแ แจแแกแแซแแแแแแแ แกแแแแแแแแ แแแแแขแแคแแแแขแแ แแ. แแแแฃแแแฃแ แแ, แแแแจแแ แแก แแแแแฏแแแแขแ แแกแ แแแแแแงแฃแ แแแ
แกแแแกแแ แ แแแแแแแ แแแแแชแแแแแก
แแแแแแแแ แฉแแแ แฃแแแ แแแชแแ แ แแแแ แแแแแแแแจแแ แแ แแ แแแแแฆแแ แแแแแชแแแแแ แคแแกแขแแแแ, แแฎแแ แจแแแแแซแแแ แแแแฌแแ แแ แกแแแกแแ แ, แ แแ แแแแแแแแแ แแแ. แฉแแแก แจแแแแฎแแแแแจแ, แแ แแแแแแแแแ แแแแ แแขแแ แแก แแแฃแงแแแแแแแแ แแแฌแแ แ, แ แแแแแแช แแแแแฃแจแแแแแก แแแแแชแแแแแก, แแกแแแแก แแ แกแแแแแแก แจแแแแฎแแแแแจแ, แ แแแแแ แกแฎแแ แแ แแชแแกแแแ แแฃแจแแแแก แคแแกแขแแแแ แแแฆแแแฃแ แแแแแชแแแแแแ แแแงแ แแแแแแ, แแแ แจแแ แแก แแกแแแแแแช, แ แแแแแแแช แแฆแแแแ แจแแกแแแแแแก แแแแแชแแแแแก แกแฎแแ แฌแงแแ แแแแแแแ (API, แขแแแแคแแแแ). , แแแ แแแขแ แแแ แแ แ.แจ.) แแ แ.แจ.). แแแแแแแแก แแแแแงแแแ. CRM แกแแกแขแแแแจแ แแฎแแแ แแแแฎแแแ แแแแแ แแแแแฉแแแ แแ แแแกแ UUID-แแก แจแแกแแฎแแ แฏแแ แแแแแ แแ แแแชแแ. แจแแแแแ, แ แแแแกแแช แแชแแแแแแ แแแแแชแแแแแแก แแแฆแแแแก SIP แขแแแแคแแแแแแ, แแแแแฆแแแ แแแก UUID-แแแ แแแแแแจแแ แแแฃแ แแแ แแแก, แแแแ แแ แแแ แจแแแซแแแแ แแแ แกแฌแแ แแ แจแแแแฎแแแก แแ แแแแแงแแแแแแก. แแกแแ แกแแแแแฎแแแจแ แแแแจแแแแแแแแแแ แแแแแแแแแแกแฌแแแแ แแแแแชแแแแแแก แแแแแแแแแแฃแแแแ, แแแแกแแแฃแแ แแแแ แแฃ แแกแแแ แกแฎแแแแแกแฎแแ แฌแงแแ แแแแ แแ แแแ. แแก, แ แ แแฅแแ แฃแแแ, แแ แแกแแแแแ แแกแ แแแแแแแ แแแแแชแแแแ แแแแแแแแแแก แจแแกแแแแ แฉแฃแแแแแแ, แแแแ แแ แแแแแแ แ แจแแแแฎแแแแแจแ แแกแแแ แแฃแชแแแแแแแแ. แแแแฎ, แแ แ แแกแฃแ แกแแแแก แแแกแแแแแแแแแ แฃแกแแฅแแฃแ แแแ แแกแแแ แแ แแชแแแแแแฃแ แแ.
แแแ แแแแ, แฉแแแแ แกแแแกแแ แ แแแแแฃแจแแแแก แแ แแคแแแแก แจแแแแแแ แฌแแแ แแแแก, แแฃ แคแแกแขแแจแ แแ แแก แแฎแแแ แแแคแแ แแแชแแ แแ แแกแแแ แแฆแแแจแแแแก แฌแแแ แแแคแแ แแแชแแแก, แ แแแแ แช แจแแฃแกแแแแแ.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook
class MailSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.check_seen = check_seen
self.box = box
self.condition = condition
def poke(self, context):
conn = IMAPHook(self.conn_id)
mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
if mail_id is None:
return False
else:
return True
แฉแแแ แแแฆแแแ แแ แแแงแแแแแ แแแแแชแแแแแก
แแแแแชแแแแแแก แแแกแแฆแแแแ แแ แแแกแแแฃแจแแแแแแแ แจแแแแซแแแแ แแแฌแแ แแ แชแแแแ แแแแ แแขแแ แ, แจแแแแซแแแแ แแแแแแงแแแแ แแแ. แแแแแแแแ แแแแแแ แฏแแ แแแแแ แขแ แแแแแแฃแ แแ - แแแแแแแแแ, แแกแแแแ แแแแแชแแแแแแก แแฆแแแ, แแ แแแแแแแแแ แกแขแแแแแ แขแฃแ PythonOperator-แก.
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook
start_date = datetime(2020, 4, 4)
# ะกัะฐะฝะดะฐััะฝะพะต ะบะพะฝัะธะณััะธัะพะฒะฐะฝะธะต ะณัะฐัะฐ
args = {
"owner": "example",
"start_date": start_date,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retry_delay": timedelta(minutes=15),
"provide_context": False,
}
dag = DAG(
dag_id="test_etl",
default_args=args,
schedule_interval="@hourly",
)
# ะะฟัะตะดะตะปัะตะผ ัะตะฝัะพั
mail_check_sensor = MailSensor(
task_id="check_new_emails",
poke_interval=10,
conn_id="mail_conn_id",
timeout=10,
soft_fail=True,
box="my_box",
dag=dag,
mode="poke",
)
# ะคัะฝะบัะธั ะดะปั ะฟะพะปััะตะฝะธั ะดะฐะฝะฝัั
ะธะท ะฟะธััะผะฐ
def prepare_mail():
imap_hook = IMAPHook("mail_conn_id")
mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
if mail_id is None:
raise AirflowException("Empty mailbox")
conn.download_mail_href_attachment(mail_id, "./path.zip")
prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
# ะะฟะธัะฐะฝะธะต ะพััะฐะปัะฝัั
ะฒะตััะธะฝ ะณัะฐัะฐ
...
# ะะฐะดะฐะตะผ ัะฒัะทั ะฝะฐ ะณัะฐัะต
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ะะฟะธัะฐะฝะธะต ะพััะฐะปัะฝัั
ะฟะพัะพะบะพะฒ ัะฟัะฐะฒะปะตะฝะธั
แกแฎแแแแ แจแแ แแก, แแฃ แแฅแแแแ แแแ แแแ แแขแแฃแแ แคแแกแขแ แแกแแแ แแ แแก mail.ru-แแ, แแแจแแ แแแ แจแแซแแแแ แฌแแ แแแแแแก แแแซแแแแแก แกแแแแแก, แแแแแแแแแแก แแ แ.แจ. แฏแแ แแแแแ 2016 แฌแแแก แแแฐแแแ แแแแ แแแก แแแแแ แแแแก, แแแแ แแ, แ แแแแ แช แฉแแแก, แแแแแแคแแฅแ แแก. แแ แแแแแแฌแงแแแขแ แแก แแ แแแแแแ แกแแญแแ แ แแกแแแแแกแแแแก แชแแแแ แกแแฅแแฆแแแแแก แจแแฅแแแแ แแ แคแแกแขแแก แแแ แแแขแแ แคแแแกแจแ แกแแญแแ แ แแกแแแแแกแแแแก แคแแแขแ แแก แแแงแแแแแแ. แแแ แแแแ, แแ แกแแฅแแฆแแแแแจแ แแฎแแแแ แซแแแแแก แกแแญแแ แ แแกแแแแ แแ แแแ แแแแแ, แฉแแแก แจแแแแฎแแแแแจแ, แฃแแ แแแแ (แแแฃแ แแแแแแแแ).
แจแแฏแแแแแแ, แแแแฅแแก แจแแแแแแ แแแแแแแแแแ แแแ: แแแแแฌแแแแ แแ แแก แแฃ แแ แ แแฎแแแ แแกแแแแ, แ แแแแแแแช แแแแแงแแคแแแแแแ แแแ แแแแแก, แแฃ แแ แแก, แจแแแแแ แฉแแแแแขแแแ แแแแ แแ แฅแแแก แแแแ แแกแแแแ แแแฃแแแก แแแแแงแแแแแแ.
แแแแ แฌแแ แขแแแแแแก แฅแแแจ แแแแแขแแแแแฃแแแ, แ แแ แแก แแ แฅแแแ แแแแฎแกแแแแ, แแ แฅแแแแแแ แแแแแชแแแแแ แแแกแฃแคแแแแแแแ แแ แแแแฃแจแแแแแแ แแ แจแแแแแแ, แงแแแแแคแแ แ แฃแคแ แ แจแแ แก แฌแแแ ETL แแ แแชแแกแแก แแแแกแแแแแแแแ, แแแแ แแ แแก แฃแแแ แกแชแแแแแแ. แกแขแแขแแแก แคแแ แแแแแ. แแฃ แกแแแแขแแ แแกแ แแ แกแแกแแ แแแแแ แแฆแแแฉแแแ, แแแจแแ แแ แกแแแแแแแแแแ แแแแแแ แซแแแแ ETL แแแแแฌแงแแแขแแแแแแแแก แแ แแแแ แแแฌแแแแแแก แแฆแฌแแ แแก Apache Airflow-แแกแแแแก.
แฌแงแแ แ: www.habr.com