ETL proces za dobivanje podataka iz e-pošte u Apache Airflowu

ETL proces za dobivanje podataka iz e-pošte u Apache Airflowu

Bez obzira koliko se tehnologija razvijala, niz zastarjelih pristupa uvijek zaostaje za razvojem. To može biti posljedica glatke tranzicije, ljudskih faktora, tehnoloških potreba ili nečeg drugog. U području obrade podataka izvori podataka u ovom dijelu najviše otkrivaju. Koliko god sanjali da se toga riješimo, ali za sada se dio podataka šalje u instant messengerima i e-poštom, a da ne spominjemo arhaičnije formate. Pozivam vas da rastavite jednu od opcija za Apache Airflow, ilustrirajući kako možete uzeti podatke iz e-pošte.

prapovijest

Puno se podataka još uvijek prenosi e-poštom, od međuljudske komunikacije do standarda interakcije između tvrtki. Dobro je ako je moguće napisati sučelje za dobivanje podataka ili staviti ljude u ured koji će unositi te podatke u praktičnije izvore, ali često to jednostavno nije moguće. Konkretan zadatak s kojim sam se suočio bilo je povezivanje notornog CRM sustava sa skladištem podataka, a potom i sa OLAP sustavom. Povijesno se dogodilo da je za našu tvrtku korištenje ovog sustava bilo pogodno u određenom području poslovanja. Stoga su svi doista željeli moći raditi i s podacima iz ovog sustava treće strane. Prije svega, naravno, proučavana je mogućnost dobivanja podataka iz otvorenog API-ja. Nažalost, API nije pokrivao dobivanje svih potrebnih podataka, i, jednostavno rečeno, bio je u mnogočemu kriv, a tehnička podrška nije htjela ili mogla izaći u susret kako bi pružila sveobuhvatniju funkcionalnost. Ali ovaj je sustav pružio mogućnost povremenog primanja podataka koji nedostaju poštom u obliku poveznice za istovar arhive.

Valja napomenuti da ovo nije bio jedini slučaj u kojem je tvrtka htjela prikupiti podatke iz e-pošte ili instant messengera. Međutim, u ovom slučaju nismo mogli utjecati na tvrtku treće strane koja samo na ovaj način daje dio podataka.

protok zraka apache

Za izgradnju ETL procesa najčešće koristimo Apache Airflow. Kako bi čitatelj koji nije upoznat s ovom tehnologijom bolje razumio kako ona izgleda u kontekstu i općenito, opisat ću nekoliko uvodnih.

Apache Airflow je besplatna platforma koja se koristi za izgradnju, izvršavanje i nadzor ETL (Extract-Transform-Loading) procesa u Pythonu. Glavni koncept u Airflowu je usmjereni aciklički graf, gdje su vrhovi grafa specifični procesi, a rubovi grafa tok kontrole ili informacija. Proces može jednostavno pozvati bilo koju Python funkciju ili može imati složeniju logiku od uzastopnog pozivanja nekoliko funkcija u kontekstu klase. Za najčešće operacije već postoji mnogo gotovih razvoja koji se mogu koristiti kao procesi. Takav razvoj događaja uključuje:

  • operatori - za prijenos podataka s jednog mjesta na drugo, na primjer, iz tablice baze podataka u skladište podataka;
  • senzori - za čekanje pojave određenog događaja i usmjeravanje toka kontrole na sljedeće vrhove grafa;
  • kuke - za operacije niže razine, na primjer, za dobivanje podataka iz tablice baze podataka (koristi se u izjavama);
  • itd.

Bilo bi neprikladno detaljno opisivati ​​Apache Airflow u ovom članku. Kratke uvode možete pogledati здесь ili здесь.

Kuka za dobivanje podataka

