ETL-protsess Apache Airflow e-kirjadest andmete hankimiseks

ETL-protsess Apache Airflow e-kirjadest andmete hankimiseks

Ükskõik kui palju tehnoloogia areneb, jääb arengu taga alati rida aegunud lähenemisviise. See võib olla tingitud sujuvast üleminekust, inimteguritest, tehnoloogilistest vajadustest või millestki muust. Andmetöötluse vallas on selles osas kõige paljastavamad andmeallikad. Ükskõik kui palju me sellest vabanemisest unistame, aga seni saadetakse osa andmetest kiirsõnumite ja meili teel, rääkimata arhailisematest formaatidest. Kutsun teid lahti võtma üks Apache Airflow valikutest, näitlikustades, kuidas saate meilidest andmeid võtta.

eelajalugu

E-posti teel edastatakse endiselt palju andmeid, alates inimestevahelisest suhtlusest kuni ettevõtetevahelise suhtluse standarditeni. Hea, kui andmete hankimiseks on võimalik kirjutada liides või panna kontorisse inimesed, kes selle info mugavamatesse allikatesse sisestavad, kuid sageli ei pruugi see lihtsalt võimalik olla. Konkreetne ülesanne, millega ma silmitsi seisin, on kurikuulsa CRM-süsteemi ühendamine andmelaoga ja seejärel OLAP-süsteemiga. Ajalooliselt juhtus nii, et meie ettevõtte jaoks oli selle süsteemi kasutamine konkreetses ärivaldkonnas mugav. Seetõttu soovisid kõik väga, et oleks võimalik opereerida ka selle kolmanda osapoole süsteemi andmetega. Kõigepealt uuriti muidugi võimalust saada andmeid avatud API-st. Kahjuks ei hõlmanud API kõigi vajalike andmete saamist ja lihtsamalt öeldes oli see paljuski vildakas ning tehniline tugi ei tahtnud või ei saanud pooleldi kokku, et pakkuda terviklikumat funktsionaalsust. Kuid see süsteem andis võimaluse saada puuduvad andmed perioodiliselt posti teel arhiivi mahalaadimise lingi kujul.

Tuleb märkida, et see ei olnud ainus juhtum, mil ettevõte soovis koguda andmeid e-kirjadest või kiirsõnumite saatjatest. Kuid antud juhul ei saanud me mõjutada kolmanda osapoole ettevõtet, kes edastab osa andmetest ainult sel viisil.

Apache õhuvool

ETL-i protsesside koostamiseks kasutame kõige sagedamini Apache Airflow'i. Selleks, et lugeja, kes seda tehnoloogiat ei tunne, saaks paremini aru, kuidas see kontekstis ja üldiselt välja näeb, kirjeldan paari sissejuhatavat.

Apache Airflow on tasuta platvorm, mida kasutatakse Pythonis ETL-i (Extract-Transform-Loading) protsesside koostamiseks, täitmiseks ja jälgimiseks. Õhuvoo põhikontseptsioon on suunatud atsükliline graaf, kus graafi tippudeks on spetsiifilised protsessid ja graafiku servadeks on juhtimis- või infovoog. Protsess võib lihtsalt kutsuda mis tahes Pythoni funktsiooni või sellel võib olla keerulisem loogika mitme funktsiooni järjestikusest kutsumisest klassi kontekstis. Kõige sagedasemate toimingute jaoks on juba palju valmisarendusi, mida saab protsessidena kasutada. Sellised arengud hõlmavad järgmist:

  • operaatorid - andmete edastamiseks ühest kohast teise, näiteks andmebaasi tabelist andmelattu;
  • andurid - teatud sündmuse toimumise ootamiseks ja juhtimisvoo suunamiseks graafiku järgmistesse tippudesse;
  • konksud - madalama taseme operatsioonide jaoks, näiteks andmebaasi tabelist andmete saamiseks (kasutatakse lausetes);
  • jne

Selles artiklis poleks kohane Apache Airflow'i üksikasjalikult kirjeldada. Lühitutvustusi saab vaadata siin või siin.

Konks andmete hankimiseks

