ETL Prozess fir Daten aus E-Mail an Apache Airflow ze kréien

ETL Prozess fir Daten aus E-Mail an Apache Airflow ze kréien

Egal wéi vill Technologie sech entwéckelt, e String vun alen Approchen verleeft ëmmer hannert der Entwécklung. Dëst kann wéinst engem glaten Iwwergang, mënschlech Faktoren, technologesche Besoinen oder soss eppes sinn. Am Beräich vun der Dateveraarbechtung sinn Datequellen déi offensichtlechst an dësem Deel. Egal wéi vill mir dreemen vun dësem lass ze ginn, awer bis elo gëtt en Deel vun den Donnéeën an Instant Messenger an E-Mail geschéckt, fir net méi archaesch Formater ze ernimmen. Ech invitéieren Iech eng vun den Optiounen fir Apache Airflow ze disassemble, illustréiert wéi Dir Daten aus E-Mailen huelen kënnt.

Virgeschicht

Vill Daten ginn nach ëmmer iwwer E-Mail transferéiert, vun interperséinleche Kommunikatiounen bis Normen vun der Interaktioun tëscht Firmen. Et ass gutt wann et méiglech ass en Interface ze schreiwen fir Daten ze kréien oder Leit op de Büro ze setzen déi dës Informatioun a méi praktesch Quellen aginn, awer dacks ass dëst einfach net méiglech. Déi spezifesch Aufgab, déi ech konfrontéiert hunn, war den notoresche CRM-System mam Datelager ze verbannen, an dann mam OLAP System. Et ass sou geschitt historesch datt fir eis Firma d'Benotzung vun dësem System an engem bestëmmte Beräich vum Geschäft bequem war. Dofir wollt jidderee wierklech fäeg sinn och mat Daten aus dësem Drëtt-Partei-System ze bedreiwen. Als éischt gouf natierlech d'Méiglechkeet studéiert Daten aus enger oppener API ze kréien. Leider huet d'API net all déi néideg Donnéeën ofgedeckt, an, an einfache Begrëffer, war et op vill Manéiere kromm, an technesch Ënnerstëtzung wollt oder konnt net hallef treffen fir méi ëmfaassend Funktionalitéit ze bidden. Mä dëse System huet d'Méiglechkeet periodesch déi fehlend Donnéeën per Mail a Form vun engem Link fir d'Entlueden vum Archiv ze kréien.

Et sollt bemierkt datt dëst net deen eenzege Fall war an deem d'Geschäft Daten aus E-Mailen oder Instant Messenger wollt sammelen. Wéi och ëmmer, an dësem Fall kënne mir eng Drëtt Partei Firma net beaflossen, déi en Deel vun den Donnéeën nëmmen op dës Manéier liwwert.

apache Loftfloss

Fir ETL Prozesser ze bauen, benotze mir meeschtens Apache Airflow. Fir datt e Lieser, deen dës Technologie net vertraut ass besser ze verstoen wéi et am Kontext an allgemeng ausgesäit, wäert ech e puer Aféierung beschreiwen.

Apache Airflow ass eng gratis Plattform déi benotzt gëtt fir ETL (Extract-Transform-Loading) Prozesser am Python ze bauen, auszeféieren an ze iwwerwaachen. D'Haaptkonzept am Airflow ass eng geriicht azyklesch Grafik, wou d'Wierder vun der Grafik spezifesch Prozesser sinn, an d'Kante vun der Grafik sinn de Flux vu Kontroll oder Informatioun. E Prozess kann einfach all Python Funktioun nennen, oder et kann méi komplex Logik hunn aus sequenziell verschidde Funktiounen am Kontext vun enger Klass ze ruffen. Fir déi heefegst Operatioune ginn et scho vill fäerdeg Entwécklungen, déi als Prozesser benotzt kënne ginn. Esou Entwécklungen enthalen:

  • Bedreiwer - fir Daten vun enger Plaz op déi aner ze transferéieren, zum Beispill vun enger Datebanktabell an en Datelager;
  • Sensoren - fir op d'Optriede vun engem bestëmmten Event ze waarden an de Flux vun der Kontroll op déi spéider Wirbelen vun der Grafik ze riichten;
  • Haken - fir Operatiounen op nidderegen Niveau, zum Beispill, fir Daten aus enger Datebanktabelle ze kréien (benotzt an Aussoen);
  • an esou weider.

Et wier onpassend Apache Airflow am Detail an dësem Artikel ze beschreiwen. Kuerz Aféierung ka gekuckt ginn hei oder hei.

Hook fir Daten ze kréien

Als éischt, fir de Problem ze léisen, musse mir en Haken schreiwen, mat deem mir kéinten:

  • konnektéieren op E-Mail
  • fannen de richtege Bréif
  • kréien Daten aus dem Bréif.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

