Prucessu ETL per uttene dati da email in Apache Airflow

Prucessu ETL per uttene dati da email in Apache Airflow

Ùn importa micca quantu a tecnulugia si sviluppa, una catena di approcci obsoleti sempre traccia daretu à u sviluppu. Questu pò esse dovutu à una transizione liscia, fatturi umani, bisogni tecnologichi, o qualcosa altru. In u campu di u trattamentu di dati, i fonti di dati sò i più revelatori in questa parte. Ùn importa micca quantu sognu di sbarazzà di questu, ma finu à quì una parte di e dati hè mandata in messageria istantanea è email, per ùn dì micca formate più arcaiche. Vi invitu à disassemble una di l'opzioni per Apache Airflow, illustrandu cumu pudete piglià dati da e-mail.

Pristoria

Un saccu di dati hè sempre trasferitu via e-mail, da e cumunicazioni interpersonali à i normi di interazzione trà e cumpagnie. Hè bonu s'ellu hè pussibule di scrive una interfaccia per ottene dati o mette in l'uffiziu e persone chì entreranu sta informazione in fonti più convenienti, ma spessu questu ùn hè micca pussibule. U compitu specificu chì aghju affruntatu era cunnette u famusu sistema CRM à u magazzinu di dati, è dopu à u sistema OLAP. Hè accadutu storicu chì per a nostra cumpagnia l'usu di stu sistema era cunvenutu in una zona particulare di l'affari. Dunque, tutti vulianu veramente pudè uperà cù dati da stu sistema di terzu. Prima di tuttu, sicuru, a pussibilità di ottene dati da una API aperta hè stata studiata. Sfurtunatamente, l'API ùn copre micca ottene tutte e dati necessarii, è, in termini simplici, era in parechje manere currettu, è u supportu tecnicu ùn vulia micca o ùn pudia micca scuntrà a mità di strada per furnisce una funziunalità più cumpleta. Ma stu sistema hà datu l'uppurtunità di riceve periodicamente i dati mancanti per mail in forma di un ligame per scaricamentu di l'archiviu.

Hè da nutà chì questu ùn era micca l'unicu casu in quale l'affari vulia cullà dati da emails o messageri instantani. Tuttavia, in questu casu, ùn pudemu micca influenzà una cumpagnia di terzu chì furnisce una parte di e dati solu in questu modu.

flussu d'aria apache

Per custruisce i prucessi ETL, u più spessu usemu Apache Airflow. Per un lettore chì ùn hè micca familiarizatu cù sta tecnulugia per capisce megliu cumu si vede in u cuntestu è in generale, descriveraghju un paru di introduttori.

Apache Airflow hè una piattaforma libera chì hè aduprata per custruisce, eseguisce è monitorà i prucessi ETL (Extract-Transform-Loading) in Python. U cuncettu principalu in Airflow hè un gràficu aciclicu direttu, induve i vertici di u gràficu sò prucessi specifichi, è i bordi di u gràficu sò u flussu di cuntrollu o infurmazione. Un prucessu pò simpricimenti chjamà qualsiasi funzione Python, o pò avè una logica più cumplessa da chjamà sequenzialmente parechje funzioni in u cuntestu di una classe. Per l'operazioni più frequenti, ci sò digià parechji sviluppi pronti chì ponu esse usatu cum'è prucessi. Tali sviluppi includenu:

  • operatori - per trasfiriri dati da un locu à l'altru, per esempiu, da una tabella di basa di dati à un magazzinu di dati;
  • sensori - per aspittà l'occurrence di un certu avvenimentu è dirigendu u flussu di cuntrollu à i vertici successivi di u graficu;
  • ganci - per l'operazioni di livellu più bassu, per esempiu, per uttene dati da una tabella di basa di dati (usata in dichjarazioni);
  • è cusì.

Saria inappropriatu di discrive Apache Airflow in dettagliu in questu articulu. Brevi introduzioni ponu esse vistu ccà o ccà.

Ganciu per uttene dati

Prima di tuttu, per risolve u prublema, avemu bisognu di scrive un ganciu cù quale pudemu:

  • cunnette à e-mail
  • truvà a lettera ghjusta
  • riceve dati da a lettera.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

