Processus ETL pour obtenir des données à partir d'un e-mail dans Apache Airflow

Processus ETL pour obtenir des données à partir d'un e-mail dans Apache Airflow

Peu importe l’évolution de la technologie, une série d’approches dépassées est toujours à la traîne du développement. Cela peut être dû à une transition en douceur, à des facteurs humains, à des besoins technologiques ou à autre chose. Dans le domaine du traitement des données, les sources de données sont les plus révélatrices dans cette partie. Peu importe à quel point nous rêvons de nous en débarrasser, mais jusqu'à présent, une partie des données est envoyée dans des messageries instantanées et des e-mails, sans parler de formats plus archaïques. Je vous invite à démonter l'une des options d'Apache Airflow, illustrant comment récupérer les données des e-mails.

Préhistoire

De nombreuses données sont encore transférées par courrier électronique, depuis les communications interpersonnelles jusqu'aux normes d'interaction entre les entreprises. C'est bien s'il est possible d'écrire une interface pour obtenir des données ou de placer des personnes au bureau qui saisiront ces informations dans des sources plus pratiques, mais souvent, cela peut tout simplement ne pas être possible. La tâche spécifique à laquelle j'étais confronté consistait à connecter le fameux système CRM à l'entrepôt de données, puis au système OLAP. Historiquement, il s'est avéré que pour notre entreprise, l'utilisation de ce système était pratique dans un domaine d'activité particulier. Par conséquent, tout le monde voulait vraiment pouvoir également fonctionner avec les données de ce système tiers. Tout d’abord, bien entendu, la possibilité d’obtenir des données à partir d’une API ouverte a été étudiée. Malheureusement, l'API ne permettait pas d'obtenir toutes les données nécessaires et, en termes simples, elle était tordue à bien des égards, et le support technique ne voulait pas ou ne pouvait pas se réunir à mi-chemin pour fournir des fonctionnalités plus complètes. Mais ce système offrait la possibilité de recevoir périodiquement les données manquantes par courrier sous forme de lien de déchargement de l'archive.

Il convient de noter que ce n’était pas le seul cas dans lequel l’entreprise souhaitait collecter des données à partir d’e-mails ou de messageries instantanées. Cependant, dans ce cas, nous ne pourrions pas influencer une société tierce qui fournit uniquement une partie des données de cette manière.

Flux d'air Apache

Pour créer des processus ETL, nous utilisons le plus souvent Apache Airflow. Afin qu'un lecteur qui n'est pas familier avec cette technologie puisse mieux comprendre à quoi elle ressemble dans le contexte et en général, j'en décrirai quelques-unes d'introduction.

Apache Airflow est une plate-forme gratuite utilisée pour créer, exécuter et surveiller les processus ETL (Extract-Transform-Loading) en Python. Le concept principal d'Airflow est un graphe acyclique orienté, où les sommets du graphe sont des processus spécifiques et les bords du graphe sont le flux de contrôle ou d'informations. Un processus peut simplement appeler n'importe quelle fonction Python, ou il peut avoir une logique plus complexe en appelant séquentiellement plusieurs fonctions dans le contexte d'une classe. Pour les opérations les plus fréquentes, il existe déjà de nombreux développements prêts à l'emploi pouvant être utilisés comme processus. Ces développements comprennent :

  • opérateurs - pour transférer des données d'un endroit à un autre, par exemple d'une table de base de données vers un entrepôt de données ;
  • capteurs - pour attendre l'apparition d'un certain événement et diriger le flux de contrôle vers les sommets suivants du graphique ;
  • hooks - pour les opérations de niveau inférieur, par exemple, pour obtenir des données d'une table de base de données (utilisées dans les instructions) ;
  • etc.

Il serait inapproprié de décrire Apache Airflow en détail dans cet article. De brèves introductions peuvent être consultées ici ou ici.

Crochet pour obtenir des données

Tout d’abord, pour résoudre le problème, nous devons écrire un hook avec lequel nous pourrions :

  • se connecter au courrier électronique
  • trouver la bonne lettre
  • recevoir les données de la lettre.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

