ETL proces pro získávání dat z e-mailu v Apache Airflow

ETL proces pro získávání dat z e-mailu v Apache Airflow

Bez ohledu na to, jak moc se technologie vyvíjí, za vývojem vždy stojí řada zastaralých přístupů. Může to být způsobeno hladkým přechodem, lidskými faktory, technologickými potřebami nebo něčím jiným. V oblasti zpracování dat jsou v této části nejvíce objevné zdroje dat. Bez ohledu na to, jak moc sníme o tom, že se toho zbavíme, zatím je část dat zasílána v instant messengerech a e-mailech, nemluvě o archaičtějších formátech. Zvu vás k rozebrání jedné z možností pro Apache Airflow, která ilustruje, jak můžete přebírat data z e-mailů.

pravěk

Mnoho dat se stále přenáší prostřednictvím e-mailu, od mezilidské komunikace po standardy interakce mezi společnostmi. Je dobré, když je možné napsat rozhraní pro získávání dat nebo dát do kanceláře lidi, kteří budou tyto informace vkládat do pohodlnějších zdrojů, ale často to prostě není možné. Konkrétním úkolem, kterému jsem čelil, bylo připojení notoricky známého CRM systému k datovému skladu a následně k systému OLAP. Historicky se tak stalo, že pro naši společnost bylo použití tohoto systému výhodné v určité oblasti podnikání. Každý si proto opravdu přál umět operovat i s daty z tohoto systému třetích stran. V první řadě se samozřejmě studovala možnost získávání dat z otevřeného API. Bohužel API nepokrývalo získávání všech potřebných dat a zjednodušeně řečeno bylo v mnoha ohledech pokřivené a technická podpora se nechtěla nebo nemohla sejít na půli cesty, aby poskytla komplexnější funkcionalitu. Tento systém však poskytoval příležitost pravidelně dostávat chybějící data poštou ve formě odkazu pro uvolnění archivu.

Nutno podotknout, že to nebyl jediný případ, kdy chtěl byznys sbírat data z e-mailů nebo instant messengerů. V tomto případě jsme však nemohli ovlivnit společnost třetí strany, která část dat poskytuje pouze tímto způsobem.

proudění vzduchu apache

K budování ETL procesů nejčastěji využíváme Apache Airflow. Aby čtenář, který tuto technologii nezná, lépe pochopil, jak vypadá v kontextu a vůbec, popíšu pár úvodních.

Apache Airflow je bezplatná platforma, která se používá k vytváření, spouštění a monitorování procesů ETL (Extract-Transform-Loading) v Pythonu. Hlavním konceptem v Airflow je orientovaný acyklický graf, kde vrcholy grafu jsou specifické procesy a okraje grafu jsou tok řízení nebo informací. Proces může jednoduše volat jakoukoli funkci Pythonu nebo může mít složitější logiku z postupného volání několika funkcí v kontextu třídy. Pro nejfrekventovanější operace již existuje mnoho připravených vývojů, které lze použít jako procesy. Takový vývoj zahrnuje:

  • operátory - pro přenos dat z jednoho místa na druhé, například z databázové tabulky do datového skladu;
  • senzory - pro čekání na výskyt určité události a nasměrování toku řízení do následných vrcholů grafu;
  • háčky - pro operace nižší úrovně, například pro získání dat z databázové tabulky (používané v příkazech);
  • atd.

Bylo by nevhodné v tomto článku podrobně popisovat Apache Airflow. Krátké úvody si můžete prohlédnout zde nebo zde.

Háček pro získávání dat

Nejprve, abychom problém vyřešili, musíme napsat háček, pomocí kterého bychom mohli:

  • připojit k emailu
  • najít správné písmeno
  • přijímat data z dopisu.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Logika je taková: připojíme se, najdeme poslední nejrelevantnější písmeno, pokud existují další, ignorujeme je. Tato funkce se používá, protože pozdější písmena obsahují všechna data dřívějších. Pokud tomu tak není, můžete vrátit pole všech písmen nebo zpracovat první a zbytek při dalším průchodu. Obecně platí, že vše, jako vždy, závisí na úkolu.

Do háčku přidáváme dvě pomocné funkce: pro stažení souboru a pro stažení souboru pomocí odkazu z emailu. Ty lze mimochodem přesunout k operátorovi, záleží na frekvenci využívání této funkcionality. Co dalšího přidat k háčku, opět záleží na úkolu: pokud jsou soubory přijaty okamžitě v dopise, můžete si stáhnout přílohy k dopisu, pokud jsou data přijata v dopise, musíte dopis analyzovat, atd. V mém případě dopis přichází s jedním odkazem na archiv, který potřebuji umístit na určité místo a zahájit další proces zpracování.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Kód je jednoduchý, takže nepotřebuje další vysvětlení. Jen vám řeknu o magické čáře imap_conn_id. Apache Airflow ukládá parametry připojení (login, heslo, adresa a další parametry), ke kterým lze přistupovat pomocí identifikátoru řetězce. Vizuálně vypadá správa připojení takto

ETL proces pro získávání dat z e-mailu v Apache Airflow

Senzor čeká na data

Protože již víme, jak se připojit a přijímat data z pošty, můžeme nyní napsat senzor, který na ně počká. V mém případě se neosvědčilo napsat hned operátora, který data případně zpracuje, protože na základě dat přijatých z pošty fungují jiné procesy, včetně těch, které berou související data z jiných zdrojů (API, telefonie , webové metriky atd.) atd.). Dám vám příklad. V CRM systému se objevil nový uživatel, o jehož UUID stále nevíme. Pak při pokusu o příjem dat ze SIP telefonie budeme přijímat hovory vázané na její UUID, ale nebudeme je moci uložit a správně použít. V takových záležitostech je důležité mít na paměti závislost dat, zvláště pokud jsou z různých zdrojů. To jsou samozřejmě nedostatečná opatření k zachování integrity dat, ale v některých případech jsou nezbytná. Ano, a nečinnost kvůli zabírání zdrojů je také iracionální.

Náš senzor tedy spustí následující vrcholy grafu, pokud jsou v e-mailu čerstvé informace, a také označí předchozí informace jako irelevantní.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Přijímáme a používáme data

Pro příjem a zpracování dat můžete napsat samostatný operátor, můžete použít hotové. Protože logika je stále triviální - například vzít data z dopisu, navrhuji standardní PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Mimochodem, pokud je vaše firemní pošta také na mail.ru, nebudete moci vyhledávat dopisy podle předmětu, odesílatele atd. Ještě v roce 2016 slíbili, že ji zavedou, ale zřejmě si to rozmysleli. Tento problém jsem vyřešil vytvořením samostatné složky pro potřebná písmena a nastavením filtru pro potřebná písmena ve webovém rozhraní pošty. Do této složky se tak dostanou pouze potřebná písmena a podmínky pro vyhledávání, v mém případě prostě (NEVIDĚNÉ).

Shrneme-li, máme následující sekvenci: zkontrolujeme, zda existují nová písmena, která splňují podmínky, pokud existují, stáhneme archiv pomocí odkazu z posledního písmene.
Pod posledními tečkami je vynecháno, že se tento archiv rozbalí, data z archivu se vyčistí a zpracují a ve výsledku to celé půjde dál do potrubí procesu ETL, ale to už je mimo rozsah článku. Pokud by to bylo zajímavé a užitečné, pak budu rád pokračovat v popisu ETL řešení a jejich částí pro Apache Airflow.

Zdroj: www.habr.com

Přidat komentář