Postopek ETL za pridobivanje podatkov iz e-pošte v Apache Airflow

Postopek ETL za pridobivanje podatkov iz e-pošte v Apache Airflow

Ne glede na to, koliko se tehnologija razvija, za razvojem vedno sledi niz zastarelih pristopov. To je lahko posledica gladkega prehoda, človeških dejavnikov, tehnoloških potreb ali česa drugega. Na področju obdelave podatkov so v tem delu najbolj razvidni viri podatkov. Ne glede na to, koliko sanjamo, da bi se tega znebili, vendar se zaenkrat del podatkov pošilja v hitrih sporočilih in e-pošti, da ne omenjamo bolj arhaičnih formatov. Vabim vas, da razstavite eno od možnosti za Apache Airflow, ki ponazarja, kako lahko vzamete podatke iz e-pošte.

prazgodovina

Preko elektronske pošte se še vedno prenaša veliko podatkov, od medosebnih komunikacij do standardov interakcije med podjetji. Dobro je, če je mogoče napisati vmesnik za pridobivanje podatkov ali postaviti ljudi v pisarni, ki bodo te podatke vnašali v priročnejše vire, vendar pogosto to preprosto ni mogoče. Konkretna naloga, s katero sem se soočil, je bila povezava razvpitega sistema CRM s podatkovnim skladiščem in nato s sistemom OLAP. Zgodovinsko se je tako zgodilo, da je bila za naše podjetje uporaba tega sistema primerna na določenem področju poslovanja. Zato so vsi zelo želeli, da bi lahko operirali tudi s podatki iz tega sistema tretjih oseb. Najprej je bila seveda preučena možnost pridobivanja podatkov iz odprtega API-ja. Na žalost API ni pokrival pridobivanja vseh potrebnih podatkov in, poenostavljeno povedano, je bil v marsičem pokvarjen, tehnična podpora pa se ni želela ali mogla srečati na pol poti, da bi zagotovila celovitejšo funkcionalnost. Toda ta sistem je omogočil občasno prejemanje manjkajočih podatkov po pošti v obliki povezave za razkladanje arhiva.

Treba je opozoriti, da to ni bil edini primer, v katerem je podjetje želelo zbrati podatke iz elektronske pošte ali hitrih sporočil. Vendar v tem primeru ne moremo vplivati ​​na tretje podjetje, ki le na ta način posreduje del podatkov.

pretok zraka apache

Za gradnjo ETL procesov najpogosteje uporabljamo Apache Airflow. Da bo bralec, ki te tehnologije ne pozna, bolje razumel, kako izgleda v kontekstu in na splošno, bom opisal nekaj uvodnih.

Apache Airflow je brezplačna platforma, ki se uporablja za izdelavo, izvajanje in spremljanje procesov ETL (Extract-Transform-Loading) v Pythonu. Glavni koncept v Airflowu je usmerjen aciklični graf, kjer so oglišča grafa specifični procesi, robovi grafa pa tok nadzora ali informacij. Proces lahko preprosto pokliče katero koli funkcijo Python ali pa ima bolj zapleteno logiko zaradi zaporednega klicanja več funkcij v kontekstu razreda. Za najpogostejše operacije že obstaja veliko pripravljenih razvojnih rešitev, ki jih je mogoče uporabiti kot procese. Tak razvoj dogodkov vključuje:

  • operaterji - za prenos podatkov z enega mesta na drugega, na primer iz tabele podatkovne baze v podatkovno skladišče;
  • senzorji - za čakanje na nastop določenega dogodka in usmerjanje toka nadzora na naslednja oglišča grafa;
  • kavlji - za operacije na nižji ravni, na primer za pridobivanje podatkov iz tabele zbirke podatkov (uporablja se v stavkih);
  • itd

V tem članku bi bilo neprimerno podrobno opisovati Apache Airflow. Kratke predstavitve si lahko ogledate tukaj ali tukaj.

Kavelj za pridobivanje podatkov

