Procesul ETL pentru obținerea datelor din e-mail în Apache Airflow

Procesul ETL pentru obținerea datelor din e-mail în Apache Airflow

Indiferent cât de mult se dezvoltă tehnologia, un șir de abordări învechite se află întotdeauna în urma dezvoltării. Acest lucru se poate datora unei tranziții ușoare, factori umani, nevoi tehnologice sau altceva. În domeniul prelucrării datelor, sursele de date sunt cele mai revelatoare din această parte. Oricât de mult visăm să scăpăm de asta, dar până acum o parte din date sunt trimise în mesagerie instant și e-mailuri, ca să nu mai vorbim de formate mai arhaice. Vă invit să dezasamblați una dintre opțiunile pentru Apache Airflow, ilustrând modul în care puteți prelua date din e-mailuri.

preistorie

O mulțime de date sunt încă transferate prin e-mail, de la comunicări interpersonale la standarde de interacțiune între companii. Este bine dacă este posibil să scrieți o interfață pentru a obține date sau să puneți oameni în birou care vor introduce aceste informații în surse mai convenabile, dar de multe ori acest lucru este posibil să nu fie posibil. Sarcina specifică cu care m-am confruntat a fost conectarea notoriului sistem CRM la depozitul de date și apoi la sistemul OLAP. S-a întâmplat din punct de vedere istoric că pentru compania noastră utilizarea acestui sistem a fost convenabilă într-un anumit domeniu de afaceri. Prin urmare, toată lumea și-a dorit foarte mult să poată opera și cu date din acest sistem terță parte. În primul rând, desigur, a fost studiată posibilitatea obținerii datelor dintr-un API deschis. Din păcate, API-ul nu a acoperit obținerea tuturor datelor necesare și, în termeni simpli, a fost în multe privințe greșit, iar suportul tehnic nu a dorit sau nu a putut să se întâlnească la jumătatea drumului pentru a oferi funcționalități mai cuprinzătoare. Dar acest sistem a oferit posibilitatea de a primi periodic datele lipsă prin poștă sub forma unui link pentru descărcarea arhivei.

De menționat că acesta nu a fost singurul caz în care afacerea a dorit să colecteze date din e-mailuri sau mesagerie instant. Cu toate acestea, în acest caz, nu am putea influența o companie terță care furnizează o parte din date doar în acest mod.

Flux de aer Apache

Pentru a construi procese ETL, cel mai adesea folosim Apache Airflow. Pentru ca un cititor care nu este familiarizat cu această tehnologie să înțeleagă mai bine cum arată ea în context și în general, voi descrie câteva introductive.

Apache Airflow este o platformă gratuită care este utilizată pentru a construi, executa și monitoriza procesele ETL (Extract-Transform-Loading) în Python. Conceptul principal în Airflow este un grafic aciclic direcționat, în care vârfurile graficului sunt procese specifice, iar marginile graficului sunt fluxul de control sau informații. Un proces poate apela pur și simplu orice funcție Python sau poate avea o logică mai complexă din apelarea secvențială a mai multor funcții în contextul unei clase. Pentru cele mai frecvente operațiuni, există deja multe dezvoltări gata făcute care pot fi folosite ca procese. Astfel de evoluții includ:

  • operatori - pentru transferul de date dintr-un loc în altul, de exemplu, dintr-un tabel de bază de date într-un depozit de date;
  • senzori - pentru așteptarea apariției unui anumit eveniment și direcționarea fluxului de control către vârfurile ulterioare ale graficului;
  • cârlige - pentru operațiuni de nivel inferior, de exemplu, pentru a obține date dintr-un tabel de bază de date (utilizat în instrucțiuni);
  • etc

Ar fi nepotrivit să descriem Apache Airflow în detaliu în acest articol. Scurte introduceri pot fi vizualizate aici sau aici.

Cârlig pentru obținerea datelor

