ETL-prosessi tietojen saamiseksi sähköpostista Apache Airflowssa

ETL-prosessi tietojen saamiseksi sähköpostista Apache Airflowssa

Riippumatta siitä, kuinka paljon teknologia kehittyy, kehityksen takana on aina joukko vanhentuneita lähestymistapoja. Tämä voi johtua sujuvasta siirtymisestä, inhimillisistä tekijöistä, teknologisista tarpeista tai jostain muusta. Tietojenkäsittelyn alalla tietolähteet ovat tässä osassa paljastavimpia. Vaikka kuinka haaveilemme päästä eroon tästä, mutta toistaiseksi osa tiedoista lähetetään pikaviestinnässä ja sähköpostissa, puhumattakaan arkaaisemmista muodoista. Kehotan sinua purkamaan yhden Apache Airflown vaihtoehdoista, mikä havainnollistaa, kuinka voit ottaa tietoja sähköposteista.

esihistoria

Sähköpostin kautta siirretään edelleen paljon dataa ihmisten välisestä viestinnästä yritysten välisen vuorovaikutuksen standardeihin. On hyvä, jos on mahdollista kirjoittaa käyttöliittymä tietojen saamiseksi tai laittaa toimistoon ihmisiä, jotka syöttävät nämä tiedot kätevämpiin lähteisiin, mutta usein tämä ei yksinkertaisesti ole mahdollista. Erityinen tehtäväni oli yhdistää pahamaineinen CRM-järjestelmä tietovarastoon ja sitten OLAP-järjestelmään. Historiallisesti kävi niin, että yrityksellemme tämän järjestelmän käyttö oli kätevää tietyllä liiketoiminta-alueella. Siksi kaikki todella halusivat pystyä käyttämään tietoja myös tämän kolmannen osapuolen järjestelmästä. Ensinnäkin tietysti tutkittiin mahdollisuutta saada tietoa avoimesta API:sta. Valitettavasti API ei kattanut kaikkien tarvittavien tietojen saamista, ja yksinkertaisesti sanottuna se oli monella tapaa vino, eikä tekninen tuki halunnut tai pystynyt puoliväliin tarjoamaan kattavampia toimintoja. Mutta tämä järjestelmä tarjosi mahdollisuuden saada ajoittain puuttuvat tiedot postitse linkin muodossa arkiston purkamiseen.

On huomattava, että tämä ei ollut ainoa tapaus, jossa yritys halusi kerätä tietoja sähköposteista tai pikaviestinnästä. Tässä tapauksessa emme kuitenkaan voi vaikuttaa kolmannen osapuolen yritykseen, joka toimittaa osan tiedoista vain tällä tavalla.

apache-ilmavirta

ETL-prosessien rakentamiseen käytämme useimmiten Apache Airflowta. Jotta lukija, joka ei ole perehtynyt tähän tekniikkaan, ymmärtäisi paremmin, miltä se näyttää kontekstissa ja yleensäkin, kuvailen pari johdantoa.

Apache Airflow on ilmainen alusta, jota käytetään ETL-prosessien (Extract-Transform-Loading) rakentamiseen, suorittamiseen ja valvontaan Pythonissa. Ilmavirran pääkonsepti on suunnattu asyklinen graafi, jossa graafin kärjet ovat tiettyjä prosesseja ja graafin reunat ovat ohjaus- tai informaatiovirta. Prosessi voi yksinkertaisesti kutsua mitä tahansa Python-funktiota, tai sillä voi olla monimutkaisempi logiikka kutsumalla useita funktioita peräkkäin luokan kontekstissa. Useimmille toiminnoille on jo olemassa monia valmiita kehityskohteita, joita voidaan käyttää prosesseina. Tällaisia ​​kehityskulkuja ovat mm.

  • operaattorit - tiedon siirtämiseen paikasta toiseen, esimerkiksi tietokantataulukosta tietovarastoon;
  • anturit - odottamaan tietyn tapahtuman esiintymistä ja ohjaamaan ohjausvirtaa graafin seuraaviin kärkipisteisiin;
  • koukut - alemman tason toimintoihin, esimerkiksi tietojen saamiseksi tietokantataulukosta (käytetään lausekkeissa);
  • jne.

Ei olisi asianmukaista kuvata Apache Airflowa yksityiskohtaisesti tässä artikkelissa. Lyhyet esittelyt ovat katsottavissa täällä tai täällä.

Koukku tietojen hankkimiseen

