ETL proces za dobijanje podataka iz e-pošte u Apache Airflow

ETL proces za dobijanje podataka iz e-pošte u Apache Airflow

Bez obzira na to koliko se tehnologija razvija, niz zastarjelih pristupa uvijek zaostaje za razvojem. To može biti zbog glatke tranzicije, ljudskih faktora, tehnoloških potreba ili nečeg drugog. U oblasti obrade podataka izvori podataka su u ovom dijelu najizrazitiji. Koliko god sanjali da se toga riješimo, ali za sada se dio podataka šalje u instant messengerima i mejlovima, da ne govorimo o arhaičnijim formatima. Pozivam vas da rastavite jednu od opcija za Apache Airflow, ilustrujući kako možete preuzeti podatke iz e-pošte.

prapovijest

Mnogo podataka se i dalje prenosi putem e-pošte, od međuljudske komunikacije do standarda interakcije između kompanija. Dobro je ako je moguće napisati interfejs za dobijanje podataka ili staviti ljude u kancelariju koji će te informacije unositi u pogodnije izvore, ali često to jednostavno nije moguće. Konkretan zadatak sa kojim sam se suočio bio je povezivanje ozloglašenog CRM sistema sa skladištem podataka, a zatim i sa OLAP sistemom. Istorijski se dogodilo da je za našu kompaniju upotreba ovog sistema bila zgodna u određenoj oblasti poslovanja. Stoga su svi zaista željeli da mogu raditi i sa podacima iz ovog sistema treće strane. Prije svega, naravno, proučavana je mogućnost dobivanja podataka iz otvorenog API-ja. Nažalost, API nije pokrivao dobijanje svih potrebnih podataka, i, jednostavno rečeno, bio je na mnogo načina kriv, a tehnička podrška nije htela ili nije mogla da izađe u susret kako bi pružila sveobuhvatniju funkcionalnost. Ali ovaj sistem je pružio mogućnost da se podaci koji nedostaju povremeno primaju poštom u obliku veze za istovar arhive.

Treba napomenuti da ovo nije bio jedini slučaj u kojem je kompanija željela prikupiti podatke iz e-mailova ili instant messengera. Međutim, u ovom slučaju nismo mogli utjecati na treću kompaniju koja samo na ovaj način daje dio podataka.

apache airflow

Za izgradnju ETL procesa najčešće koristimo Apache Airflow. Kako bi čitatelj koji nije upoznat s ovom tehnologijom bolje razumio kako ona izgleda u kontekstu i općenito, opisati ću nekoliko uvodnih.

Apache Airflow je besplatna platforma koja se koristi za izgradnju, izvršavanje i praćenje ETL (Extract-Transform-Loading) procesa u Pythonu. Glavni koncept u Airflow-u je usmjereni aciklični graf, gdje su vrhovi grafa specifični procesi, a ivice grafa su tok kontrole ili informacija. Proces može jednostavno pozvati bilo koju Python funkciju ili može imati složeniju logiku od sekvencijalnog pozivanja nekoliko funkcija u kontekstu klase. Za najčešće operacije već postoji mnogo gotovih razvoja koji se mogu koristiti kao procesi. Takvi razvoji uključuju:

  • operatori - za prenos podataka sa jednog mesta na drugo, na primer, iz tabele baze podataka u skladište podataka;
  • senzori - za čekanje na pojavu određenog događaja i usmjeravanje toka kontrole na naredne vrhove grafa;
  • kukice - za operacije nižeg nivoa, na primjer, za dobivanje podataka iz tablice baze podataka (koristi se u izjavama);
  • i tako dalje.

Bilo bi neprikladno detaljno opisati Apache Airflow u ovom članku. Kratke uvode možete pogledati ovdje ili ovdje.

Kuka za dobijanje podataka

