ETL ferli til að sækja gögn úr tölvupósti í Apache Airflow

ETL ferli til að sækja gögn úr tölvupósti í Apache Airflow

Sama hversu mikið tæknin þróast, þróun er alltaf fylgt eftir með röð úreltra nálgana. Þetta getur stafað af hnökralausum umskiptum, mannlegum þáttum, tæknilegum þörfum eða einhverju öðru. Á sviði gagnavinnslu eru gagnaheimildir mest afhjúpandi í þessum hluta. Sama hversu mikið okkur dreymir um að losna við þetta, í augnablikinu eru sum gagna send í spjallforritum og tölvupósti, svo ekki sé minnst á eldgamlara snið. Ég býð þér að skoða einn af valkostunum fyrir Apache Airflow, sem sýnir hvernig þú getur safnað gögnum úr tölvupósti.

Forsaga

Mikið af gögnum er enn flutt með tölvupósti, allt frá mannlegum samskiptum til staðla um samskipti milli fyrirtækja. Það er gott ef hægt er að skrifa viðmót til að afla gagna eða setja fólk á skrifstofuna sem færir þessar upplýsingar inn í þægilegri heimildir, en oft er það einfaldlega ekki hægt. Það sérstaka verkefni sem ég stóð frammi fyrir var að tengja hið alræmda CRM kerfi við gagnageymsluna og síðan við OLAP kerfið. Það gerðist svo sögulega að fyrir fyrirtæki okkar var notkun þessa kerfis þægileg á tilteknu sviði viðskipta. Þess vegna vildu allir virkilega geta starfað með gögn frá þessu þriðja aðila kerfi líka. Fyrst af öllu var auðvitað kannaður möguleikinn á að fá gögn úr opnu API. Því miður náði API ekki að fá öll nauðsynleg gögn, og í einföldu máli var það á margan hátt skakkt og tækniaðstoð vildi ekki eða gat ekki mætt á miðri leið til að veita víðtækari virkni. En þetta kerfi gaf tækifæri til að fá reglulega gögnin sem vantaði með pósti í formi hlekks til að afferma skjalasafnið.

Tekið skal fram að þetta var ekki eina tilvikið þar sem fyrirtækið vildi safna gögnum úr tölvupósti eða spjallskilaboðum. Hins vegar, í þessu tilviki, gætum við ekki haft áhrif á þriðja aðila fyrirtæki sem veitir hluta af gögnunum aðeins á þennan hátt.

Apache loftflæði

Til að byggja upp ETL ferla notum við oftast Apache Airflow. Til þess að lesandi sem ekki kannast við þessa tækni skilji betur hvernig hún lítur út í samhengi og almennt mun ég lýsa nokkrum inngangi.

Apache Airflow er ókeypis vettvangur sem er notaður til að byggja, framkvæma og fylgjast með ETL (Extract-Transform-Loading) ferlum í Python. Meginhugtakið í Airflow er stýrt óhringlaga línurit, þar sem hornpunktar línuritsins eru ákveðin ferli og brúnir línuritsins eru flæði stjórna eða upplýsinga. Ferli getur einfaldlega kallað hvaða Python aðgerð sem er, eða það getur haft flóknari rökfræði frá því að hringja í nokkrar aðgerðir í röð í samhengi við flokk. Fyrir algengustu aðgerðir eru nú þegar margar tilbúnar þróun sem hægt er að nota sem ferli. Slík þróun felur í sér:

  • rekstraraðilar - til að flytja gögn frá einum stað til annars, til dæmis úr gagnagrunnstöflu í gagnageymslu;
  • skynjarar - til að bíða eftir að ákveðinn atburður gerist og beina stjórnflæðinu að síðari hornpunktum grafsins;
  • krókar - fyrir aðgerðir á lægra stigi, til dæmis til að fá gögn úr gagnagrunnstöflu (notað í yfirlýsingum);
  • o.fl.

Það væri óviðeigandi að lýsa Apache Airflow í smáatriðum í þessari grein. Hægt er að skoða stuttar kynningar hér eða hér.

Krókur til að fá gögn

Fyrst af öllu, til að leysa vandamálið, þurfum við að skrifa krók sem við gætum:

  • tengjast tölvupósti
  • finndu bréfið sem þú þarft;
  • fá gögn úr bréfinu.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Rökfræðin er þessi: við tengjumst, finnum síðasta staf sem er mest viðeigandi, ef það eru aðrir, hunsum við þá. Þessi aðgerð er notuð, vegna þess að síðari stafir innihalda öll gögn fyrri. Ef þetta er ekki raunin, þá geturðu skilað fjölda af öllum stöfum, eða unnið þann fyrsta og afganginn í næstu umferð. Almennt, allt, eins og alltaf, fer eftir verkefninu.