Ensinnäkin ongelman ratkaisemiseksi meidän on kirjoitettava koukku, jolla voisimme:

  • muodosta yhteys sähköpostiin
  • löytää oikea kirjain
  • vastaanottaa tietoja kirjeestä.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Logiikka on tämä: muodostamme yhteyden, etsimme viimeisen merkityksellisin kirjaimen, jos muita on, jätämme ne huomiotta. Tätä toimintoa käytetään, koska myöhemmät kirjaimet sisältävät kaikki aikaisempien kirjainten tiedot. Jos näin ei ole, voit palauttaa joukon kaikista kirjaimista tai käsitellä ensimmäisen ja loput seuraavassa kirjaimessa. Yleensä kaikki, kuten aina, riippuu tehtävästä.

Lisäämme koukkuun kaksi aputoimintoa: tiedoston lataamiseen ja tiedoston lataamiseen sähköpostin linkin kautta. Muuten, ne voidaan siirtää operaattorille, se riippuu tämän toiminnon käyttötiheydestä. Mitä muuta koukkuun lisätään, riippuu jälleen tehtävästä: jos tiedostot vastaanotetaan heti kirjeeseen, voit ladata liitteitä kirjeeseen, jos tiedot vastaanotetaan kirjeessä, sinun on jäsennettävä kirje, jne. Minun tapauksessani kirjeessä on yksi linkki arkistoon, joka minun on laitettava tiettyyn paikkaan ja aloitettava jatkokäsittely.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Koodi on yksinkertainen, joten se tuskin tarvitsee lisäselvitystä. Kerron vain maagisesta linjasta imap_conn_id. Apache Airflow tallentaa yhteysparametrit (kirjautumistunnus, salasana, osoite ja muut parametrit), joita voidaan käyttää merkkijonotunnisteella. Visuaalisesti yhteydenhallinta näyttää tältä

ETL-prosessi tietojen saamiseksi sähköpostista Apache Airflowssa

Anturi odottaa tietoja

Koska osaamme jo muodostaa yhteyden ja vastaanottaa tietoja postista, voimme nyt kirjoittaa anturin odottamaan niitä. Minun tapauksessani ei onnistunut kirjoittaa heti operaattoria, joka käsittelee tiedot, jos niitä on, koska muut prosessit toimivat postista saatujen tietojen perusteella, mukaan lukien ne, jotka ottavat asiaan liittyvää dataa muista lähteistä (API, puhelin). , verkkotiedot jne.). jne.). Annan sinulle esimerkin. CRM-järjestelmään on ilmaantunut uusi käyttäjä, jonka UUID:stä ei ole vielä tietoa. Sitten kun yritämme vastaanottaa tietoja SIP-puhelimesta, saamme sen UUID-tunnukseen sidotut puhelut, mutta emme voi tallentaa ja käyttää niitä oikein. Tällaisissa asioissa on tärkeää pitää mielessä tietojen riippuvuus, varsinkin jos ne ovat peräisin eri lähteistä. Nämä ovat tietysti riittämättömiä toimenpiteitä tietojen eheyden säilyttämiseksi, mutta joissain tapauksissa ne ovat välttämättömiä. Kyllä, ja joutokäynti resurssien käyttämiseksi on myös järjetöntä.

Siten anturimme käynnistää graafin myöhemmät kärjet, jos postissa on tuoretta tietoa, ja merkitsee myös edelliset tiedot merkityksettömiksi.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Vastaanotamme ja käytämme dataa

Tietojen vastaanottamiseen ja käsittelyyn voit kirjoittaa erillisen operaattorin, voit käyttää valmiita. Koska logiikka on toistaiseksi triviaali - esimerkiksi tiedon ottamiseksi kirjeestä, ehdotan tavallista PythonOperatoria

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Muuten, jos yrityspostisi on myös osoitteessa mail.ru, et voi etsiä kirjeitä aiheen, lähettäjän jne. mukaan. Vuonna 2016 he lupasivat ottaa sen käyttöön, mutta ilmeisesti muuttivat mielensä. Ratkaisin tämän ongelman luomalla erillisen kansion tarvittaville kirjeille ja asettamalla tarvittaville kirjaimille suodattimen sähköpostin verkkokäyttöliittymään. Siten tähän kansioon pääsevät vain hakuun tarvittavat kirjaimet ja ehdot, minun tapauksessani yksinkertaisesti (NÄYTTÄMÄTTÖMÄN).

Yhteenvetona meillä on seuraava järjestys: tarkistamme, onko uusia ehdot täyttäviä kirjeitä, jos niitä on, lataamme arkiston viimeisen kirjeen linkin avulla.
Viimeisten pisteiden alta on jätetty pois, että tämä arkisto puretaan, arkistosta tulevat tiedot tyhjennetään ja käsitellään, minkä seurauksena koko juttu menee edelleen ETL-prosessin putkilinjaan, mutta tämä on jo ohi. artikkelin laajuus. Jos se osoittautui mielenkiintoiseksi ja hyödylliseksi, jatkan mielelläni ETL-ratkaisujen ja niiden osien kuvaamista Apache Airflow:lle.

Lähde: will.com

Lisää kommentti