Prije svega, da bismo riješili problem, moramo napisati udicu s kojom bismo mogli:

  • povežite se na email
  • pronađite pravo slovo
  • primati podatke iz pisma.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Logika je sledeća: povezujemo se, nalazimo poslednje najrelevantnije slovo, ako ima drugih, ignorišemo ih. Ova funkcija se koristi jer kasnija slova sadrže sve podatke ranijih. Ako to nije slučaj, onda možete vratiti niz svih slova ili obraditi prvo, a ostatak na sljedećem prolazu. Općenito, sve, kao i uvijek, ovisi o zadatku.

Dodamo dvije pomoćne funkcije na kuku: za preuzimanje datoteke i za preuzimanje datoteke pomoću veze iz e-pošte. Usput, mogu se premjestiti kod operatera, ovisi o učestalosti korištenja ove funkcionalnosti. Šta još dodati na kuku, opet, ovisi o zadatku: ako se datoteke odmah primaju u pismu, tada možete preuzeti priloge pismu, ako su podaci primljeni u pismu, onda morate raščlaniti pismo, itd. U mom slučaju pismo dolazi sa jednom vezom do arhive koju trebam staviti na određeno mjesto i krenuti u dalju obradu.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Kod je jednostavan, tako da mu nije potrebno dodatno objašnjenje. Reći ću vam samo o magičnoj liniji imap_conn_id. Apache Airflow pohranjuje parametre veze (prijava, lozinka, adresa i drugi parametri) kojima se može pristupiti pomoću identifikatora niza. Vizuelno, upravljanje vezama izgleda ovako

ETL proces za dobijanje podataka iz e-pošte u Apache Airflow

Senzor čeka podatke

Pošto već znamo kako se povezati i primati podatke iz pošte, sada možemo napisati senzor da ih čeka. U mom slučaju nije išlo odmah napisati operatera koji će obraditi podatke, ako ih ima, jer drugi procesi rade na osnovu podataka primljenih iz pošte, uključujući i one koji preuzimaju povezane podatke iz drugih izvora (API, telefonija , web metrike, itd.) itd.). Dat ću vam primjer. U CRM sistemu se pojavio novi korisnik, a još uvijek ne znamo za njegov UUID. Tada ćemo, prilikom pokušaja primanja podataka od SIP telefonije, primati pozive vezane za njen UUID, ali ih nećemo moći ispravno pohraniti i koristiti. U takvim stvarima važno je imati na umu ovisnost podataka, posebno ako su iz različitih izvora. Ovo su, naravno, nedovoljne mjere za očuvanje integriteta podataka, ali su u nekim slučajevima neophodne. Da, a neracionalno je i neracionalno zauzimanje resursa.

Tako će naš senzor pokrenuti sljedeće vrhove grafa ako u mailu ima svježih informacija, a također će prethodnu informaciju označiti kao nebitnu.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Primamo i koristimo podatke

Za primanje i obradu podataka možete napisati poseban operator, možete koristiti gotove. Pošto je do sada logika trivijalna - da uzmete podatke iz pisma, na primjer, predlažem standardni PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Usput, ako je vaša korporativna pošta također na mail.ru, tada nećete moći tražiti pisma po temi, pošiljaocu itd. Još 2016. obećali su da će ga uvesti, ali su se očigledno predomislili. Ovaj problem sam riješio kreiranjem posebnog foldera za potrebna slova i postavljanjem filtera za potrebna slova u web interfejsu pošte. Dakle, samo potrebna slova i uslovi za pretragu, u mom slučaju, jednostavno (NEVIĐENO) ulaze u ovaj folder.

Sumirajući, imamo sljedeći redoslijed: provjeravamo da li postoje nova pisma koja ispunjavaju uslove, ako ih ima, onda preuzimamo arhivu koristeći vezu iz posljednjeg pisma.
Ispod zadnjih tačaka je izostavljeno da će se ova arhiva raspakirati, podaci iz arhive obrisati i obraditi, a kao rezultat toga, cijela stvar će ići dalje do cevovoda ETL procesa, ali to je već izvan granica obim članka. Ako je ispalo zanimljivo i korisno, onda ću rado nastaviti opisivati ​​ETL rješenja i njihove dijelove za Apache Airflow.

izvor: www.habr.com

Dodajte komentar