Processo ETL per ottenere dati dalla posta elettronica in Apache Airflow

Processo ETL per ottenere dati dalla posta elettronica in Apache Airflow

Non importa quanto la tecnologia si sviluppi, una serie di approcci obsoleti segue sempre lo sviluppo. Ciò può essere dovuto a una transizione graduale, fattori umani, esigenze tecnologiche o qualcos'altro. Nel campo dell'elaborazione dei dati, le fonti di dati sono le più rivelatrici in questa parte. Non importa quanto sogniamo di sbarazzarcene, ma finora parte dei dati viene inviata tramite messaggistica istantanea ed e-mail, per non parlare dei formati più arcaici. Ti invito a smontare una delle opzioni per Apache Airflow, illustrando come puoi prelevare dati dalle email.

Sfondo

Molti dati vengono ancora trasferiti via e-mail, dalle comunicazioni interpersonali agli standard di interazione tra aziende. È positivo se è possibile scrivere un'interfaccia per ottenere dati o inserire persone in ufficio che inseriranno queste informazioni in fonti più convenienti, ma spesso ciò potrebbe semplicemente non essere possibile. Il compito specifico che ho dovuto affrontare è stato collegare il famigerato sistema CRM al data warehouse, e quindi al sistema OLAP. Storicamente è successo che per la nostra azienda l'utilizzo di questo sistema fosse conveniente in una particolare area di attività. Pertanto, tutti volevano davvero poter operare anche con i dati di questo sistema di terze parti. Prima di tutto, ovviamente, è stata studiata la possibilità di ottenere dati da un'API aperta. Sfortunatamente, l'API non copriva l'ottenimento di tutti i dati necessari e, in termini semplici, era per molti versi storta e il supporto tecnico non voleva o non poteva incontrarsi a metà strada per fornire funzionalità più complete. Ma questo sistema offriva l'opportunità di ricevere periodicamente per posta i dati mancanti sotto forma di collegamento per lo scarico dell'archivio.

Va notato che questo non era l'unico caso in cui l'azienda voleva raccogliere dati da e-mail o messaggistica istantanea. Tuttavia, in questo caso, non potremmo influenzare una società terza che fornisce parte dei dati solo in questo modo.

Flusso d'aria Apache

Per creare processi ETL, utilizziamo molto spesso Apache Airflow. Affinché un lettore che non abbia familiarità con questa tecnologia possa capire meglio come appare nel contesto e in generale, ne descriverò un paio introduttivi.

Apache Airflow è una piattaforma gratuita utilizzata per creare, eseguire e monitorare i processi ETL (Extract-Transform-Loading) in Python. Il concetto principale in Airflow è un grafico aciclico diretto, in cui i vertici del grafico sono processi specifici e i bordi del grafico sono il flusso di controllo o informazioni. Un processo può semplicemente chiamare qualsiasi funzione Python o può avere una logica più complessa chiamando in sequenza diverse funzioni nel contesto di una classe. Per le operazioni più frequenti, esistono già molti sviluppi già pronti che possono essere utilizzati come processi. Tali sviluppi includono:

  • operatori - per trasferire dati da un luogo a un altro, ad esempio da una tabella di database a un data warehouse;
  • sensori - per attendere il verificarsi di un determinato evento e dirigere il flusso di controllo ai successivi vertici del grafico;
  • hook - per operazioni di livello inferiore, ad esempio per ottenere dati da una tabella di database (utilizzati nelle istruzioni);
  • eccetera

Sarebbe inappropriato descrivere dettagliatamente Apache Airflow in questo articolo. È possibile visualizzare brevi presentazioni qui o qui.

Hook per ottenere dati

Prima di tutto, per risolvere il problema, dobbiamo scrivere un hook con il quale potremmo:

  • connettersi alla posta elettronica
  • trova la lettera giusta
  • ricevere i dati dalla lettera.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

