ETL-proces til at hente data fra e-mail i Apache Airflow

ETL-proces til at hente data fra e-mail i Apache Airflow

Uanset hvor meget teknologien udvikler sig, følger en række forældede tilgange altid efter udviklingen. Dette kan skyldes en glidende overgang, menneskelige faktorer, teknologiske behov eller noget andet. Inden for databehandling er datakilder de mest afslørende i denne del. Uanset hvor meget vi drømmer om at slippe af med dette, men indtil videre sendes en del af dataene i instant messengers og e-mails, for ikke at nævne mere arkaiske formater. Jeg inviterer dig til at skille en af ​​mulighederne for Apache Airflow ad, og illustrere, hvordan du kan tage data fra e-mails.

forhistorie

En masse data overføres stadig via e-mail, fra interpersonel kommunikation til standarder for interaktion mellem virksomheder. Det er godt, hvis det er muligt at skrive en grænseflade for at indhente data eller sætte folk på kontoret, som vil indtaste disse oplysninger i mere bekvemme kilder, men ofte er det måske simpelthen ikke muligt. Den specifikke opgave, jeg stod over for, var at forbinde det berygtede CRM-system til datavarehuset og derefter til OLAP-systemet. Det skete så historisk, at for vores virksomhed var brugen af ​​dette system praktisk i et bestemt forretningsområde. Derfor ønskede alle virkelig også at kunne operere med data fra dette tredjepartssystem. Først og fremmest blev naturligvis muligheden for at indhente data fra en åben API undersøgt. Desværre dækkede API'et ikke over at få alle de nødvendige data, og kort sagt var det på mange måder skævt, og teknisk support ville eller kunne ikke mødes halvvejs for at levere mere omfattende funktionalitet. Men dette system gav mulighed for periodisk at modtage de manglende data med posten i form af et link til udlæsning af arkivet.

Det skal bemærkes, at dette ikke var det eneste tilfælde, hvor virksomheden ønskede at indsamle data fra e-mails eller instant messengers. Men i dette tilfælde kunne vi ikke påvirke en tredjepartsvirksomhed, der kun leverer en del af dataene på denne måde.

Apache luftstrøm

Til at bygge ETL-processer bruger vi oftest Apache Airflow. For at en læser, der ikke er bekendt med denne teknologi, bedre kan forstå, hvordan den ser ud i sammenhængen og generelt, vil jeg beskrive et par indledende.

Apache Airflow er en gratis platform, der bruges til at bygge, eksekvere og overvåge ETL-processer (Extract-Transform-Loading) i Python. Hovedkonceptet i Airflow er en rettet acyklisk graf, hvor hjørnerne af grafen er specifikke processer, og kanterne af grafen er strømmen af ​​kontrol eller information. En proces kan simpelthen kalde enhver Python-funktion, eller den kan have mere kompleks logik fra sekventielt at kalde flere funktioner i sammenhæng med en klasse. Til de hyppigste operationer er der allerede mange færdige udviklinger, der kan bruges som processer. Sådanne udviklinger omfatter:

  • operatører - til overførsel af data fra et sted til et andet, for eksempel fra en databasetabel til et datavarehus;
  • sensorer - til at vente på forekomsten af ​​en bestemt hændelse og dirigere strømmen af ​​kontrol til de efterfølgende hjørner af grafen;
  • hooks - til operationer på lavere niveau, for eksempel for at hente data fra en databasetabel (brugt i sætninger);
  • etc.

Det ville være upassende at beskrive Apache Airflow i detaljer i denne artikel. Korte introduktioner kan ses her eller her.

Hook for at få data

Først og fremmest, for at løse problemet, skal vi skrive en krog, som vi kunne:

  • oprette forbindelse til e-mail
  • finde det rigtige bogstav
  • modtage data fra brevet.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Logikken er denne: vi forbinder, finder det sidste mest relevante bogstav, hvis der er andre, ignorerer vi dem. Denne funktion bruges, fordi senere bogstaver indeholder alle data fra tidligere. Hvis dette ikke er tilfældet, kan du returnere en række af alle bogstaver eller behandle det første og resten ved næste gennemløb. Generelt afhænger alt som altid af opgaven.