D'Logik ass dëst: mir verbannen, fanne de leschte relevantste Bréif, wann et anerer sinn, ignoréiere mir se. Dës Funktioun gëtt benotzt, well spéider Bréiwer all d'Donnéeë vu fréiere enthalen. Wann dat net de Fall ass, da kënnt Dir eng Rei vun alle Bréiwer zréckginn, oder déi éischt veraarbecht, an de Rescht am nächste Pass. Am Allgemengen, hänkt alles wéi ëmmer vun der Aufgab of.

Mir addéieren zwou Hëllefsfunktiounen un den Hook: fir eng Datei erofzelueden a fir eng Datei mat engem Link vun enger E-Mail erofzelueden. Iwwregens, si kënnen op de Bedreiwer geplënnert ginn, et hänkt vun der Frequenz vun der Benotzung vun dëser Funktionalitéit of. Wat fir den Hook nach eng Kéier ze addéieren, hänkt vun der Aufgab of: wann Dateien direkt am Bréif kritt ginn, da kënnt Dir Uschlëss op de Bréif eroflueden, wann d'Donnéeën am Bréif kritt ginn, da musst Dir de Bréif parséieren, etc. A mengem Fall kënnt de Bréif mat engem Link op den Archiv, deen ech muss op eng bestëmmte Plaz setzen an de weidere Veraarbechtungsprozess ufänken.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

De Code ass einfach, also brauch et kaum weider Erklärung. Ech soen Iech just iwwer d'Magie Linn imap_conn_id. Apache Airflow späichert Verbindungsparameter (Login, Passwuert, Adress an aner Parameteren) déi vun engem Stringidentifizéierer zougänglech sinn. Visuell gesäit d'Verbindungsmanagement esou aus

ETL Prozess fir Daten aus E-Mail an Apache Airflow ze kréien

Sensor fir Daten ze waarden

Well mir scho wësse wéi mir Daten aus der Mail verbannen an kréien, kënne mir elo e Sensor schreiwen fir op se ze waarden. A mengem Fall huet et net fonctionnéiert direkt en Operateur ze schreiwen deen d'Donnéeën veraarbecht, wann iwwerhaapt, well aner Prozesser funktionnéieren op Basis vun den Donnéeën, déi vun der Mail kritt goufen, och déi, déi verbonne Donnéeën aus anere Quellen huelen (API, Telefonie). , Web Metriken, etc.) etc.). Ech ginn Iech e Beispill. En neie Benotzer ass am CRM System opgetaucht, a mir wëssen nach ëmmer net iwwer seng UUID. Dann, wa mir probéieren Daten aus der SIP-Telefonie ze kréien, kréie mir Uriff verbonne mat senger UUID, awer mir kënnen se net späicheren a richteg benotzen. An esou Saachen ass et wichteg d'Ofhängegkeet vun den Donnéeën am Kapp ze halen, besonnesch wa se aus verschiddene Quelle kommen. Dëst sinn natierlech net genuch Moossname fir d'Integritéit vun den Donnéeën ze erhaalen, awer an e puer Fäll sinn se néideg. Jo, a Leedung fir Ressourcen ze besetzen ass och irrational.

Also lancéiert eise Sensor spéider Wirbelen vun der Grafik wann et frësch Informatioun an der Mail gëtt, an och déi fréier Informatioun als irrelevant markéieren.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Mir kréien a benotzen Daten

Fir Daten ze kréien an ze verarbeiten, kënnt Dir e separaten Bedreiwer schreiwen, Dir kënnt fäerdege benotzen. Well bis elo d'Logik trivial ass - fir Daten aus dem Bréif ze huelen, zum Beispill, proposéieren ech de Standard PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Iwwregens, wann Är Firmenmail och op mail.ru ass, da kënnt Dir net no Bréiwer no Thema, Sender, etc. Zréck an 2016, si versprach et aféieren, mä anscheinend geännert hir Meenung. Ech hunn dëse Problem geléist andeems en en getrennten Dossier fir déi néideg Bréiwer erstallt huet an e Filter fir déi néideg Bréiwer an der Mail Web Interface opgestallt huet. Also nëmmen déi néideg Buschtawen a Konditioune fir d'Sich, a mengem Fall, einfach (UNSEEN) an dësen Dossier.

Zesummefaassend hu mir déi folgend Sequenz: mir kucken ob et nei Bréiwer ginn déi de Konditiounen entspriechen, wann et gëtt, da lueden mir d'Archiv erof mam Link vum leschte Bréif.
Ënnert deene leschte Punkte gëtt ewech gelooss, datt dëst Archiv ausgepackt gëtt, d'Daten aus dem Archiv geläscht a veraarbecht ginn, an doduerch geet dat Ganzt weider an d'Pipeline vum ETL-Prozess, mä dat ass schonn doriwwer eraus. den Ëmfang vum Artikel. Wann et interessant an nëtzlech ass, da wäert ech gär weider ETL Léisungen an hir Deeler fir Apache Airflow beschreiwen.

Source: will.com

Setzt e Commentaire