ETL-folyamat az e-mailekből származó adatok lekéréséhez az Apache Airflow-ban

ETL-folyamat az e-mailekből származó adatok lekéréséhez az Apache Airflow-ban

Nem számít, mennyit fejlődik a technológia, a fejlődés mögött mindig egy sor elavult megközelítés húzódik meg. Ennek oka lehet egy zökkenőmentes átállás, emberi tényezők, technológiai igények vagy valami más. Az adatfeldolgozás területén ebben a részben az adatforrások a legfeltáróbbak. Bármennyire is álmodozunk, hogy megszabaduljunk ettől, de eddig az adatok egy része azonnali üzenetküldőben és emailben érkezik, nem beszélve az archaikusabb formátumokról. Arra kérem, hogy szedje szét az Apache Airflow egyik opcióját, bemutatva, hogyan veheti át az adatokat az e-mailekből.

őstörténet

Továbbra is sok adatot továbbítanak e-mailben, az interperszonális kommunikációtól a vállalatok közötti interakció normáiig. Jó, ha lehet írni egy felületet az adatok beszerzéséhez, vagy olyan embereket helyezni az irodába, akik kényelmesebb forrásokba beírják ezeket az információkat, de gyakran ez egyszerűen nem lehetséges. A konkrét feladat, amellyel szembesültem, a hírhedt CRM rendszer összekapcsolása volt az adattárral, majd az OLAP rendszerrel. Történelmileg megtörtént, hogy cégünk számára ennek a rendszernek a használata kényelmes volt az üzlet egy bizonyos területén. Ezért mindenki nagyon szerette volna, ha ebből a külső rendszerből származó adatokkal is tud működni. Mindenekelőtt természetesen a nyílt API-ból való adatszerzés lehetőségét vizsgálták. Sajnos az API nem fedte le az összes szükséges adat megszerzését, és leegyszerűsítve sok szempontból ferde volt, a technikai támogatás pedig nem akart, vagy félúton nem tudott megfelelni az átfogóbb funkcionalitás biztosításának. De ez a rendszer lehetőséget biztosított arra, hogy a hiányzó adatokat rendszeresen postai úton kapja meg az archívum kirakására szolgáló hivatkozás formájában.

Meg kell jegyezni, hogy nem ez volt az egyetlen eset, amikor a vállalkozás e-mailekből vagy azonnali üzenetküldőkből akart adatokat gyűjteni. Ebben az esetben azonban nem tudtunk olyan külső céget befolyásolni, amely csak ilyen módon adja át az adatok egy részét.

apache légáramlás

Az ETL-folyamatok felépítéséhez leggyakrabban Apache Airflow-t használunk. Annak érdekében, hogy egy olvasó, aki nem ismeri ezt a technológiát, jobban megértse, hogyan néz ki a szövegkörnyezetben és általában, leírok néhány bevezetőt.

Az Apache Airflow egy ingyenes platform, amely az ETL (Extract-Transform-Loading) folyamatok felépítésére, végrehajtására és figyelésére szolgál Pythonban. Az Airflow fő fogalma egy irányított aciklikus gráf, ahol a gráf csúcsai specifikus folyamatok, a gráf élei pedig a vezérlés vagy az információáramlás. Egy folyamat egyszerűen meghívhat bármilyen Python-függvényt, vagy bonyolultabb logikája lehet több függvény szekvenciális meghívásának egy osztály kontextusában. A leggyakrabban előforduló műveletekhez már számos kész fejlesztés áll rendelkezésre, amelyek folyamatként használhatók. Ilyen fejlesztések a következők:

  • operátorok - adatok átvitelére egyik helyről a másikra, például egy adatbázistáblából egy adattárházba;
  • érzékelők - egy bizonyos esemény bekövetkezésének kivárására és a vezérlés áramlásának a gráf következő csúcsaira irányítására;
  • horgok - alacsonyabb szintű műveletekhez, például adatok lekéréséhez adatbázis-táblázatból (az utasításokban használatos);
  • stb

Nem lenne helyénvaló részletesen ismertetni az Apache Airflow-t ebben a cikkben. A rövid bemutatkozások megtekinthetők itt vagy itt.

Kapocs az adatok lekéréséhez

Először is, a probléma megoldásához írnunk kell egy horogot, amellyel:

  • csatlakozni az e-mailhez
  • megtalálni a megfelelő betűt
  • adatokat kapni a levélből.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