Vi tilføjer to hjælpefunktioner til krogen: til at downloade en fil og til at downloade en fil ved hjælp af et link fra en e-mail. Forresten kan de flyttes til operatøren, det afhænger af hyppigheden af ​​at bruge denne funktionalitet. Hvad der ellers skal tilføjes til krogen, afhænger igen af ​​opgaven: hvis filer modtages med det samme i brevet, kan du downloade vedhæftede filer til brevet, hvis dataene modtages i brevet, skal du parse brevet, etc. I mit tilfælde kommer brevet med ét link til arkivet, som jeg skal lægge et bestemt sted og starte den videre behandling.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Koden er enkel, så den behøver næppe yderligere forklaring. Jeg vil bare fortælle dig om den magiske linje imap_conn_id. Apache Airflow gemmer forbindelsesparametre (login, adgangskode, adresse og andre parametre), som kan tilgås af en strengidentifikator. Visuelt ser forbindelsesstyringen sådan ud

ETL-proces til at hente data fra e-mail i Apache Airflow

Sensor til at vente på data

Da vi allerede ved, hvordan man forbinder og modtager data fra mail, kan vi nu skrive en sensor til at vente på dem. I mit tilfælde virkede det ikke at skrive en operatør med det samme, der vil behandle dataene, hvis nogen, fordi andre processer fungerer baseret på data modtaget fra posten, inklusive dem, der tager relaterede data fra andre kilder (API, telefoni , webmålinger osv.). osv.). Jeg vil give dig et eksempel. En ny bruger er dukket op i CRM-systemet, og vi kender stadig ikke til hans UUID. Når vi derefter forsøger at modtage data fra SIP-telefoni, vil vi modtage opkald knyttet til dets UUID, men vi vil ikke være i stand til at gemme og bruge dem korrekt. I sådanne sager er det vigtigt at huske på afhængigheden af ​​dataene, især hvis de er fra forskellige kilder. Det er naturligvis utilstrækkelige foranstaltninger til at bevare dataintegriteten, men i nogle tilfælde er de nødvendige. Ja, og tomgang for at besætte ressourcer er også irrationelt.

Vores sensor vil således lancere efterfølgende hjørner af grafen, hvis der er frisk information i posten, og også markere den tidligere information som irrelevant.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Vi modtager og bruger data

For at modtage og behandle data kan du skrive en separat operatør, du kan bruge færdige. Da logikken stadig er triviel - for at tage data fra brevet, for eksempel, foreslår jeg standard PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Forresten, hvis din virksomhedsmail også er på mail.ru, vil du ikke være i stand til at søge efter breve efter emne, afsender osv. Tilbage i 2016 lovede de at indføre det, men ændrede åbenbart mening. Jeg løste dette problem ved at oprette en separat mappe til de nødvendige breve og opsætte et filter for de nødvendige breve i mail-webgrænsefladen. Det er således kun de nødvendige bogstaver og betingelser for søgningen, i mit tilfælde, som simpelthen (USESET) kommer ind i denne mappe.

Sammenfattende har vi følgende rækkefølge: vi tjekker, om der er nye breve, der opfylder betingelserne, hvis der er, så downloader vi arkivet ved hjælp af linket fra det sidste brev.
Under de sidste prikker er det udeladt, at dette arkiv vil blive pakket ud, data fra arkivet vil blive ryddet og behandlet, og som følge heraf vil det hele gå videre til pipelinen af ​​ETL-processen, men dette er allerede ude over artiklens omfang. Hvis det viste sig interessant og brugbart, så vil jeg gerne fortsætte med at beskrive ETL-løsninger og deres dele til Apache Airflow.

Kilde: www.habr.com

Tilføj en kommentar