ETL-Prozess zum Abrufen von Daten aus E-Mails in Apache Airflow

ETL-Prozess zum Abrufen von Daten aus E-Mails in Apache Airflow

Ganz gleich, wie weit sich die Technologie weiterentwickelt, eine Reihe veralteter Ansätze hinkt der Entwicklung immer hinterher. Dies kann auf einen reibungslosen Übergang, menschliche Faktoren, technologische Anforderungen oder etwas anderes zurückzuführen sein. Im Bereich der Datenverarbeitung sind die Datenquellen in diesem Teil am aufschlussreichsten. Egal wie sehr wir davon träumen, dies loszuwerden, ein Teil der Daten wird bisher über Instant Messenger und E-Mails gesendet, ganz zu schweigen von archaischeren Formaten. Ich lade Sie ein, eine der Optionen für Apache Airflow zu zerlegen und zu veranschaulichen, wie Sie Daten aus E-Mails entnehmen können.

Vorgeschichte

Noch immer werden viele Daten per E-Mail übertragen, von der zwischenmenschlichen Kommunikation bis hin zu Standards der Interaktion zwischen Unternehmen. Es ist gut, wenn es möglich ist, eine Schnittstelle zu schreiben, um Daten zu erhalten oder Leute im Büro einzusetzen, die diese Informationen in bequemere Quellen eingeben, aber oft ist dies einfach nicht möglich. Die konkrete Aufgabe, vor der ich stand, bestand darin, das berüchtigte CRM-System mit dem Data Warehouse und dann mit dem OLAP-System zu verbinden. In der Vergangenheit war es für unser Unternehmen so, dass der Einsatz dieses Systems in einem bestimmten Geschäftsbereich praktisch war. Deshalb wollte jeder unbedingt auch mit den Daten dieses Drittsystems arbeiten können. Zunächst wurde natürlich die Möglichkeit untersucht, Daten aus einer offenen API zu beziehen. Leider reichte die API nicht aus, um alle notwendigen Daten zu erhalten, und sie war, vereinfacht gesagt, in vielerlei Hinsicht fehlerhaft, und der technische Support wollte oder konnte nicht auf halbem Weg zur Bereitstellung umfassenderer Funktionen beitragen. Dieses System bot jedoch die Möglichkeit, die fehlenden Daten regelmäßig per E-Mail in Form eines Links zum Entladen des Archivs zu erhalten.

Es ist zu beachten, dass dies nicht der einzige Fall war, in dem das Unternehmen Daten aus E-Mails oder Instant Messengern sammeln wollte. Allerdings haben wir in diesem Fall keinen Einfluss auf ein Drittunternehmen, das einen Teil der Daten nur auf diesem Wege bereitstellt.

Apache-Luftstrom

Um ETL-Prozesse zu erstellen, verwenden wir am häufigsten Apache Airflow. Damit ein Leser, der mit dieser Technologie nicht vertraut ist, besser verstehen kann, wie sie im Kontext und im Allgemeinen aussieht, werde ich einige einführende beschreiben.

Apache Airflow ist eine kostenlose Plattform, die zum Erstellen, Ausführen und Überwachen von ETL-Prozessen (Extract-Transform-Loading) in Python verwendet wird. Das Hauptkonzept von Airflow ist ein gerichteter azyklischer Graph, bei dem die Eckpunkte des Graphen spezifische Prozesse und die Kanten des Graphen den Kontroll- oder Informationsfluss darstellen. Ein Prozess kann einfach eine beliebige Python-Funktion aufrufen oder über eine komplexere Logik verfügen, indem mehrere Funktionen im Kontext einer Klasse nacheinander aufgerufen werden. Für die häufigsten Vorgänge gibt es bereits viele fertige Entwicklungen, die als Prozesse genutzt werden können. Zu diesen Entwicklungen gehören:

  • Operatoren – zum Übertragen von Daten von einem Ort zum anderen, beispielsweise von einer Datenbanktabelle in ein Data Warehouse;
  • Sensoren – um auf das Eintreten eines bestimmten Ereignisses zu warten und den Kontrollfluss auf die nachfolgenden Eckpunkte des Diagramms zu lenken;
  • Hooks – für Operationen auf niedrigerer Ebene, zum Beispiel um Daten aus einer Datenbanktabelle abzurufen (in Anweisungen verwendet);
  • usw.

Es wäre unangemessen, Apache Airflow in diesem Artikel ausführlich zu beschreiben. Kurze Einführungen können eingesehen werden hier oder hier.

Hook zum Abrufen von Daten