Esiteks peame probleemi lahendamiseks kirjutama konksu, mille abil saaksime:

  • ühendada meiliga
  • leida õige kiri
  • saada kirjast andmeid.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Loogika on järgmine: ühendame, leiame viimase kõige asjakohasema tähe, kui on teisi, siis ignoreerime neid. Seda funktsiooni kasutatakse, kuna hilisemad kirjad sisaldavad kõiki varasemate andmeid. Kui see nii ei ole, saate tagastada kõigi tähtede massiivi või töödelda esimest ja ülejäänud järgmisel käigul. Üldiselt sõltub kõik, nagu alati, ülesandest.

Konksule lisame kaks abifunktsiooni: faili allalaadimiseks ja faili allalaadimiseks meilist saadud lingi abil. Muide, neid saab teisaldada operaatorile, see sõltub selle funktsiooni kasutamise sagedusest. Mida veel konksu juurde lisada, oleneb jällegi ülesandest: kui failid laekuvad kohe kirja, siis saab kirjale manuseid alla laadida, kui andmed on kirjas, siis tuleb kiri sõeluda, jne. Minu puhul on kirjaga kaasas üks link arhiivi, mille pean kindlasse kohta panema ja edasise töötlemise protsessi alustama.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Kood on lihtne, nii et see ei vaja täiendavat selgitust. Ma räägin teile lihtsalt maagilisest reast imap_conn_id. Apache Airflow salvestab ühenduse parameetrid (sisselogimine, parool, aadress ja muud parameetrid), millele pääseb juurde stringi identifikaatori abil. Visuaalselt näeb ühenduse haldamine välja selline

ETL-protsess Apache Airflow e-kirjadest andmete hankimiseks

Andur andmete ootamiseks

Kuna me juba teame, kuidas ühendust luua ja meilist andmeid vastu võtta, saame nüüd kirjutada anduri, mis neid ootama hakkab. Minu puhul ei õnnestunud kohe operaatori kirjutamine, kes andmeid töötleb, kui neid on, sest muud protsessid töötavad meili teel saadud andmete põhjal, sh need, mis võtavad seotud andmeid muudest allikatest (API, telefon). , veebimõõdikud jne). jne). Toon näite. CRM-i süsteemi on ilmunud uus kasutaja, kelle UUID-st me ikka veel ei tea. Seejärel, kui proovime SIP-telefonilt andmeid vastu võtta, saame selle UUID-ga seotud kõnesid, kuid me ei saa neid õigesti salvestada ja kasutada. Sellistes küsimustes on oluline meeles pidada andmete sõltuvust, eriti kui need on erinevatest allikatest. Need on loomulikult ebapiisavad meetmed andmete terviklikkuse säilitamiseks, kuid mõnel juhul on need vajalikud. Jah, ja tühikäik ressursside hõivamiseks on samuti irratsionaalne.

Seega käivitab meie andur graafiku järgmised tipud, kui kirjas on värsket teavet, ja märgib ka eelneva teabe ebaoluliseks.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Me võtame vastu ja kasutame andmeid

Andmete vastuvõtmiseks ja töötlemiseks saab kirjutada eraldi operaatori, kasutada saab juba valmis. Kuna seni on loogika triviaalne - näiteks tähest andmete võtmiseks soovitan tavalist PythonOperatorit

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Muide, kui teie ettevõtte post asub ka saidil mail.ru, siis ei saa te kirju otsida teema, saatja jne järgi. Veel 2016. aastal lubasid nad seda tutvustada, kuid ilmselt mõtlesid ümber. Lahendasin selle probleemi nii, et lõin vajalike kirjade jaoks eraldi kausta ja seadistasin meili veebiliideses vajalike kirjade jaoks filtri. Seega satuvad sellesse kausta vaid otsinguks vajalikud tähed ja tingimused, minu puhul lihtsalt (NÄHTAMATA).

Kokkuvõtteks on meil järgmine jada: kontrollime, kas on uusi kirju, mis vastavad tingimustele, kui on, siis laadime arhiivi alla viimase tähe lingi abil.
Viimaste punktide alt on välja jäetud, et see arhiiv lahti pakitakse, andmed arhiivist kustutatakse ja töödeldakse ning selle tulemusena läheb kogu asi edasi ETL protsessi torusse, aga see on juba üle jõu. artikli ulatus. Kui see osutus huvitavaks ja kasulikuks, siis jätkan hea meelega Apache Airflow ETL lahenduste ja nende osade kirjeldamist.

Allikas: www.habr.com

Lisa kommentaar