Proceso ETL para obtener datos del correo electrónico en Apache Airflow

Proceso ETL para obtener datos del correo electrónico en Apache Airflow

No importa cuánto se desarrolle la tecnología, siempre hay una serie de enfoques obsoletos detrás del desarrollo. Esto puede deberse a una transición sin problemas, factores humanos, necesidades tecnológicas u otra cosa. En el campo del procesamiento de datos, las fuentes de datos son las más reveladoras en esta parte. No importa cuánto soñemos con deshacernos de esto, pero hasta ahora parte de los datos se envían en mensajería instantánea y correos electrónicos, sin mencionar los formatos más arcaicos. Los invito a desarmar una de las opciones de Apache Airflow, ilustrando como se pueden tomar datos de los correos electrónicos.

Prehistoria

Todavía se transfieren muchos datos a través del correo electrónico, desde comunicaciones interpersonales hasta estándares de interacción entre empresas. Es bueno si es posible escribir una interfaz para obtener datos o poner personas en la oficina que ingresen esta información en fuentes más convenientes, pero a menudo esto simplemente no es posible. La tarea específica a la que me enfrenté fue conectar el notorio sistema CRM al almacén de datos y luego al sistema OLAP. Sucedió históricamente que para nuestra empresa el uso de este sistema era conveniente en un área particular de negocios. Por lo tanto, todos realmente querían poder operar también con datos de este sistema de terceros. En primer lugar, por supuesto, se estudió la posibilidad de obtener datos de una API abierta. Desafortunadamente, la API no cubría la obtención de todos los datos necesarios y, en términos simples, estaba torcida en muchos sentidos, y el soporte técnico no quería o no podía cumplir la mitad del camino para proporcionar una funcionalidad más completa. Pero este sistema brindó la oportunidad de recibir periódicamente los datos faltantes por correo en forma de enlace para descargar el archivo.

Cabe señalar que este no fue el único caso en el que la empresa quiso recopilar datos de correos electrónicos o mensajería instantánea. Sin embargo, en este caso, no podríamos influir en una empresa de terceros que proporciona parte de los datos solo de esta manera.

Flujo de aire Apache

Para construir procesos ETL, con mayor frecuencia usamos Apache Airflow. Para que un lector que no esté familiarizado con esta tecnología pueda comprender mejor cómo se ve en el contexto y en general, describiré un par de introductorias.

Apache Airflow es una plataforma gratuita que se utiliza para construir, ejecutar y monitorear procesos ETL (Extract-Transform-Loading) en Python. El concepto principal en Airflow es un gráfico acíclico dirigido, donde los vértices del gráfico son procesos específicos y los bordes del gráfico son el flujo de control o información. Un proceso puede simplemente llamar a cualquier función de Python, o puede tener una lógica más compleja llamando secuencialmente a varias funciones en el contexto de una clase. Para las operaciones más frecuentes, ya existen muchos desarrollos listos para usar que se pueden utilizar como procesos. Tales desarrollos incluyen:

  • operadores: para transferir datos de un lugar a otro, por ejemplo, desde una tabla de base de datos a un almacén de datos;
  • sensores: para esperar la ocurrencia de un determinado evento y dirigir el flujo de control a los vértices posteriores del gráfico;
  • ganchos: para operaciones de nivel inferior, por ejemplo, para obtener datos de una tabla de base de datos (utilizados en declaraciones);
  • etcétera

Sería inapropiado describir Apache Airflow en detalle en este artículo. Se pueden ver breves introducciones aquí o aquí.

Gancho para obtener datos

En primer lugar, para resolver el problema, necesitamos escribir un gancho con el que podamos:

  • conectarse al correo electrónico
  • encontrar la letra correcta
  • recibir datos de la carta.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

La lógica es esta: conectamos, encontramos la última letra más relevante, si hay otras, las ignoramos. Esta función se utiliza porque las letras posteriores contienen todos los datos de las anteriores. Si este no es el caso, puede devolver una matriz de todas las letras, o procesar la primera y el resto en el siguiente paso. En general, todo, como siempre, depende de la tarea.

Agregamos dos funciones auxiliares al gancho: para descargar un archivo y para descargar un archivo usando un enlace desde un correo electrónico. Por cierto, se pueden mover al operador, depende de la frecuencia de uso de esta funcionalidad. Qué más agregar al gancho, nuevamente, depende de la tarea: si los archivos se reciben inmediatamente en la carta, entonces puede descargar los archivos adjuntos a la carta, si los datos se reciben en la carta, entonces necesita analizar la carta, etc. En mi caso, la carta viene con un enlace al archivo, que debo colocar en un lugar determinado y comenzar el proceso de procesamiento adicional.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

El código es simple, por lo que apenas necesita más explicaciones. Solo te contaré sobre la línea mágica imap_conn_id. Apache Airflow almacena parámetros de conexión (inicio de sesión, contraseña, dirección y otros parámetros) a los que se puede acceder mediante un identificador de cadena. Visualmente, la administración de conexiones se ve así

Proceso ETL para obtener datos del correo electrónico en Apache Airflow

Sensor para esperar datos

Como ya sabemos cómo conectarnos y recibir datos del correo, ahora podemos escribir un sensor para esperarlos. En mi caso, no funcionó escribir un operador de inmediato que procesará los datos, si los hay, porque otros procesos funcionan en función de los datos recibidos del correo, incluidos los que toman datos relacionados de otras fuentes (API, telefonía , métricas web, etc.), etc.). Te daré un ejemplo. Ha aparecido un nuevo usuario en el sistema de CRM, y todavía no conocemos su UUID. Entonces, al intentar recibir datos de telefonía SIP, recibiremos llamadas ligadas a su UUID, pero no podremos guardarlas y utilizarlas correctamente. En tales asuntos, es importante tener en cuenta la dependencia de los datos, especialmente si son de diferentes fuentes. Estas son, por supuesto, medidas insuficientes para preservar la integridad de los datos, pero en algunos casos son necesarias. Sí, y el ralentí para ocupar recursos también es irracional.

Por lo tanto, nuestro sensor lanzará vértices posteriores del gráfico si hay información nueva en el correo y también marcará la información anterior como irrelevante.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Recibimos y usamos datos

Para recibir y procesar datos, puede escribir un operador separado, puede usar los ya preparados. Dado que hasta ahora la lógica es trivial: para tomar datos de la carta, por ejemplo, sugiero el PythonOperator estándar

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Por cierto, si su correo corporativo también está en mail.ru, entonces no podrá buscar cartas por asunto, remitente, etc. En 2016, prometieron presentarlo, pero aparentemente cambiaron de opinión. Resolví este problema creando una carpeta separada para las letras necesarias y configurando un filtro para las letras necesarias en la interfaz web de correo. Por lo tanto, solo las letras y condiciones necesarias para la búsqueda, en mi caso, simplemente (INVISIBLE) ingresan a esta carpeta.

Resumiendo, tenemos la siguiente secuencia: verificamos si hay nuevas cartas que cumplan con las condiciones, si las hay, luego descargamos el archivo usando el enlace de la última carta.
Debajo de los últimos puntos, se omite que este archivo se desempaquetará, los datos del archivo se borrarán y procesarán y, como resultado, todo irá más allá de la canalización del proceso ETL, pero esto ya está más allá el alcance del artículo. Si resultó interesante y útil, con gusto continuaré describiendo las soluciones ETL y sus partes para Apache Airflow.

Fuente: habr.com

Añadir un comentario