La logica è questa: ci colleghiamo, troviamo l'ultima lettera più rilevante, se ce ne sono altre le ignoriamo. Questa funzione viene utilizzata perché le lettere successive contengono tutti i dati di quelle precedenti. In caso contrario, puoi restituire un array di tutte le lettere o elaborare la prima e il resto al passaggio successivo. In generale, tutto, come sempre, dipende dall'attività.

Aggiungiamo due funzioni ausiliarie all'hook: per scaricare un file e per scaricare un file utilizzando un collegamento da un'e-mail. A proposito, possono essere trasferiti all'operatore, dipende dalla frequenza di utilizzo di questa funzionalità. Cos'altro aggiungere all'hook, ancora una volta, dipende dall'attività: se i file vengono ricevuti immediatamente nella lettera, è possibile scaricare gli allegati alla lettera, se i dati vengono ricevuti nella lettera, è necessario analizzare la lettera, eccetera. Nel mio caso, la lettera viene fornita con un collegamento all'archivio, che devo inserire in un determinato luogo e avviare l'ulteriore processo di elaborazione.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Il codice è semplice, quindi non ha bisogno di ulteriori spiegazioni. Ti parlerò solo della linea magica imap_conn_id. Apache Airflow memorizza i parametri di connessione (login, password, indirizzo e altri parametri) a cui è possibile accedere tramite un identificatore di stringa. Visivamente, la gestione della connessione è simile a questa

Processo ETL per ottenere dati dalla posta elettronica in Apache Airflow

Sensore in attesa di dati

Poiché sappiamo già come connetterci e ricevere dati dalla posta, ora possiamo scrivere un sensore che li attenda. Nel mio caso non ha funzionato scrivere subito un operatore che elaborerà i dati, se ce ne saranno, perché altri processi funzionano in base ai dati ricevuti dalla posta, inclusi quelli che prendono i dati correlati da altre fonti (API, telefonia , metriche web, ecc.). Ti faccio un esempio. Un nuovo utente è apparso nel sistema CRM e non sappiamo ancora del suo UUID. Quindi, quando proveremo a ricevere dati dalla telefonia SIP, riceveremo chiamate legate al suo UUID, ma non saremo in grado di salvarle e utilizzarle correttamente. In tali questioni, è importante tenere presente la dipendenza dei dati, soprattutto se provengono da fonti diverse. Queste sono, ovviamente, misure insufficienti per preservare l'integrità dei dati, ma in alcuni casi sono necessarie. Sì, e anche l'ozio per occupare risorse è irrazionale.

Pertanto, il nostro sensore lancerà i successivi vertici del grafico se ci sono nuove informazioni nella posta e contrassegnerà anche le informazioni precedenti come irrilevanti.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Riceviamo e utilizziamo i dati

Per ricevere ed elaborare i dati, puoi scrivere un operatore separato, puoi usare quelli già pronti. Poiché la logica è ancora banale, ad esempio per prendere i dati dalla lettera, suggerisco lo standard PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

A proposito, se anche la tua posta aziendale è su mail.ru, non sarai in grado di cercare lettere per oggetto, mittente, ecc. Nel 2016 avevano promesso di introdurlo, ma a quanto pare hanno cambiato idea. Ho risolto questo problema creando una cartella separata per le lettere necessarie e impostando un filtro per le lettere necessarie nell'interfaccia web della posta. Pertanto, solo le lettere e le condizioni necessarie per la ricerca, nel mio caso, semplicemente (UNSEEN) entrano in questa cartella.

Riassumendo, abbiamo la seguente sequenza: controlliamo se ci sono nuove lettere che soddisfano le condizioni, se ci sono, quindi scarichiamo l'archivio utilizzando il collegamento dall'ultima lettera.
Sotto gli ultimi punti, si omette che questo archivio verrà decompresso, i dati dell'archivio verranno cancellati ed elaborati e, di conseguenza, il tutto andrà oltre la pipeline del processo ETL, ma questo è già oltre la portata dell'articolo. Se si è rivelato interessante e utile, continuerò volentieri a descrivere le soluzioni ETL e le loro parti per Apache Airflow.

Fonte: habr.com

Aggiungi un commento