A logika a következő: kapcsolódunk, megkeressük az utolsó legrelevánsabb betűt, ha van még más, figyelmen kívül hagyjuk. Ezt a funkciót azért használjuk, mert a későbbi betűk a korábbiak összes adatát tartalmazzák. Ha nem ez a helyzet, akkor visszaadhatja az összes betű tömbjét, vagy feldolgozhatja az elsőt, a többit pedig a következő lépésben. Általában minden, mint mindig, a feladattól függ.

Két kiegészítő funkciót adunk a horoghoz: egy fájl letöltéséhez és egy fájl letöltéséhez egy e-mailben található hivatkozás segítségével. Egyébként áthelyezhetők a kezelőhöz, ez a funkció használatának gyakoriságától függ. Mit kell még hozzátenni a horoghoz, az megint a feladattól függ: ha a levélben azonnal megérkeznek a fájlok, akkor letöltheti a levél mellékleteit, ha az adatok a levélben érkeznek, akkor elemezni kell a levelet, stb. Az én esetemben a levélhez tartozik egy hivatkozás az archívumhoz, amit egy bizonyos helyre kell elhelyeznem, és elindítani a további feldolgozási folyamatot.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

A kód egyszerű, így aligha igényel további magyarázatot. Csak mesélek az imap_conn_id varázsvonalról. Az Apache Airflow a kapcsolati paramétereket (bejelentkezési név, jelszó, cím és egyéb paraméterek) tárolja, amelyeket karakterlánc-azonosítóval lehet elérni. Vizuálisan a kapcsolatkezelés így néz ki

ETL-folyamat az e-mailekből származó adatok lekéréséhez az Apache Airflow-ban

Érzékelő várja az adatokat

Mivel már tudjuk, hogyan kell csatlakozni és adatokat fogadni a levelekből, most már írhatunk egy érzékelőt, amely várja őket. Az én esetemben nem ment azonnal operátort írni, aki feldolgozza az adatokat, ha vannak ilyenek, mert más folyamatok a levélből kapott adatok alapján működnek, beleértve azokat is, amelyek más forrásokból (API, telefon) veszik át a kapcsolódó adatokat. , webes mutatók stb.). stb.). Mondok egy példát. Új felhasználó jelent meg a CRM rendszerben, akinek az UUID-jét továbbra sem tudjuk. Ekkor, amikor a SIP telefonról próbálunk adatokat fogadni, annak UUID-jához kötött hívásokat fogunk fogadni, de nem tudjuk őket megfelelően elmenteni és használni. Ilyen esetekben fontos szem előtt tartani az adatok függőségét, különösen, ha azok különböző forrásokból származnak. Ezek természetesen nem elegendőek az adatok integritásának megőrzéséhez, de bizonyos esetekben szükségesek. Igen, és az erőforrások lefoglalásának alapjárata szintén irracionális.

Így a szenzorunk elindítja a gráf következő csúcsait, ha friss információ van a levélben, és az előző információt is irrelevánsnak jelöli.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Adatokat fogadunk és használunk

Az adatok fogadásához és feldolgozásához külön operátort írhat, használhat készeket. Mivel a logika még mindig triviális - például a levélből való adatok vételéhez, javaslom a szabványos PythonOperatort

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Mellesleg, ha a vállalati levelei a mail.ru-n is megtalálhatók, akkor nem fog tudni keresni a leveleket tárgy, feladó stb. szerint. Még 2016-ban megígérték, hogy bevezetik, de láthatóan meggondolták magukat. Ezt a problémát úgy oldottam meg, hogy a szükséges leveleknek külön mappát készítettem, és a levelező webes felületén beállítottam egy szűrőt a szükséges levelekre. Így csak a kereséshez szükséges betűk és feltételek, esetemben egyszerűen (NEM LÁTVA) kerülnek ebbe a mappába.

Összegezve a következő sorrendet kapjuk: ellenőrizzük, hogy vannak-e új, a feltételeknek megfelelő levelek, ha vannak, akkor az utolsó levél hivatkozása segítségével letöltjük az archívumot.
Az utolsó pontok alatt kimarad, hogy ezt az archívumot kicsomagolják, az archívumból származó adatok törlésre és feldolgozásra kerüljenek, és ennek eredményeként az egész továbbmegy az ETL folyamat csővezetékébe, de ez már túl van a cikk terjedelmét. Ha érdekesnek és hasznosnak bizonyult, akkor szívesen folytatom az Apache Airflow ETL megoldásainak és alkatrészeinek ismertetését.

Forrás: will.com

Hozzászólás