Við bætum tveimur aukaaðgerðum við krókinn: til að hlaða niður skrá og til að hlaða niður skrá með hlekk úr tölvupósti. Við the vegur, þá er hægt að færa þau til rekstraraðilans, það fer eftir því hversu oft þessi virkni er notuð. Hvað annað á að bæta við krókinn, fer aftur eftir verkefninu: ef skrár berast strax í bréfinu, þá geturðu hlaðið niður viðhengjum við bréfið, ef gögnin berast í bréfinu, þá þarftu að flokka bréfið, o.s.frv. Í mínu tilfelli fylgir bréfinu einn hlekkur á skjalasafnið sem ég þarf að setja á ákveðinn stað og hefja frekari úrvinnslu.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Kóðinn er einfaldur, svo hann þarf varla frekari skýringa við. Ég skal bara segja þér frá töfralínunni imap_conn_id. Apache Airflow geymir tengibreytur (innskráning, lykilorð, heimilisfang og aðrar breytur) sem hægt er að nálgast með strengjaauðkenni. Sjónrænt lítur tengingarstjórnun svona út

ETL ferli til að sækja gögn úr tölvupósti í Apache Airflow

Skynjari til að bíða eftir gögnum

Þar sem við vitum nú þegar hvernig á að tengjast og taka á móti gögnum úr pósti, getum við nú skrifað skynjara til að bíða eftir þeim. Í mínu tilfelli virkaði það ekki að skrifa símafyrirtæki strax sem mun vinna úr gögnunum, ef einhver er, vegna þess að önnur ferli vinna út frá gögnunum sem berast frá póstinum, þar á meðal þeir sem taka tengd gögn frá öðrum aðilum (API, símtækni) , vefmælingar osfrv.) osfrv.). Ég skal gefa þér dæmi. Nýr notandi hefur birst í CRM kerfinu og við vitum enn ekki um UUID hans. Síðan, þegar reynt er að taka á móti gögnum frá SIP-símakerfi, munum við fá símtöl sem eru bundin við UUID þess, en við munum ekki geta vistað og notað þau rétt. Í slíkum málum er mikilvægt að hafa í huga hversu háð gögnin eru, sérstaklega ef þau eru frá mismunandi aðilum. Þetta eru auðvitað ófullnægjandi ráðstafanir til að varðveita heilleika gagna, en í sumum tilfellum eru þær nauðsynlegar. Já, og aðgerðalaus til að hernema auðlindir er líka óskynsamlegt.

Þannig mun skynjarinn okkar ræsa síðari hornpunkta grafsins ef það eru nýjar upplýsingar í pósti og einnig merkja fyrri upplýsingar sem óviðkomandi.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Við tökum á móti og notum gögn

Til að taka á móti og vinna úr gögnum geturðu skrifað sérstakan rekstraraðila, eða þú getur notað tilbúna. Þar sem rökfræðin er enn léttvæg - til að taka gögn úr bréfi, þá sting ég sem dæmi á venjulegum PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Við the vegur, ef fyrirtækjapósturinn þinn er líka á mail.ru, þá muntu ekki geta leitað að bréfum eftir efni, sendanda osfrv. Árið 2016 lofuðu þeir að kynna það, en skiptu greinilega um skoðun. Ég leysti þetta vandamál með því að búa til sérstaka möppu fyrir nauðsynleg bréf og setja upp síu fyrir nauðsynlega stafi í póstviðmótinu. Þannig komast aðeins nauðsynlegir stafir og skilyrði fyrir leitinni, í mínu tilfelli, einfaldlega (ÓSÉÐ) inn í þessa möppu.

Í stuttu máli höfum við eftirfarandi röð: við athugum hvort það séu nýir stafir sem uppfylla skilyrðin, ef svo eru, þá sækjum við skjalasafnið með því að nota hlekkinn frá síðasta bréfi.
Undir síðustu sporbaug er því sleppt að þessu skjalasafni verður pakkað upp, gögnin úr skjalasafninu verða hreinsuð og unnin og á endanum mun þetta allt fara lengra í ETL vinnslupípuna, en þetta er nú þegar utan gildissviðs greinin. Ef það reyndist áhugavert og gagnlegt, þá mun ég vera ánægður með að halda áfram að lýsa ETL lausnum og hlutum þeirra fyrir Apache Airflow.

Heimild: www.habr.com

Bæta við athugasemd