Proceso ETL para obter datos do correo electrónico en Apache Airflow

Proceso ETL para obter datos do correo electrónico en Apache Airflow

Non importa canto se desenvolva a tecnoloxía, unha serie de enfoques obsoletos sempre van detrás do desenvolvemento. Isto pode deberse a unha transición suave, factores humanos, necesidades tecnolóxicas ou outra cousa. No ámbito do procesamento de datos, as fontes de datos son as máis reveladoras nesta parte. Por moito que soñemos con desfacernos disto, pero ata agora parte dos datos envíanse en mensaxería instantánea e correos electrónicos, sen esquecer formatos máis arcaicos. Invítoche a desmontar unha das opcións de Apache Airflow, ilustrando como podes sacar datos dos correos electrónicos.

prehistoria

Aínda se transfiren moitos datos a través do correo electrónico, desde comunicacións interpersoais ata estándares de interacción entre empresas. É bo se é posible escribir unha interface para obter datos ou poñer persoas na oficina que introduzan esta información en fontes máis convenientes, pero moitas veces isto pode simplemente non ser posible. A tarefa específica á que me enfrontei foi conectar o famoso sistema CRM ao almacén de datos e despois ao sistema OLAP. Aconteceu historicamente que para a nosa empresa o uso deste sistema era conveniente nunha determinada área de negocio. Polo tanto, todo o mundo realmente quería poder operar con datos deste sistema de terceiros tamén. En primeiro lugar, por suposto, estudouse a posibilidade de obter datos dunha API aberta. Desafortunadamente, a API non cubría a obtención de todos os datos necesarios e, en termos sinxelos, estaba en moitos aspectos torcida, e o soporte técnico non quería ou non podía cumprir a metade de proporcionar unha funcionalidade máis completa. Pero este sistema ofrecía a oportunidade de recibir periodicamente os datos que faltaban por correo en forma de ligazón para descargar o arquivo.

Cómpre sinalar que este non foi o único caso no que a empresa quería recoller datos de correos electrónicos ou mensaxería instantánea. Non obstante, neste caso, non poderiamos influír nunha empresa allea que proporciona parte dos datos só deste xeito.

fluxo de aire apache

Para crear procesos ETL, a maioría das veces usamos Apache Airflow. Para que un lector que non estea familiarizado con esta tecnoloxía entenda mellor como se ve no contexto e en xeral, describirei un par de introdutorios.

Apache Airflow é unha plataforma gratuíta que se usa para construír, executar e supervisar procesos ETL (Extract-Transform-Loading) en Python. O concepto principal en Airflow é un gráfico acíclico dirixido, onde os vértices do gráfico son procesos específicos e os bordos do gráfico son o fluxo de control ou información. Un proceso pode simplemente chamar a calquera función de Python, ou pode ter unha lóxica máis complexa ao chamar secuencialmente varias funcións no contexto dunha clase. Para as operacións máis frecuentes, xa hai moitos desenvolvementos preparados que se poden utilizar como procesos. Tales desenvolvementos inclúen:

  • operadores: para transferir datos dun lugar a outro, por exemplo, dunha táboa de base de datos a un almacén de datos;
  • sensores - para esperar a ocorrencia dun determinado evento e dirixir o fluxo de control aos vértices posteriores do gráfico;
  • ganchos - para operacións de nivel inferior, por exemplo, para obter datos dunha táboa de base de datos (usada en instrucións);
  • etc

Non sería apropiado describir Apache Airflow en detalle neste artigo. Pódense ver breves presentacións aquí ou aquí.

Gancho para obter datos

En primeiro lugar, para resolver o problema, necesitamos escribir un gancho co que poderiamos:

  • conectarse ao correo electrónico
  • atopar a letra correcta
  • recibir datos da carta.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

O lóxico é este: conectamos, atopamos a última letra máis relevante, se hai outras, ignorámolas. Utilízase esta función, porque as letras posteriores conteñen todos os datos das anteriores. Se non é o caso, podes devolver unha matriz de todas as letras ou procesar a primeira e o resto no seguinte paso. En xeral, todo, coma sempre, depende da tarefa.

Engadimos dúas funcións auxiliares ao gancho: para descargar un ficheiro e para descargar un ficheiro mediante unha ligazón dun correo electrónico. Por certo, pódense mover ao operador, depende da frecuencia de uso desta funcionalidade. Que máis engadir ao gancho, de novo, depende da tarefa: se os ficheiros se reciben inmediatamente na carta, podes descargar anexos á carta, se os datos se reciben na carta, debes analizar a carta, etc. No meu caso, a carta inclúe unha ligazón ao arquivo, que teño que poñer nun lugar determinado e comezar o proceso de procesamento posterior.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

O código é sinxelo, polo que non precisa máis explicación. Só falarei sobre a liña máxica imap_conn_id. Apache Airflow almacena parámetros de conexión (inicio de sesión, contrasinal, enderezo e outros parámetros) aos que se pode acceder mediante un identificador de cadea. Visualmente, a xestión de conexións é así

Proceso ETL para obter datos do correo electrónico en Apache Airflow

Sensor para esperar datos

Como xa sabemos conectar e recibir datos do correo, agora podemos escribir un sensor para agardalos. No meu caso, non funcionou escribir de inmediato un operador que procesase os datos, se é o caso, porque outros procesos funcionan en función dos datos recibidos do correo, incluídos aqueles que toman datos relacionados doutras fontes (API, telefonía). , métricas web, etc.) etc.). Vouche poñer un exemplo. Apareceu un novo usuario no sistema CRM e aínda non sabemos o seu UUID. Despois, ao tentar recibir datos da telefonía SIP, recibiremos chamadas vinculadas ao seu UUID, pero non poderemos gardalas nin usalas correctamente. Nestes asuntos, é importante ter en conta a dependencia dos datos, sobre todo se son de fontes diferentes. Estas son, por suposto, medidas insuficientes para preservar a integridade dos datos, pero nalgúns casos son necesarias. Si, e o ralentí para ocupar recursos tamén é irracional.

Así, o noso sensor lanzará os vértices posteriores do gráfico se hai información fresca no correo e tamén marcará a información anterior como irrelevante.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Recibimos e utilizamos datos

Para recibir e procesar datos, pode escribir un operador separado, pode usar outros preparados. Dado que a lóxica aínda é trivial - para tomar datos da carta, por exemplo, suxiro o PythonOperator estándar

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Por certo, se o teu correo corporativo tamén está en mail.ru, non poderás buscar cartas por asunto, remitente, etc. En 2016, prometeron presentalo, pero ao parecer cambiaron de opinión. Resolvín este problema creando un cartafol separado para as letras necesarias e configurando un filtro para as letras necesarias na interface web de correo. Así, só as letras e condicións necesarias para a busca, no meu caso, simplemente (UNSEEN) entran neste cartafol.

En resumo, temos a seguinte secuencia: comprobamos se hai novas letras que cumpran as condicións, se as hai, despois descargamos o arquivo mediante o enlace da última letra.
Baixo os últimos puntos, omítese que este arquivo será desempaquetado, os datos do arquivo serán borrados e procesados ​​e, como resultado, todo irá máis aló para o proceso de ETL, pero isto xa está máis alá. o ámbito de aplicación do artigo. Se resultou interesante e útil, seguirei describindo as solucións ETL e as súas partes para Apache Airflow.

Fonte: www.habr.com

Engadir un comentario