ETL-proses om data vanaf e-pos in Apache Airflow te kry

ETL-proses om data vanaf e-pos in Apache Airflow te kry

Maak nie saak hoeveel tegnologie ontwikkel nie, 'n string verouderde benaderings volg altyd agter ontwikkeling. Dit kan wees as gevolg van 'n gladde oorgang, menslike faktore, tegnologiese behoeftes, of iets anders. Op die gebied van dataverwerking is databronne die mees onthullende in hierdie deel. Maak nie saak hoeveel ons daarvan droom om daarvan ontslae te raak nie, maar tot dusver word 'n deel van die data in kitsboodskappe en e-posse gestuur, om nie eers te praat van meer argaΓ―ese formate nie. Ek nooi jou uit om een ​​van die opsies vir Apache Airflow uitmekaar te haal, wat illustreer hoe jy data uit e-posse kan neem.

voorgeskiedenis

Baie data word steeds per e-pos oorgedra, van interpersoonlike kommunikasie tot standaarde van interaksie tussen maatskappye. Dit is goed as dit moontlik is om 'n koppelvlak te skryf om data te bekom of mense in die kantoor te plaas wat hierdie inligting in geriefliker bronne sal invoer, maar dikwels is dit dalk eenvoudig nie moontlik nie. Die spesifieke taak wat ek in die gesig gestaar het, was om die berugte CRM-stelsel aan die datapakhuis te koppel, en dan aan die OLAP-stelsel. Dit het so histories gebeur dat die gebruik van hierdie stelsel vir ons maatskappy gerieflik was in 'n spesifieke besigheidsarea. Daarom wou almal regtig ook met data van hierdie derdeparty-stelsel kan werk. Eerstens is natuurlik die moontlikheid bestudeer om data van 'n oop API te verkry. Ongelukkig het die API nie gedek om al die nodige data te kry nie, en in eenvoudige terme was dit in baie opsigte skeef, en tegniese ondersteuning wou of kon nie halfpad ontmoet om meer omvattende funksionaliteit te verskaf nie. Maar hierdie stelsel het die geleentheid gebied om die ontbrekende data periodiek per pos te ontvang in die vorm van 'n skakel vir die aflaai van die argief.

Daar moet kennis geneem word dat dit nie die enigste geval was waarin die onderneming data van e-posse of kitsboodskappers wou insamel nie. In hierdie geval kon ons egter nie 'n derdepartymaatskappy beΓ―nvloed wat 'n deel van die data slegs op hierdie manier verskaf nie.

Apache lugvloei

Om ETL-prosesse te bou, gebruik ons ​​meestal Apache Airflow. Sodat 'n leser wat nie met hierdie tegnologie vertroud is nie beter kan verstaan ​​hoe dit in die konteks en in die algemeen lyk, sal ek 'n paar inleidende beskryf.

Apache Airflow is 'n gratis platform wat gebruik word om ETL (Extract-Transform-Loading)-prosesse in Python te bou, uit te voer en te monitor. Die hoofkonsep in Lugvloei is 'n gerigte asikliese grafiek, waar die hoekpunte van die grafiek spesifieke prosesse is, en die rande van die grafiek die vloei van beheer of inligting is. 'n Proses kan eenvoudig enige Python-funksie oproep, of dit kan meer komplekse logika hΓͺ om verskeie funksies opeenvolgend in die konteks van 'n klas te roep. Vir die mees gereelde operasies is daar reeds baie klaargemaakte ontwikkelings wat as prosesse gebruik kan word. Sulke ontwikkelings sluit in:

  • operateurs - vir die oordrag van data van een plek na 'n ander, byvoorbeeld van 'n databasistabel na 'n datapakhuis;
  • sensors - om te wag vir die voorkoms van 'n sekere gebeurtenis en die vloei van beheer na die daaropvolgende grafiekhoekpunte te rig;
  • hake - vir laervlak-bewerkings, byvoorbeeld om data van 'n databasistabel te kry (gebruik in stellings);
  • ens.

Dit sal onvanpas wees om Apache Airflow in detail in hierdie artikel te beskryf. Kort inleidings kan besigtig word hier of hier.

Haak om data te kry