În primul rând, pentru a rezolva problema, trebuie să scriem un cârlig cu care am putea:

  • conectați-vă la e-mail
  • găsiți litera potrivită
  • primi date din scrisoare.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Logica este aceasta: ne conectăm, găsim ultima literă cea mai relevantă, dacă sunt altele, le ignorăm. Această funcție este folosită, deoarece literele ulterioare conțin toate datele celor anterioare. Dacă nu este cazul, atunci puteți returna o serie de toate literele sau puteți procesa prima, iar restul la următoarea trecere. În general, totul, ca întotdeauna, depinde de sarcină.

Adăugăm două funcții auxiliare la cârlig: pentru descărcarea unui fișier și pentru descărcarea unui fișier folosind un link dintr-un e-mail. Apropo, acestea pot fi mutate către operator, depinde de frecvența de utilizare a acestei funcționalități. Ce altceva să adăugați la cârlig, din nou, depinde de sarcină: dacă fișierele sunt primite imediat în scrisoare, atunci puteți descărca atașamente la scrisoare, dacă datele sunt primite în scrisoare, atunci trebuie să analizați scrisoarea, etc. În cazul meu, scrisoarea vine cu un link către arhivă, pe care trebuie să îl pun într-un anumit loc și să încep procesul de procesare ulterioară.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Codul este simplu, deci nu are nevoie de explicații suplimentare. Îți voi spune doar despre linia magică imap_conn_id. Apache Airflow stochează parametrii de conectare (login, parolă, adresă și alți parametri) care pot fi accesați printr-un identificator de șir. Din punct de vedere vizual, gestionarea conexiunii arată astfel

Procesul ETL pentru obținerea datelor din e-mail în Apache Airflow

Senzor pentru a aștepta date

Deoarece știm deja cum să ne conectăm și să primim date din e-mail, acum putem scrie un senzor pentru a le aștepta. În cazul meu, nu a funcționat să scriu imediat un operator care să prelucreze datele, dacă este cazul, pentru că alte procese funcționează pe baza datelor primite din mail, inclusiv cele care preiau date aferente din alte surse (API, telefonie). , metrici web etc.) etc.). Vă dau un exemplu. Un nou utilizator a apărut în sistemul CRM și încă nu știm despre UUID-ul său. Apoi, când vom încerca să primim date de la telefonia SIP, vom primi apeluri legate de UUID-ul acestuia, dar nu le vom putea salva și folosi corect. În astfel de chestiuni, este important să țineți cont de dependența datelor, mai ales dacă acestea provin din surse diferite. Acestea sunt, desigur, măsuri insuficiente pentru a păstra integritatea datelor, dar în unele cazuri sunt necesare. Da, iar mersul în gol pentru a ocupa resurse este, de asemenea, irațional.

Astfel, senzorul nostru va lansa nodurile ulterioare ale graficului dacă există informații noi în e-mail și va marca, de asemenea, informațiile anterioare ca irelevante.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Primim și folosim date

Pentru a primi și procesa date, puteți scrie un operator separat, puteți folosi unul gata făcut. Deoarece logica este încă banală - pentru a prelua date din scrisoare, de exemplu, sugerez standardul PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Apropo, dacă e-mailul dvs. corporativ este și pe mail.ru, atunci nu veți putea căuta scrisori după subiect, expeditor etc. În 2016, ei au promis că îl vor introduce, dar se pare că s-au răzgândit. Am rezolvat această problemă prin crearea unui folder separat pentru literele necesare și configurarea unui filtru pentru literele necesare în interfața web de e-mail. Astfel, doar literele și condițiile necesare căutării, în cazul meu, pur și simplu (NEVĂZUT) intră în acest folder.

Rezumând, avem următoarea secvență: verificăm dacă există litere noi care îndeplinesc condițiile, dacă există, apoi descarcăm arhiva folosind link-ul din ultima literă.
Sub ultimele puncte, se omite faptul că această arhivă va fi dezambalată, datele din arhivă vor fi șterse și procesate și, ca urmare, totul va merge mai departe la conducta procesului ETL, dar acest lucru este deja dincolo. domeniul de aplicare al articolului. Dacă s-a dovedit interesant și util, atunci voi continua cu plăcere să descriu soluțiile ETL și părțile lor pentru Apache Airflow.

Sursa: www.habr.com

Adauga un comentariu