Prije svega, da bismo riješili problem, moramo napisati kuku pomoću koje bismo mogli:

  • spojite se na e-poštu
  • pronaći pravo slovo
  • dobiti podatke iz pisma.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Logika je sljedeća: povezujemo se, nalazimo zadnje najrelevantnije slovo, ako ima drugih, ignoriramo ih. Ova se funkcija koristi jer kasnija pisma sadrže sve podatke ranijih. Ako to nije slučaj, onda možete vratiti niz svih slova ili obraditi prvo, a ostatak u sljedećem prolazu. Općenito, sve, kao i uvijek, ovisi o zadatku.

Dodali smo dvije pomoćne funkcije u kuku: za preuzimanje datoteke i za preuzimanje datoteke pomoću veze iz e-pošte. Usput, mogu se premjestiti operateru, ovisi o učestalosti korištenja ove funkcionalnosti. Što još dodati kuki, opet ovisi o zadatku: ako su datoteke primljene odmah u pismu, tada možete preuzeti privitke pismu, ako su podaci primljeni u pismu, tada trebate analizirati pismo, itd. U mom slučaju pismo dolazi s jednom poveznicom na arhivu koju trebam staviti na određeno mjesto i pokrenuti daljnju obradu.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Kod je jednostavan, tako da ne treba dodatno objašnjenje. Reći ću vam samo o čarobnoj liniji imap_conn_id. Apache Airflow pohranjuje parametre veze (prijavu, lozinku, adresu i druge parametre) kojima se može pristupiti identifikatorom niza. Vizualno, upravljanje vezama izgleda ovako

ETL proces za dobivanje podataka iz e-pošte u Apache Airflowu

Senzor za čekanje podataka

Budući da već znamo kako se povezati i primati podatke iz pošte, sada možemo napisati senzor koji će ih čekati. U mom slučaju nije uspjelo odmah napisati operatora koji će obrađivati ​​podatke, ako ih ima, jer drugi procesi rade na temelju podataka primljenih iz pošte, uključujući i one koji uzimaju povezane podatke iz drugih izvora (API, telefonija , web metrika itd.). itd.). Dat ću vam primjer. Pojavio se novi korisnik u CRM sustavu, a još uvijek ne znamo njegov UUID. Tada ćemo pri pokušaju primanja podataka od SIP telefonije primati pozive vezane uz njen UUID, ali ih nećemo moći pravilno spremiti i koristiti. U takvim je stvarima važno imati na umu ovisnost podataka, osobito ako su iz različitih izvora. To su, naravno, nedovoljne mjere za očuvanje cjelovitosti podataka, ali u nekim slučajevima su neophodne. Da, i prazan hod radi zauzimanja resursa također je iracionalan.

Stoga će naš senzor pokrenuti sljedeće vrhove grafa ako u pošti ima svježih informacija, a također će prethodne informacije označiti kao nevažne.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Primamo i koristimo podatke

Za primanje i obradu podataka možete napisati zasebni operator, možete koristiti gotove. Budući da je logika još uvijek trivijalna - za uzimanje podataka iz pisma, na primjer, predlažem standardni PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Usput, ako je vaša korporativna pošta također na mail.ru, tada nećete moći pretraživati ​​pisma prema predmetu, pošiljatelju itd. Još 2016. obećali su ga uvesti, ali su se očito predomislili. Riješio sam ovaj problem tako što sam napravio zasebnu mapu za potrebna pisma i postavio filter za potrebna slova u web sučelju pošte. Dakle, samo potrebna slova i uvjeti za pretragu, u mom slučaju jednostavno (NEVIĐENO) ulaze u ovu mapu.

Ukratko, imamo sljedeći redoslijed: provjeravamo postoje li nova pisma koja ispunjavaju uvjete, ako postoje, preuzimamo arhivu pomoću veze iz posljednjeg pisma.
Pod zadnjim točkama je izostavljeno da će se ova arhiva raspakirati, podaci iz arhive će se očistiti i obraditi, te će kao rezultat toga cijela stvar ići dalje u cjevovod ETL procesa, ali to je već preko opseg članka. Ako se pokazalo zanimljivim i korisnim, rado ću nastaviti opisivati ​​ETL rješenja i njihove dijelove za Apache Airflow.

Izvor: www.habr.com

Dodajte komentar