Eerstens, om die probleem op te los, moet ons 'n haak skryf waarmee ons kan:

  • koppel aan e-pos
  • vind die regte letter
  • data uit die brief ontvang.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получСния Π΄Π°Π½Π½Ρ‹Ρ… с элСктронной ΠΏΠΎΡ‡Ρ‚Ρ‹

           :param imap_conn_id:       Π˜Π΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€ ΠΏΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΡ ΠΊ ΠΏΠΎΡ‡Ρ‚Π΅
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            ΠŸΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π°Π΅ΠΌΡΡ ΠΊ ΠΏΠΎΡ‡Ρ‚Π΅
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            ΠœΠ΅Ρ‚ΠΎΠ΄ для получСния ΠΈΠ΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€Π° послСднСго письма, 
            ΡƒΠ΄ΠΎΠ²Π»Π΅Ρ‚Π²ΠΎΡ€Π°ΡΡŽΡ‰Π΅Π³ΠΎ условиям поиска

            :param check_seen:      ΠžΡ‚ΠΌΠ΅Ρ‡Π°Ρ‚ΡŒ послСднСС письмо ΠΊΠ°ΠΊ ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Π½Π½ΠΎΠ΅
            :type check_seen:       bool
            :param box:             НаимСнования ящика
            :type box:              string
            :param condition:       Условия поиска писСм
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("Π’ ящикС Π½Π°ΠΉΠ΄Π΅Π½Ρ‹ ΡΠ»Π΅Π΄ΡƒΡŽΡ‰ΠΈΠ΅ письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("НС Π½Π°ΠΉΠ΄Π΅Π½ΠΎ Π½ΠΎΠ²Ρ‹Ρ… писСм")
            return None

        mail_id = mail_ids[0]

        # Ссли Ρ‚Π°ΠΊΠΈΡ… писСм нСсколько
        if len(mail_ids) > 1:
            # ΠΎΡ‚ΠΌΠ΅Ρ‡Π°Π΅ΠΌ ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Π΅ ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Π½Π½Ρ‹ΠΌΠΈ
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°Π΅ΠΌ послСднСС
            mail_id = mail_ids[-1]

        # Π½ΡƒΠΆΠ½ΠΎ Π»ΠΈ ΠΎΡ‚ΠΌΠ΅Ρ‚ΠΈΡ‚ΡŒ послСднСС ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Π½Π½Ρ‹ΠΌ
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Die logika is dit: ons verbind, vind die laaste mees relevante letter, as daar ander is, ignoreer ons hulle. Hierdie funksie word gebruik, want latere briewe bevat al die data van vroeΓ«re. As dit nie die geval is nie, kan jy 'n reeks van alle letters terugstuur, of die eerste een verwerk, en die res met die volgende pas. Oor die algemeen hang alles, soos altyd, van die taak af.

Ons voeg twee hulpfunksies by die haak: om 'n lΓͺer af te laai en om 'n lΓͺer af te laai met 'n skakel van 'n e-pos. Terloops, hulle kan na die operateur geskuif word, dit hang af van die frekwensie van die gebruik van hierdie funksionaliteit. Wat anders om by die haak te voeg, hang weer van die taak af: as lΓͺers onmiddellik in die brief ontvang word, kan u aanhangsels by die brief aflaai, as die data in die brief ontvang word, moet u die brief ontleed, ens. In my geval kom die brief met een skakel na die argief, wat ek op 'n sekere plek moet plaas en die verdere verwerkingsproses moet begin.

    def download_from_url(self, url, path, chunk_size=128):
        """
            ΠœΠ΅Ρ‚ΠΎΠ΄ для скачивания Ρ„Π°ΠΉΠ»Π°

            :param url:              АдрСс Π·Π°Π³Ρ€ΡƒΠ·ΠΊΠΈ
            :type url:               string
            :param path:             ΠšΡƒΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡ‚ΡŒ Ρ„Π°ΠΉΠ»
            :type path:              string
            :param chunk_size:       По сколько Π±Π°ΠΉΡ‚ΠΎΠ² ΠΏΠΈΡΠ°Ρ‚ΡŒ
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            ΠœΠ΅Ρ‚ΠΎΠ΄ для скачивания Ρ„Π°ΠΉΠ»Π° ΠΏΠΎ ссылкС ΠΈΠ· письма

            :param mail_id:         Π˜Π΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€ письма
            :type mail_id:          string
            :param path:            ΠšΡƒΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡ‚ΡŒ Ρ„Π°ΠΉΠ»
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Die kode is eenvoudig, so dit het skaars verdere verduideliking nodig. Ek sal jou net vertel van die towerlyn imap_conn_id. Apache Airflow stoor verbindingsparameters (aanmelding, wagwoord, adres en ander parameters) wat deur 'n string-identifiseerder verkry kan word. Visueel lyk verbindingsbestuur so

ETL-proses om data vanaf e-pos in Apache Airflow te kry

Sensor om te wag vir data

Aangesien ons reeds weet hoe om data van pos te koppel en te ontvang, kan ons nou 'n sensor skryf om vir hulle te wag. In my geval het die skryf van 'n operateur wat die data sal verwerk, indien enige, nie dadelik uitgewerk nie, aangesien ander prosesse ook werk op grond van die data wat van die pos ontvang is, insluitend diΓ© wat verwante data van ander bronne af neem (API, telefonie, webmetriek, ens.). Ek sal jou 'n voorbeeld gee. 'n Nuwe gebruiker het in die CRM-stelsel verskyn, en ons weet steeds nie van sy UUID nie. Dan, wanneer ons probeer om data vanaf SIP-telefonie te ontvang, sal ons oproepe ontvang wat aan sy UUID gekoppel is, maar ons sal dit nie reg kan stoor en gebruik nie. In sulke sake is dit belangrik om die afhanklikheid van die data in gedagte te hou, veral as dit uit verskillende bronne kom. Dit is natuurlik onvoldoende maatreΓ«ls om data-integriteit te bewaar, maar in sommige gevalle is dit nodig. Ja, en luier om hulpbronne te beset is ook irrasioneel.

Ons sensor sal dus die volgende hoekpunte van die grafiek begin as daar vars inligting in die pos is, en ook die vorige inligting as irrelevant merk.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Ons ontvang en gebruik data

Om data te ontvang en te verwerk, kan jy 'n aparte operateur skryf, jy kan klaargemaakte gebruik. Aangesien die logika tot dusver triviaal is - om byvoorbeeld data uit die brief te neem, stel ek die standaard PythonOperator voor

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Π‘Ρ‚Π°Π½Π΄Π°Ρ€Ρ‚Π½ΠΎΠ΅ ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€ΠΈΡ€ΠΎΠ²Π°Π½ΠΈΠ΅ Π³Ρ€Π°Ρ„Π°
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# ΠžΠΏΡ€Π΅Π΄Π΅Π»ΡΠ΅ΠΌ сСнсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Ѐункция для получСния Π΄Π°Π½Π½Ρ‹Ρ… ΠΈΠ· письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# ОписаниС ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Ρ… Π²Π΅Ρ€ΡˆΠΈΠ½ Π³Ρ€Π°Ρ„Π°
...

# Π—Π°Π΄Π°Π΅ΠΌ связь Π½Π° Π³Ρ€Π°Ρ„Π΅
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ОписаниС ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Ρ… ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ² управлСния

Terloops, as u korporatiewe pos ook op mail.ru is, sal u nie briewe kan soek volgens onderwerp, sender, ens. In 2016 het hulle belowe om dit bekend te stel, maar het blykbaar van plan verander. Ek het hierdie probleem opgelos deur 'n aparte vouer vir die nodige briewe te skep en 'n filter vir die nodige briewe in die pos-webkoppelvlak op te stel. Dus, slegs die nodige briewe en voorwaardes vir die soektog, in my geval, kom eenvoudig (ONSIEN) in hierdie gids.

Opsommend het ons die volgende volgorde: ons kyk of daar nuwe briewe is wat aan die voorwaardes voldoen, as daar is, dan laai ons die argief af deur die skakel van die laaste brief te gebruik.
Onder die laaste kolletjies word dit weggelaat dat hierdie argief uitgepak sal word, die data uit die argief sal uitgevee en verwerk word, en gevolglik sal die hele ding verder gaan na die pyplyn van die ETL-proses, maar dit is reeds verby die omvang van die artikel. As dit interessant en bruikbaar blyk, sal ek graag voortgaan om ETL-oplossings en hul onderdele vir Apache Airflow te beskryf.

Bron: will.com

Voeg 'n opmerking