ETL процес отримання даних з електронної пошти в Apache Airflow

ETL процес отримання даних з електронної пошти в Apache Airflow

Як би сильно не розвивалися технології, за розвитком завжди тягнеться низка застарілих підходів. Це може бути обумовлено плавним переходом, людським фактором, технологічними потребами чи чимось іншим. В області обробки даних найбільш показовими у цій частині є джерела даних. Як би ми не мріяли цього позбутися, але поки частина даних пересилається в месенджерах та електронних листах, не кажучи і про більш архаїчні формати. Запрошую під кат розібрати один із варіантів для Apache Airflow, що ілюструє, як можна забирати дані з електронних листів.

Передісторія

Багато даних досі передаються електронною поштою, починаючи з міжособистісних комунікацій і закінчуючи стандартами взаємодії між компаніями. Добре, якщо вдається для отримання даних написати інтерфейс або посадити людей в офісі, які цю інформацію вноситимуть у зручніші джерела, але часто такої можливості може просто не бути. Конкретне завдання, з яким я зіткнувся, — це підключення відомої CRM системи до сховища даних, а далі — до системи OLAP. Так історично склалося, що для нашої компанії використання цієї системи було зручно в окремій області бізнесу. Тому всім дуже хотілося мати можливість оперувати даними і з цієї сторонньої системи, зокрема. Насамперед, звичайно, було вивчено можливість отримання даних з відкритого API. На жаль, API не покривало отримання всіх необхідних даних, та й, висловлюючись простою мовою, багато в чому кривувато, а технічна підтримка не захотіла або не змогла піти назустріч для надання більш вичерпного функціоналу. Зате дана система надавала можливість періодичного отримання даних на пошту у вигляді посилання для вивантаження архіву.

Слід зазначити, що це був єдиний кейс, яким бізнес хотів збирати дані з поштових листів чи месенджерів. Однак, у цьому випадку ми не могли вплинути на сторонню компанію, яка надає частину даних лише у такий спосіб.

Потік повітря Apache

Для побудови процесів ETL ми найчастіше використовуємо Apache Airflow. Для того, щоб читач, незнайомий з цією технологією, краще зрозумів, як це виглядає в контексті і в цілому, опишу пару вступних.

Apache Airflow – вільна платформа, яка використовується для побудови, виконання та моніторингу ETL (Extract-Transform-Loading) процесів мовою Python. Основним поняттям Airflow є орієнтований ациклічний граф, де вершини графа — конкретні процеси, а ребра графа — потік управління чи інформації. Процес може просто викликати будь-яку функцію Python, а може мати більш складну логіку з послідовного виклику декількох функцій в контексті класу. Для найчастіших операцій вже є безліч готових напрацювань, які можна використовувати як процеси. До таких напрацювань відносяться:

  • оператори - для перегону даних з одного місця до іншого, наприклад з таблиці БД в сховище даних;
  • сенсори - для очікування настання певної події та напрямки потоку управління у наступні вершини графа;
  • хуки - для більш низькорівневих операцій, наприклад, для отримання даних із таблиці БД (використовуються в операторах);
  • тощо.

Описувати Apache Airflow докладно у цій статті буде недоцільно. Короткі вступ можна подивитися тут або тут.

Хук для отримання даних

Насамперед, для вирішення завдання потрібно написати хук, за допомогою якого ми могли б:

  • підключатися до електронної пошти;
  • знаходити потрібний лист;
  • отримувати дані з листа.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Логіка така: підключаємося, знаходимо останній найактуальніший лист, якщо є інші — ігноруємо їх. Використовується саме така функція, тому що пізніші листи містять усі дані ранніх. Якщо це не так, то можна повертати масив усіх листів або обробляти перший, а решта — при наступному проході. Загалом все як завжди залежить від завдання.

Додаємо до хука дві допоміжні функції: для завантаження файлу та для завантаження файлу за посиланням з листа. До речі, їх можна винести в оператор, це залежить від частоти використання цього функціоналу. Що ще дописувати в хук, знову ж таки, залежить від завдання: якщо в листі приходять одразу файли, то можна завантажувати додатки до листа, якщо дані приходять у листі, то треба ширяти лист і т.д. У моєму випадку лист надходить з одним посиланням на архів, який мені потрібно покласти в певне місце і запустити подальший процес обробки.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Код простий, тому навряд чи потребує додаткових пояснень. Розповім лише про магічний рядок imap_conn_id. Apache Airflow зберігає параметри підключень (логін, пароль, адресу та інші параметри), до яких можна звертатись за рядковим ідентифікатором. Візуально керування підключеннями виглядає ось так

ETL процес отримання даних з електронної пошти в Apache Airflow

Сенсор для очікування даних

Оскільки ми вже вміємо підключатися та отримувати дані з пошти, тепер можемо написати сенсор для їхнього очікування. Написати відразу оператор, який оброблятиме дані, якщо вони є, в моєму випадку не вийшло, оскільки на підставі отриманих даних з пошти працюють й інші процеси, в тому числі, пов'язані дані, що беруть, з інших джерел (API, телефонія, веб метрики і і т.д.). Наведу приклад. У системі CRM з'явився новий користувач, і ми ще не знаємо про його UUID. Тоді при спробі отримати дані з SIP-телефонії ми отримаємо дзвінки, прив'язані до UUID, але коректно зберегти і використовувати їх не зможемо. У таких питаннях важливо мати на увазі залежність даних, особливо якщо вони з різних джерел. Це, звісно, ​​недостатні заходи збереження цілісності даних, але у випадках необхідні. Та й у холосту позичати ресурси теж нераціонально.

Таким чином, наш сенсор буде запускати наступні вершини графа, якщо є свіжа інформація поштою, а також позначати неактуальною попередню інформацію.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Отримуємо та використовуємо дані

Для отримання та обробки даних можна написати окремий оператор, можна використовувати готові. Оскільки поки що логіка тривіальна — забрати дані з листа, то для прикладу пропоную стандартний PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

До речі, якщо ваша корпоративна пошта теж на mail.ru, вам буде недоступний пошук листів по темі, відправнику і т.д. Вони ще в далекому 2016 році обіцяли ввести, але, мабуть, передумали. Я вирішив цю проблему, створивши під потрібні листи окрему папку та налаштувавши у веб-інтерфейсі пошти фільтр на потрібні листи. Таким чином, у цю папку потрапляють лише потрібні листи та умови для пошуку у моєму випадку просто (UNSEEN).

Резюмуючи, ми маємо таку послідовність: перевіряємо, чи є нові листи, які відповідають умовам, якщо є, то завантажуємо архів за посиланням з останнього листа.
Під останніми трьома крапками опущено, що цей архів буде розпакований, дані з архіву очищені і оброблені, і в результаті ця справа піде далі на конвеєр ETL процесу, але це вже виходить за рамки теми статті. Якщо вийшло цікаво та корисно, то з радістю продовжу описувати ETL рішення та їх частини для Apache Airflow.

Джерело: habr.com

Додати коментар або відгук