Não importa o quanto a tecnologia se desenvolva, uma série de abordagens ultrapassadas sempre fica atrás do desenvolvimento. Isto pode ser devido a uma transição suave, fatores humanos, necessidades tecnológicas ou qualquer outra coisa. No campo do processamento de dados, as fontes de dados são as mais reveladoras nesta parte. Por mais que sonhemos em nos livrar disso, até agora parte dos dados é enviada em mensagens instantâneas e e-mails, sem falar em formatos mais arcaicos. Convido você a desmontar uma das opções do Apache Airflow, ilustrando como você pode obter dados de e-mails.
Pré-história
Muitos dados ainda são transferidos via e-mail, desde comunicações interpessoais até padrões de interação entre empresas. É bom que seja possível escrever uma interface para obter dados ou colocar pessoas no escritório que irão inserir essas informações em fontes mais convenientes, mas muitas vezes isso pode simplesmente não ser possível. A tarefa específica que enfrentei foi conectar o notório sistema CRM ao data warehouse e depois ao sistema OLAP. Historicamente, aconteceu que para a nossa empresa a utilização deste sistema era conveniente numa determinada área de negócio. Portanto, todos queriam muito poder operar também com dados desse sistema de terceiros. Em primeiro lugar, é claro, foi estudada a possibilidade de obter dados de uma API aberta. Infelizmente, a API não cobria a obtenção de todos os dados necessários e, em termos simples, era distorcida em muitos aspectos, e o suporte técnico não queria ou não podia chegar a meio caminho para fornecer funcionalidades mais abrangentes. Mas este sistema proporcionou a oportunidade de receber periodicamente os dados perdidos por correio na forma de um link para descarregar o arquivo.
Ressalta-se que este não foi o único caso em que a empresa quis coletar dados de e-mails ou mensagens instantâneas. Porém, neste caso, não poderíamos influenciar uma empresa terceirizada que fornece parte dos dados apenas desta forma.
Fluxo de ar Apache
Para construir processos ETL, geralmente usamos o Apache Airflow. Para que um leitor que não está familiarizado com esta tecnologia entenda melhor sua aparência no contexto e em geral, descreverei algumas introdutórias.
Apache Airflow é uma plataforma gratuita usada para construir, executar e monitorar processos ETL (Extract-Transform-Loading) em Python. O conceito principal do Airflow é um gráfico acíclico direcionado, onde os vértices do gráfico são processos específicos e as bordas do gráfico são o fluxo de controle ou informação. Um processo pode simplesmente chamar qualquer função Python ou pode ter uma lógica mais complexa ao chamar várias funções sequencialmente no contexto de uma classe. Para as operações mais frequentes, já existem muitos desenvolvimentos prontos que podem ser utilizados como processos. Esses desenvolvimentos incluem:
- operadores - para transferir dados de um local para outro, por exemplo, de uma tabela de banco de dados para um data warehouse;
- sensores - para aguardar a ocorrência de determinado evento e direcionar o fluxo de controle para os vértices subsequentes do gráfico;
- ganchos - para operações de nível inferior, por exemplo, para obter dados de uma tabela de banco de dados (usada em instruções);
- и т.д.
Seria inapropriado descrever o Apache Airflow em detalhes neste artigo. Breves introduções podem ser visualizadas
Gancho para obter dados
Primeiro de tudo, para resolver o problema, precisamos escrever um gancho com o qual poderíamos:
- conectar-se ao e-mail
- encontre a letra certa
- receber dados da carta.
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging
class IMAPHook(BaseHook):
def __init__(self, imap_conn_id):
"""
IMAP hook для получения данных с электронной почты
:param imap_conn_id: Идентификатор подключения к почте
:type imap_conn_id: string
"""
self.connection = self.get_connection(imap_conn_id)
self.mail = None
def authenticate(self):
"""
Подключаемся к почте
"""
mail = imaplib.IMAP4_SSL(self.connection.host)
response, detail = mail.login(user=self.connection.login, password=self.connection.password)
if response != "OK":
raise AirflowException("Sign in failed")
else:
self.mail = mail
def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
"""
Метод для получения идентификатора последнего письма,
удовлетвораяющего условиям поиска
:param check_seen: Отмечать последнее письмо как прочитанное
:type check_seen: bool
:param box: Наименования ящика
:type box: string
:param condition: Условия поиска писем
:type condition: string
"""
self.authenticate()
self.mail.select(mailbox=box)
response, data = self.mail.search(None, condition)
mail_ids = data[0].split()
logging.info("В ящике найдены следующие письма: " + str(mail_ids))
if not mail_ids:
logging.info("Не найдено новых писем")
return None
mail_id = mail_ids[0]
# если таких писем несколько
if len(mail_ids) > 1:
# отмечаем остальные прочитанными
for id in mail_ids:
self.mail.store(id, "+FLAGS", "\Seen")
# возвращаем последнее
mail_id = mail_ids[-1]
# нужно ли отметить последнее прочитанным
if not check_seen:
self.mail.store(mail_id, "-FLAGS", "\Seen")
return mail_id
A lógica é esta: conectamos, encontramos a última letra mais relevante, se houver outras, ignoramos. Esta função é usada porque as letras posteriores contêm todos os dados das anteriores. Se não for esse o caso, você poderá retornar uma matriz de todas as letras ou processar a primeira e o restante na próxima passagem. Em geral, tudo, como sempre, depende da tarefa.
Adicionamos duas funções auxiliares ao gancho: baixar um arquivo e baixar um arquivo usando um link de um e-mail. Aliás, eles podem ser transferidos para a operadora, depende da frequência de utilização desta funcionalidade. O que mais adicionar ao gancho, novamente, depende da tarefa: se os arquivos forem recebidos imediatamente na carta, você poderá baixar os anexos da carta, se os dados forem recebidos na carta, você precisará analisar a carta, etc. No meu caso, a carta vem com um link para o arquivo, que preciso colocar em um determinado local e iniciar o processo de processamento posterior.
def download_from_url(self, url, path, chunk_size=128):
"""
Метод для скачивания файла
:param url: Адрес загрузки
:type url: string
:param path: Куда положить файл
:type path: string
:param chunk_size: По сколько байтов писать
:type chunk_size: int
"""
r = requests.get(url, stream=True)
with open(path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def download_mail_href_attachment(self, mail_id, path):
"""
Метод для скачивания файла по ссылке из письма
:param mail_id: Идентификатор письма
:type mail_id: string
:param path: Куда положить файл
:type path: string
"""
response, data = self.mail.fetch(mail_id, "(RFC822)")
raw_email = data[0][1]
raw_soup = raw_email.decode().replace("r", "").replace("n", "")
parse_soup = BeautifulSoup(raw_soup, "html.parser")
link_text = ""
for a in parse_soup.find_all("a", href=True, text=True):
link_text = a["href"]
self.download_from_url(link_text, path)
O código é simples, por isso dificilmente precisa de mais explicações. Vou apenas falar sobre a linha mágica imap_conn_id. O Apache Airflow armazena parâmetros de conexão (login, senha, endereço e outros parâmetros) que podem ser acessados por um identificador de string. Visualmente, o gerenciamento de conexões se parece com isto
Sensor para aguardar dados
Como já sabemos como conectar e receber dados do correio, agora podemos escrever um sensor para esperá-los. No meu caso, não funcionou escrever imediatamente um operador que irá processar os dados, se houver, porque outros processos funcionam com base nos dados recebidos do correio, inclusive aqueles que pegam dados relacionados de outras fontes (API, telefonia , métricas da web, etc.). etc.). Vou te dar um exemplo. Um novo usuário apareceu no sistema CRM e ainda não sabemos sobre seu UUID. Então, ao tentar receber dados da telefonia SIP, receberemos chamadas vinculadas ao seu UUID, mas não poderemos salvá-las e utilizá-las corretamente. Nessas questões, é importante ter em mente a dependência dos dados, principalmente se forem de fontes diferentes. Estas são, obviamente, medidas insuficientes para preservar a integridade dos dados, mas em alguns casos são necessárias. Sim, e ficar ocioso para ocupar recursos também é irracional.
Assim, nosso sensor lançará os vértices subsequentes do gráfico se houver informações novas no correio, e também marcará as informações anteriores como irrelevantes.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook
class MailSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.check_seen = check_seen
self.box = box
self.condition = condition
def poke(self, context):
conn = IMAPHook(self.conn_id)
mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
if mail_id is None:
return False
else:
return True
Recebemos e usamos dados
Para receber e processar dados, você pode escrever um operador separado ou usar operadores já prontos. Como até agora a lógica é trivial - para tirar dados da carta, por exemplo, sugiro o PythonOperator padrão
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook
start_date = datetime(2020, 4, 4)
# Стандартное конфигурирование графа
args = {
"owner": "example",
"start_date": start_date,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retry_delay": timedelta(minutes=15),
"provide_context": False,
}
dag = DAG(
dag_id="test_etl",
default_args=args,
schedule_interval="@hourly",
)
# Определяем сенсор
mail_check_sensor = MailSensor(
task_id="check_new_emails",
poke_interval=10,
conn_id="mail_conn_id",
timeout=10,
soft_fail=True,
box="my_box",
dag=dag,
mode="poke",
)
# Функция для получения данных из письма
def prepare_mail():
imap_hook = IMAPHook("mail_conn_id")
mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
if mail_id is None:
raise AirflowException("Empty mailbox")
conn.download_mail_href_attachment(mail_id, "./path.zip")
prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
# Описание остальных вершин графа
...
# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления
A propósito, se o seu correio corporativo também estiver em mail.ru, você não poderá pesquisar cartas por assunto, remetente, etc. Em 2016, eles prometeram apresentá-lo, mas aparentemente mudaram de ideia. Resolvi esse problema criando uma pasta separada para as cartas necessárias e configurando um filtro para as cartas necessárias na interface web do correio. Assim, apenas as letras e condições necessárias para a busca, no meu caso, simplesmente (INVISÍVEL) entram nesta pasta.
Resumindo, temos a seguinte sequência: verificamos se há novas cartas que atendam às condições, se houver, baixamos o arquivo através do link da última carta.
Sob os últimos pontos, omite-se que este arquivo será descompactado, os dados do arquivo serão limpos e processados e, como resultado, tudo irá para o pipeline do processo ETL, mas isso já está além o escopo do artigo. Se for interessante e útil, terei prazer em continuar descrevendo soluções ETL e suas partes para Apache Airflow.
Fonte: habr.com