La logique est la suivante : on connecte, on trouve la dernière lettre la plus pertinente, s'il y en a d'autres, on les ignore. Cette fonction est utilisée car les lettres ultérieures contiennent toutes les données des lettres précédentes. Si ce n'est pas le cas, vous pouvez alors renvoyer un tableau de toutes les lettres, ou traiter la première et le reste lors du passage suivant. En général, comme toujours, tout dépend de la tâche.

Nous ajoutons deux fonctions auxiliaires au hook : pour télécharger un fichier et pour télécharger un fichier à l'aide d'un lien provenant d'un email. À propos, ils peuvent être déplacés vers l'opérateur, cela dépend de la fréquence d'utilisation de cette fonctionnalité. Ce qu'il faut ajouter d'autre au crochet dépend encore une fois de la tâche : si les fichiers sont reçus immédiatement dans la lettre, vous pouvez alors télécharger les pièces jointes à la lettre, si les données sont reçues dans la lettre, vous devez alors analyser la lettre, etc. Dans mon cas, la lettre est accompagnée d'un lien vers les archives, que je dois placer à un certain endroit et lancer le processus de traitement ultérieur.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Le code est simple, il ne nécessite donc guère d’explications supplémentaires. Je vais juste vous parler de la ligne magique imap_conn_id. Apache Airflow stocke les paramètres de connexion (identifiant, mot de passe, adresse et autres paramètres) accessibles par un identifiant de chaîne. Visuellement, la gestion des connexions ressemble à ceci

Processus ETL pour obtenir des données à partir d'un e-mail dans Apache Airflow

Capteur pour attendre les données

Puisque nous savons déjà comment nous connecter et recevoir des données du courrier, nous pouvons désormais écrire un capteur pour les attendre. Dans mon cas, cela n'a pas fonctionné d'écrire immédiatement un opérateur qui traitera les données, le cas échéant, car d'autres processus fonctionnent sur la base des données reçues du courrier, y compris ceux qui prennent les données associées provenant d'autres sources (API, téléphonie , métriques web, etc.). Je vais vous donner un exemple. Un nouvel utilisateur est apparu dans le système CRM et nous ne connaissons toujours pas son UUID. Ensuite, en essayant de recevoir des données de la téléphonie SIP, nous recevrons des appels liés à son UUID, mais nous ne pourrons pas les sauvegarder et les utiliser correctement. Dans de tels domaines, il est important de garder à l’esprit la dépendance des données, surtout si elles proviennent de sources différentes. Bien entendu, ces mesures ne suffisent pas à préserver l’intégrité des données, mais elles sont nécessaires dans certains cas. Oui, et tourner au ralenti pour occuper des ressources est également irrationnel.

Ainsi, notre capteur lancera les sommets suivants du graphique s'il y a de nouvelles informations dans le courrier, et marquera également les informations précédentes comme non pertinentes.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Nous recevons et utilisons des données

Pour recevoir et traiter des données, vous pouvez écrire un opérateur distinct, vous pouvez utiliser des opérateurs prêts à l'emploi. Puisque jusqu'à présent la logique est triviale - pour prendre des données d'une lettre, par exemple, je suggère le PythonOperator standard

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

À propos, si votre courrier d'entreprise se trouve également sur mail.ru, vous ne pourrez pas rechercher de lettres par sujet, expéditeur, etc. En 2016, ils avaient promis de l’introduire, mais ont apparemment changé d’avis. J'ai résolu ce problème en créant un dossier séparé pour les lettres nécessaires et en configurant un filtre pour les lettres nécessaires dans l'interface Web de messagerie. Ainsi, seules les lettres et conditions nécessaires à la recherche, dans mon cas, sont simplement (INCONNUES) dans ce dossier.

En résumé, nous avons la séquence suivante : nous vérifions s'il y a de nouvelles lettres qui remplissent les conditions, s'il y en a, puis nous téléchargeons l'archive en utilisant le lien de la dernière lettre.
Sous les derniers points, il est omis que cette archive sera décompressée, les données de l'archive seront effacées et traitées, et par conséquent, le tout ira plus loin dans le pipeline du processus ETL, mais c'est déjà au-delà la portée de l'article. Si cela s'avère intéressant et utile, je continuerai volontiers à décrire les solutions ETL et leurs composants pour Apache Airflow.

Source: habr.com

Ajouter un commentaire