Um das Problem zu lösen, müssen wir zunächst einen Hook schreiben, mit dem wir:

  • Verbindung mit E-Mail herstellen
  • Finde den richtigen Buchstaben
  • Daten aus dem Brief erhalten.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Die Logik ist folgende: Wir verbinden uns, finden den letzten relevantesten Buchstaben, wenn es andere gibt, ignorieren wir sie. Diese Funktion wird verwendet, da spätere Briefe alle Daten früherer Briefe enthalten. Wenn dies nicht der Fall ist, können Sie ein Array aller Buchstaben zurückgeben oder den ersten Buchstaben und den Rest im nächsten Durchgang verarbeiten. Im Allgemeinen hängt alles wie immer von der Aufgabe ab.

Wir fügen dem Hook zwei Hilfsfunktionen hinzu: zum Herunterladen einer Datei und zum Herunterladen einer Datei über einen Link aus einer E-Mail. Sie können übrigens auf den Betreiber übertragen werden, es hängt von der Häufigkeit der Nutzung dieser Funktionalität ab. Was dem Hook noch hinzugefügt werden muss, hängt wiederum von der Aufgabe ab: Wenn Dateien sofort im Brief eingehen, können Sie Anhänge zum Brief herunterladen. Wenn die Daten im Brief eingehen, müssen Sie den Brief analysieren. usw. In meinem Fall liegt dem Brief ein Link zum Archiv bei, den ich an einer bestimmten Stelle platzieren und den weiteren Bearbeitungsprozess starten muss.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Der Code ist einfach und bedarf kaum einer weiteren Erklärung. Ich erzähle Ihnen nur etwas über die magische Zeile imap_conn_id. Apache Airflow speichert Verbindungsparameter (Login, Passwort, Adresse und andere Parameter), auf die über eine Zeichenfolgenkennung zugegriffen werden kann. Optisch sieht das Verbindungsmanagement so aus

ETL-Prozess zum Abrufen von Daten aus E-Mails in Apache Airflow

Sensor wartet auf Daten

Da wir bereits wissen, wie man eine Verbindung herstellt und Daten per E-Mail empfängt, können wir jetzt einen Sensor schreiben, der auf sie wartet. In meinem Fall hat es nicht funktioniert, sofort einen Betreiber zu schreiben, der die Daten, falls vorhanden, verarbeiten wird, da andere Prozesse auf der Grundlage der per E-Mail erhaltenen Daten funktionieren, einschließlich solcher, die entsprechende Daten aus anderen Quellen (API, Telefonie) beziehen , Webmetriken usw.). usw.). Ich gebe Ihnen ein Beispiel. Im CRM-System ist ein neuer Benutzer aufgetaucht, dessen UUID wir noch nicht kennen. Wenn wir dann versuchen, Daten von der SIP-Telefonie zu empfangen, erhalten wir Anrufe, die an ihre UUID gebunden sind, können diese jedoch nicht speichern und korrekt verwenden. Dabei ist es wichtig, die Abhängigkeit der Daten im Auge zu behalten, insbesondere wenn diese aus unterschiedlichen Quellen stammen. Dies sind natürlich keine ausreichenden Maßnahmen zur Wahrung der Datenintegrität, aber in manchen Fällen sind sie notwendig. Ja, und Leerlauf zur Ressourcenbelegung ist auch irrational.

Daher startet unser Sensor nachfolgende Eckpunkte des Diagramms, wenn die E-Mail neue Informationen enthält, und markiert außerdem die vorherigen Informationen als irrelevant.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Wir erhalten und verwenden Daten

Um Daten zu empfangen und zu verarbeiten, können Sie einen separaten Operator schreiben oder vorgefertigte verwenden. Da die Logik immer noch trivial ist – um beispielsweise Daten aus dem Brief zu entnehmen, schlage ich den Standard-PythonOperator vor

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Wenn sich Ihre Firmenpost übrigens auch auf mail.ru befindet, können Sie Briefe nicht nach Betreff, Absender usw. suchen. Bereits 2016 versprachen sie die Einführung, änderten aber offenbar ihre Meinung. Ich habe dieses Problem gelöst, indem ich einen separaten Ordner für die benötigten Briefe erstellt und im Mail-Webinterface einen Filter für die benötigten Briefe eingerichtet habe. Somit gelangen nur die für die Suche notwendigen Buchstaben und Bedingungen, in meinem Fall, einfach (UNSEEN) in diesen Ordner.

Zusammenfassend haben wir den folgenden Ablauf: Wir prüfen, ob es neue Briefe gibt, die die Bedingungen erfüllen. Wenn ja, laden wir das Archiv über den Link vom letzten Brief herunter.
Unter den letzten Punkten wird weggelassen, dass dieses Archiv entpackt wird, die Daten aus dem Archiv gelöscht und verarbeitet werden und das Ganze dadurch in die Pipeline des ETL-Prozesses gelangt, aber das ist schon darüber hinaus den Umfang des Artikels. Wenn es sich als interessant und nützlich erwiesen hat, beschreibe ich gerne weiterhin ETL-Lösungen und deren Teile für Apache Airflow.

Source: habr.com

Kommentar hinzufügen