ETL-process för att hämta data från e-post i Apache Airflow

ETL-process för att hämta data från e-post i Apache Airflow

Oavsett hur mycket tekniken utvecklas, följer alltid en rad föråldrade tillvägagångssätt efter utvecklingen. Detta kan bero på en smidig övergång, mänskliga faktorer, tekniska behov eller något annat. Inom området databehandling är datakällor de mest avslöjande i denna del. Hur mycket vi än drömmer om att bli av med detta, men än så länge skickas en del av datan i instant messengers och e-postmeddelanden, för att inte tala om mer ålderdomliga format. Jag inbjuder dig att demontera ett av alternativen för Apache Airflow, och illustrera hur du kan ta data från e-postmeddelanden.

förhistoria

Mycket data överförs fortfarande via e-post, från interpersonell kommunikation till standarder för interaktion mellan företag. Det är bra om det är möjligt att skriva ett gränssnitt för att få data eller placera personer på kontoret som kommer att lägga in denna information i mer bekväma källor, men ofta är det kanske helt enkelt inte möjligt. Den specifika uppgiften jag stod inför var att ansluta det ökända CRM-systemet till datalagret och sedan till OLAP-systemet. Det hände så historiskt att för vårt företag var användningen av detta system bekvämt i ett visst affärsområde. Därför ville alla verkligen kunna arbeta med data från detta tredjepartssystem också. Först och främst studerades förstås möjligheten att få data från ett öppet API. Tyvärr täckte API:et inte att få all nödvändig data, och enkelt uttryckt var den på många sätt sned, och teknisk support ville eller kunde inte mötas halvvägs för att tillhandahålla mer omfattande funktionalitet. Men detta system gav möjligheten att regelbundet ta emot de saknade uppgifterna via post i form av en länk för avlastning av arkivet.

Det bör noteras att detta inte var det enda fallet där företaget ville samla in data från e-postmeddelanden eller snabbmeddelanden. Men i det här fallet kunde vi inte påverka ett tredjepartsföretag som tillhandahåller en del av data endast på detta sätt.

Apache luftflöde

För att bygga ETL-processer använder vi oftast Apache Airflow. För att en läsare som inte är bekant med denna teknik bättre ska förstå hur den ser ut i sammanhanget och generellt, kommer jag att beskriva ett par inledande sådana.

Apache Airflow är en gratis plattform som används för att bygga, exekvera och övervaka ETL-processer (Extract-Transform-Loading) i Python. Huvudkonceptet i Airflow är en riktad acyklisk graf, där kurvans hörn är specifika processer, och kanterna på grafen är flödet av kontroll eller information. En process kan helt enkelt anropa vilken Python-funktion som helst, eller så kan den ha mer komplex logik från att sekventiellt anropa flera funktioner i en klass. För de mest frekventa operationerna finns det redan många färdiga utvecklingar som kan användas som processer. Sådana utvecklingar inkluderar:

  • operatörer - för överföring av data från en plats till en annan, till exempel från en databastabell till ett datalager;
  • sensorer - för att vänta på förekomsten av en viss händelse och styra kontrollflödet till grafens efterföljande hörn;
  • hooks - för operationer på lägre nivå, till exempel för att hämta data från en databastabell (används i satser);
  • etc.

Det skulle vara olämpligt att beskriva Apache Airflow i detalj i den här artikeln. Korta introduktioner kan ses här eller här.

Hook för att få data

Först och främst, för att lösa problemet, måste vi skriva en krok som vi kan:

  • ansluta till e-post
  • hitta rätt bokstav
  • ta emot uppgifter från brevet.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Logiken är denna: vi ansluter, hittar den sista mest relevanta bokstaven, om det finns andra ignorerar vi dem. Den här funktionen används eftersom senare bokstäver innehåller all data från tidigare. Om så inte är fallet kan du returnera en uppsättning av alla bokstäver, eller bearbeta den första och resten vid nästa pass. I allmänhet beror allt, som alltid, på uppgiften.

Vi lägger till två hjälpfunktioner till kroken: för att ladda ner en fil och för att ladda ner en fil med en länk från ett e-postmeddelande. Förresten, de kan flyttas till operatören, det beror på hur ofta denna funktionalitet används. Vad mer du ska lägga till i kroken, återigen, beror på uppgiften: om filer tas emot omedelbart i brevet kan du ladda ner bilagor till brevet, om uppgifterna tas emot i brevet måste du analysera brevet, etc. I mitt fall kommer brevet med en länk till arkivet, som jag behöver lägga på ett visst ställe och påbörja den vidare handläggningsprocessen.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Koden är enkel, så den behöver knappast förklaras närmare. Jag ska bara berätta om den magiska linjen imap_conn_id. Apache Airflow lagrar anslutningsparametrar (inloggning, lösenord, adress och andra parametrar) som kan nås av en strängidentifierare. Visuellt ser anslutningshanteringen ut så här

ETL-process för att hämta data från e-post i Apache Airflow

Sensor för att vänta på data

Eftersom vi redan vet hur man ansluter och tar emot data från e-post kan vi nu skriva en sensor för att vänta på dem. I mitt fall fungerade det inte att skriva en operatör direkt som kommer att behandla data, om någon, eftersom andra processer fungerar baserat på data som tas emot från posten, inklusive de som tar relaterad data från andra källor (API, telefoni , webbstatistik, etc.). etc.). Jag ska ge dig ett exempel. En ny användare har dykt upp i CRM-systemet och vi känner fortfarande inte till hans UUID. När vi sedan försöker ta emot data från SIP-telefoni kommer vi att ta emot samtal kopplade till dess UUID, men vi kommer inte att kunna spara och använda dem på rätt sätt. I sådana här frågor är det viktigt att ha i åtanke uppgifternas beroende, särskilt om de kommer från olika källor. Dessa är naturligtvis otillräckliga åtgärder för att bevara dataintegriteten, men i vissa fall är de nödvändiga. Ja, och tomgång för att ockupera resurser är också irrationellt.

Således kommer vår sensor att lansera efterföljande hörn av grafen om det finns färsk information i posten, och även markera den tidigare informationen som irrelevant.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Vi tar emot och använder data

För att ta emot och bearbeta data kan du skriva en separat operatör, du kan använda färdiga. Eftersom logiken än så länge är trivial - för att ta data från brevet, till exempel, föreslår jag standard PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Förresten, om din företagspost också finns på mail.ru, kommer du inte att kunna söka efter brev efter ämne, avsändare etc. Redan 2016 lovade de att införa det, men ändrade tydligen åsikt. Jag löste detta problem genom att skapa en separat mapp för de nödvändiga breven och sätta upp ett filter för de nödvändiga breven i e-postwebbgränssnittet. Således är det bara de nödvändiga bokstäverna och villkoren för sökningen, i mitt fall, helt enkelt (OSED) i den här mappen.

Sammanfattningsvis har vi följande sekvens: vi kontrollerar om det finns nya brev som uppfyller villkoren, om det finns så laddar vi ner arkivet med hjälp av länken från det sista brevet.
Under de sista prickarna är det utelämnat att detta arkiv kommer att packas upp, data från arkivet kommer att rensas och bearbetas, och som ett resultat kommer det hela att gå vidare till pipelinen av ETL-processen, men detta är redan bortom artikelns omfattning. Om det visade sig intressant och användbart, så fortsätter jag gärna att beskriva ETL-lösningar och deras delar för Apache Airflow.

Källa: will.com

Lägg en kommentar