Processo ETL para obter dados de e-mail no Apache Airflow

Processo ETL para obter dados de e-mail no Apache Airflow

Não importa o quanto a tecnologia se desenvolva, uma série de abordagens ultrapassadas sempre fica atrás do desenvolvimento. Isto pode ser devido a uma transição suave, fatores humanos, necessidades tecnológicas ou qualquer outra coisa. No campo do processamento de dados, as fontes de dados são as mais reveladoras nesta parte. Por mais que sonhemos em nos livrar disso, até agora parte dos dados é enviada em mensagens instantâneas e e-mails, sem falar em formatos mais arcaicos. Convido você a desmontar uma das opções do Apache Airflow, ilustrando como você pode obter dados de e-mails.

Pré-história

Muitos dados ainda são transferidos via e-mail, desde comunicações interpessoais até padrões de interação entre empresas. É bom que seja possível escrever uma interface para obter dados ou colocar pessoas no escritório que irão inserir essas informações em fontes mais convenientes, mas muitas vezes isso pode simplesmente não ser possível. A tarefa específica que enfrentei foi conectar o notório sistema CRM ao data warehouse e depois ao sistema OLAP. Historicamente, aconteceu que para a nossa empresa a utilização deste sistema era conveniente numa determinada área de negócio. Portanto, todos queriam muito poder operar também com dados desse sistema de terceiros. Em primeiro lugar, é claro, foi estudada a possibilidade de obter dados de uma API aberta. Infelizmente, a API não cobria a obtenção de todos os dados necessários e, em termos simples, era distorcida em muitos aspectos, e o suporte técnico não queria ou não podia chegar a meio caminho para fornecer funcionalidades mais abrangentes. Mas este sistema proporcionou a oportunidade de receber periodicamente os dados perdidos por correio na forma de um link para descarregar o arquivo.

Ressalta-se que este não foi o único caso em que a empresa quis coletar dados de e-mails ou mensagens instantâneas. Porém, neste caso, não poderíamos influenciar uma empresa terceirizada que fornece parte dos dados apenas desta forma.

Fluxo de ar Apache

Para construir processos ETL, geralmente usamos o Apache Airflow. Para que um leitor que não está familiarizado com esta tecnologia entenda melhor sua aparência no contexto e em geral, descreverei algumas introdutórias.

Apache Airflow é uma plataforma gratuita usada para construir, executar e monitorar processos ETL (Extract-Transform-Loading) em Python. O conceito principal do Airflow é um gráfico acíclico direcionado, onde os vértices do gráfico são processos específicos e as bordas do gráfico são o fluxo de controle ou informação. Um processo pode simplesmente chamar qualquer função Python ou pode ter uma lógica mais complexa ao chamar várias funções sequencialmente no contexto de uma classe. Para as operações mais frequentes, já existem muitos desenvolvimentos prontos que podem ser utilizados como processos. Esses desenvolvimentos incluem:

  • operadores - para transferir dados de um local para outro, por exemplo, de uma tabela de banco de dados para um data warehouse;
  • sensores - para aguardar a ocorrência de determinado evento e direcionar o fluxo de controle para os vértices subsequentes do gráfico;
  • ganchos - para operações de nível inferior, por exemplo, para obter dados de uma tabela de banco de dados (usada em instruções);
  • и т.д.

Seria inapropriado descrever o Apache Airflow em detalhes neste artigo. Breves introduções podem ser visualizadas aqui ou aqui.

Gancho para obter dados

Primeiro de tudo, para resolver o problema, precisamos escrever um gancho com o qual poderíamos:

  • conectar-se ao e-mail
  • encontre a letra certa
  • receber dados da carta.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

A lógica é esta: conectamos, encontramos a última letra mais relevante, se houver outras, ignoramos. Esta função é usada porque as letras posteriores contêm todos os dados das anteriores. Se não for esse o caso, você poderá retornar uma matriz de todas as letras ou processar a primeira e o restante na próxima passagem. Em geral, tudo, como sempre, depende da tarefa.

Adicionamos duas funções auxiliares ao gancho: baixar um arquivo e baixar um arquivo usando um link de um e-mail. Aliás, eles podem ser transferidos para a operadora, depende da frequência de utilização desta funcionalidade. O que mais adicionar ao gancho, novamente, depende da tarefa: se os arquivos forem recebidos imediatamente na carta, você poderá baixar os anexos da carta, se os dados forem recebidos na carta, você precisará analisar a carta, etc. No meu caso, a carta vem com um link para o arquivo, que preciso colocar em um determinado local e iniciar o processo de processamento posterior.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

O código é simples, por isso dificilmente precisa de mais explicações. Vou apenas falar sobre a linha mágica imap_conn_id. O Apache Airflow armazena parâmetros de conexão (login, senha, endereço e outros parâmetros) que podem ser acessados ​​por um identificador de string. Visualmente, o gerenciamento de conexões se parece com isto

Processo ETL para obter dados de e-mail no Apache Airflow

Sensor para aguardar dados

Como já sabemos como conectar e receber dados do correio, agora podemos escrever um sensor para esperá-los. No meu caso, não funcionou escrever imediatamente um operador que irá processar os dados, se houver, porque outros processos funcionam com base nos dados recebidos do correio, inclusive aqueles que pegam dados relacionados de outras fontes (API, telefonia , métricas da web, etc.). etc.). Vou te dar um exemplo. Um novo usuário apareceu no sistema CRM e ainda não sabemos sobre seu UUID. Então, ao tentar receber dados da telefonia SIP, receberemos chamadas vinculadas ao seu UUID, mas não poderemos salvá-las e utilizá-las corretamente. Nessas questões, é importante ter em mente a dependência dos dados, principalmente se forem de fontes diferentes. Estas são, obviamente, medidas insuficientes para preservar a integridade dos dados, mas em alguns casos são necessárias. Sim, e ficar ocioso para ocupar recursos também é irracional.

Assim, nosso sensor lançará os vértices subsequentes do gráfico se houver informações novas no correio, e também marcará as informações anteriores como irrelevantes.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Recebemos e usamos dados

Para receber e processar dados, você pode escrever um operador separado ou usar operadores já prontos. Como até agora a lógica é trivial - para tirar dados da carta, por exemplo, sugiro o PythonOperator padrão

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

A propósito, se o seu correio corporativo também estiver em mail.ru, você não poderá pesquisar cartas por assunto, remetente, etc. Em 2016, eles prometeram apresentá-lo, mas aparentemente mudaram de ideia. Resolvi esse problema criando uma pasta separada para as cartas necessárias e configurando um filtro para as cartas necessárias na interface web do correio. Assim, apenas as letras e condições necessárias para a busca, no meu caso, simplesmente (INVISÍVEL) entram nesta pasta.

Resumindo, temos a seguinte sequência: verificamos se há novas cartas que atendam às condições, se houver, baixamos o arquivo através do link da última carta.
Sob os últimos pontos, omite-se que este arquivo será descompactado, os dados do arquivo serão limpos e processados ​​​​e, como resultado, tudo irá para o pipeline do processo ETL, mas isso já está além o escopo do artigo. Se for interessante e útil, terei prazer em continuar descrevendo soluções ETL e suas partes para Apache Airflow.

Fonte: habr.com

Adicionar um comentário