Najprej moramo za rešitev problema napisati kljuko, s katero bi lahko:

  • povežite se z e-pošto
  • poiščite pravo črko
  • prejeti podatke iz pisma.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Logika je taka: povežemo se, poiščemo zadnjo najbolj relevantno črko, če so druge, jih ignoriramo. Ta funkcija se uporablja, ker kasnejša pisma vsebujejo vse podatke prejšnjih. Če temu ni tako, potem lahko vrnete matriko vseh črk ali obdelate prvo, ostale pa pri naslednjem prehodu. Na splošno je vse, kot vedno, odvisno od naloge.

Kljuku dodamo dve pomožni funkciji: za prenos datoteke in za prenos datoteke s povezavo iz e-pošte. Mimogrede, jih je mogoče premakniti k operaterju, odvisno od pogostosti uporabe te funkcionalnosti. Kaj še dodati kavlju, je spet odvisno od naloge: če so datoteke prejete takoj v pismu, potem lahko prenesete priloge k pismu, če so podatki prejeti v pismu, potem morate pismo razčleniti, itd. V mojem primeru je pismo opremljeno z eno povezavo do arhiva, ki jo moram postaviti na določeno mesto in začeti nadaljnjo obdelavo.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Koda je preprosta, zato skoraj ne potrebuje dodatne razlage. Povedal vam bom samo o čarobni vrstici imap_conn_id. Apache Airflow shrani parametre povezave (prijavo, geslo, naslov in druge parametre), do katerih lahko dostopate z identifikatorjem niza. Vizualno je upravljanje povezave videti takole

Postopek ETL za pridobivanje podatkov iz e-pošte v Apache Airflow

Senzor za čakanje na podatke

Ker se že znamo povezovati in prejemati podatke iz pošte, lahko zdaj napišemo senzor, ki jih čaka. V mojem primeru ni šlo takoj napisati operaterja, ki bo obdeloval podatke, če sploh, ker drugi procesi delujejo na podlagi prejetih podatkov iz pošte, tudi tisti, ki jemljejo sorodne podatke iz drugih virov (API, telefonija , spletne meritve itd.) itd.). Dal ti bom primer. V sistemu CRM se je pojavil nov uporabnik, za katerega UUID še vedno ne vemo. Takrat bomo ob poskusu prejemanja podatkov iz SIP telefonije prejeli klice, vezane na njen UUID, vendar jih ne bomo mogli shraniti in pravilno uporabiti. Pri takšnih zadevah je pomembno upoštevati odvisnost podatkov, še posebej, če so iz različnih virov. To seveda niso zadostni ukrepi za ohranitev celovitosti podatkov, so pa v nekaterih primerih nujni. Da, in prosti tek za zasedbo virov je tudi neracionalen.

Tako bo naš senzor sprožil naslednja oglišča grafa, če so v pošti sveže informacije, prejšnje informacije pa tudi označil kot nepomembne.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Podatke prejemamo in uporabljamo

Za prejemanje in obdelavo podatkov lahko napišete ločen operater, lahko uporabite že pripravljene. Ker je zaenkrat logika trivialna - za prevzem podatkov iz pisma, na primer, predlagam standardni PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Mimogrede, če je vaša poslovna pošta tudi na mail.ru, potem ne boste mogli iskati pisem po zadevi, pošiljatelju itd. Že leta 2016 so obljubljali, da ga bodo predstavili, a so si očitno premislili. To težavo sem rešil tako, da sem ustvaril ločeno mapo za potrebna pisma in nastavil filter za potrebna pisma v poštnem spletnem vmesniku. Tako pridejo v to mapo samo potrebne črke in pogoji za iskanje, v mojem primeru preprosto (NEVIDENO).

Če povzamemo, imamo naslednje zaporedje: preverimo, ali obstajajo nova pisma, ki izpolnjujejo pogoje, če obstajajo, nato prenesemo arhiv s povezavo iz zadnjega pisma.
Pod zadnjimi pikami je izpuščeno, da se bo ta arhiv razpakiral, podatki iz arhiva počistili in obdelali, posledično pa bo vse skupaj šlo naprej v cevovod procesa ETL, ampak to je že dlje. obseg članka. Če se je izkazalo za zanimivo in uporabno, bom z veseljem nadaljeval z opisom ETL rešitev in njihovih delov za Apache Airflow.

Vir: www.habr.com

Dodaj komentar