A logica hè questu: cunnettamu, truvamu l'ultima lettera più pertinente, s'ellu ci sò altri, ignuramu. Questa funzione hè aduprata, perchè e lettere più tardi cuntenenu tutti i dati di i primi. S'ellu ùn hè micca u casu, pudete turnà un array di tutte e lettere, o processà u primu, è u restu nantu à u prossimu passu. In generale, tuttu, cum'è sempre, dipende di u compitu.

Aghjunghjemu duie funzioni ausiliarii à u ganciu: per scaricà un schedariu è per scaricà un schedariu cù un ligame da un email. In modu, ponu esse spustati à l'operatore, dipende da a freccia di usu di sta funziunalità. Cosa altru à aghjunghje à u ganciu, di novu, dipende di u compitu: se i schedari sò ricevuti subitu in a lettera, allura pudete scaricà attachments à a lettera, se i dati sò ricevuti in a lettera, allora avete bisognu di analizà a lettera, ecc. In u mo casu, a lettera vene cun un ligame à l'archiviu, chì aghju bisognu di mette in un certu locu è principià u prucessu di trasfurmazioni.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

U codice hè simplice, cusì ùn hà micca bisognu di più spiegazione. Vi dicu solu nantu à a linea magica imap_conn_id. Apache Airflow guarda i paràmetri di cunnessione (login, password, indirizzu è altri paràmetri) chì ponu accede da un identificatore di stringa. Visualmente, a gestione di cunnessione s'assumiglia cusì

Prucessu ETL per uttene dati da email in Apache Airflow

Sensor per aspittà i dati

Siccomu sapemu digià cumu cunnette è riceve dati da mail, pudemu avà scrive un sensor per aspittà per elli. In u mo casu, ùn hà micca travagliatu per scrive un operatore subitu chì prucederà e dati, s'ellu ci hè, perchè altri prucessi funzionanu basatu nantu à e dati ricevuti da u mail, cumpresi quelli chì piglianu dati cunnessi da altre fonti (API, telefonia). , metrica web, etc.). etc.). Vi daraghju un esempiu. Un novu utilizatore hè apparsu in u sistema CRM, è ùn sapemu ancu u so UUID. Allora, quandu pruvate di riceve dati da a telefonia SIP, riceveremu chjamati ligati à u so UUID, ma ùn pudemu micca salvà è aduprà bè. In tali affari, hè impurtante di mantene in mente a dependenza di e dati, soprattuttu s'ellu sò da diverse fonti. Quessi sò, sicuru, misure insufficienti per priservà l'integrità di e dati, ma in certi casi sò necessarii. Iè, è idling per occupà risorse hè ancu irrazionale.

Cusì, u nostru sensoru lanciarà i vertici successivi di u graficu s'ellu ci hè infurmazione fresca in u mail, è ancu marca l'infurmazioni previ cum'è irrilevanti.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Ricevemu è usemu dati

Per riceve è processà e dati, pudete scrive un operatore separatu, pudete aduprà quelli pronti. Siccomu a logica hè sempre triviale - per piglià dati da a lettera, per esempiu, suggerisce u standard PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Per via, se u vostru mail corporativu hè ancu in mail.ru, allora ùn puderete micca cercà lettere per sughjettu, mittente, etc. Torna in 2016, anu prumessu di presentà lu, ma apparentemente cambiatu a so mente. Aghju risoltu stu prublema creendu un cartulare separatu per e lettere necessarie è stabilisce un filtru per e lettere necessarie in l'interfaccia web di mail. Cusì, solu e lettere è e cundizioni necessarii per a ricerca, in u mo casu, simpricimenti (UNSEEN) entra in stu cartulare.

Riassuntu, avemu a sequenza seguente: cuntrollamu s'ellu ci sò novi lettere chì scontranu e cundizioni, se ci sò, allora scaricamu l'archiviu cù u ligame da l'ultima lettera.
Sottu à l'ultimi punti, hè omessu chì questu archiviu serà unpacked, i dati da l'archiviu seranu sbulicati è processati, è in u risultatu, tuttu ciò chì andarà più in u pipeline di u prucessu ETL, ma questu hè digià fora. u scopu di l'articulu. S'ellu hè statu interessante è utile, allora cuntinueraghju cun piacè à descriverà e soluzioni ETL è e so parti per Apache Airflow.

Source: www.habr.com

Add a comment