ETL працэс атрымання дадзеных з электроннай пошты ў Apache Airflow

ETL працэс атрымання дадзеных з электроннай пошты ў Apache Airflow

Як бы моцна не развіваліся тэхналогіі, за развіццём заўсёды цягнецца чарада састарэлых падыходаў. Гэта можа быць абумоўлена плыўным пераходам, чалавечым фактарам, тэхналагічнымі неабходнасцямі ці нечым іншым. У вобласці апрацоўкі дадзеных найболей паказальнымі ў гэтай частцы з'яўляюцца крыніцы дадзеных. Як бы мы не марылі ад гэтага пазбавіцца, але пакуль частка звестак перасылаецца ў мэсэнджарах і электронных лістах, не кажучы і пра больш архаічныя фарматы. Запрашаю пад кат разабраць адзін з варыянтаў для Apache Airflow, які ілюструе, як можна забіраць дадзеныя з электронных лістоў.

перадгісторыя

Многія дадзеныя да гэтага часу перадаюцца праз электронную пошту, пачынаючы з міжасобасных камунікацый і заканчваючы стандартамі ўзаемадзеяння паміж кампаніямі. Добра, калі атрымоўваецца для атрымання дадзеных напісаць інтэрфейс ці пасадзіць людзей у офісе, якія гэтую інфармацыю будуць уносіць у зручнейшыя крыніцы, але часцяком такой магчымасці можа проста не быць. Канкрэтная задача, з якой сутыкнуўся я, – гэта падлучэнне даволі вядомай CRM сістэмы да сховішча дадзеных, а далей – да сістэмы OLAP. Так гістарычна склалася, што для нашай кампаніі выкарыстанне гэтай сістэмы было зручна ў асобна ўзятай вобласці бізнэсу. Таму ўсім вельмі жадалася мець магчымасць апераваць дадзенымі і з гэтай іншай сістэмы ў тым ліку. У першую чаргу, вядома, была вывучана магчымасць атрымання даных з адкрытага API. Нажаль, API не покрыва атрыманне ўсіх неабходных дадзеных, ды і, кажучы простай мовай, было шмат у чым крывавата, а тэхнічная падтрымка не захацела ці не змагла пайсці насустрач для падавання больш вычарпальнага функцыяналу. Затое дадзеная сістэма давала магчымасць перыядычнага атрымання дадзеных на пошту ў выглядзе спасылкі для выгрузкі архіва.

Трэба адзначыць, што гэта быў не адзіны кейс, па якім бізнэс хацеў збіраць дадзеныя з паштовых лістоў ці месэнджараў. Аднак, у дадзеным выпадку мы не маглі паўплываць на іншую кампанію, якая падае частку дадзеных толькі такім спосабам.

Apache Airflow

Для пабудовы ETL працэсаў мы часцей за ўсё выкарыстоўваем Apache Airflow. Для таго каб чытач, незнаёмы з гэтай тэхналогіяй, лепш зразумеў, як гэта выглядае ў кантэксце і ў цэлым, апішу пару ўступных.

Apache Airflow – свабодная платформа, якая выкарыстоўваецца для пабудовы, выканання і маніторынгу ETL (Extract-Transform-Loading) працэсаў на мове Python. Асноўным паняццем у Airflow з'яўляецца арыентаваны ацыклічны граф, дзе вяршыні графа - канкрэтныя працэсы, а рэбры графа - паток кіравання або інфармацыі. Працэс можа проста выклікаць любую Python функцыю, а можа мець больш складаную логіку з паслядоўнага выкліку некалькіх функцый у кантэксце класа. Для найболей частых аперацый ужо ёсць мноства гатовых напрацовак, якія можна выкарыстаць у якасці працэсаў. Да такіх напрацовак адносяцца:

  • аператары - для перагону дадзеных з аднаго месца ў іншае, напрыклад з табліцы БД у сховішча дадзеных;
  • сэнсары - для чакання наступлення пэўнай падзеі і напрамкі патоку кіравання ў наступныя вяршыні графа;
  • хукі - для больш нізкаўзроўневых аперацый, напрыклад, для атрымання дадзеных з табліцы БД (выкарыстоўваюцца ў аператарах);
  • і г.д.

Апісваць Apache Airflow падрабязна ў гэтым артыкуле будзе немэтазгодна. Кароткія ўводзіны можна паглядзець тут або тут.

Хук для атрымання дадзеных

У першую чаргу, для рашэння задачы трэба напісаць хук, з дапамогай якога мы маглі б:

  • падключацца да электроннай пошты;
  • знаходзіць патрэбны ліст;
  • атрымліваць дадзеныя з ліста.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Логіка такая: падключаемся, знаходзім апошні самы актуальны ліст, калі ёсць іншыя - ігнаруем іх. Выкарыстоўваецца менавіта такая функцыя, таму што пазнейшыя лісты ўтрымоўваюць усе дадзеныя ранніх. Калі гэта не так, то можна вяртаць масіў усіх лістоў або апрацоўваць першае, а астатнія - пры наступным праходзе. Увогуле ўсё як заўсёды залежыць ад задачы.

Дадаем да хуку дзве дапаможныя функцыі: для спампоўкі файла і для запампоўкі файла па спасылцы з ліста. Дарэчы, іх можна вынесці ў аператар, гэта залежыць ад частаты выкарыстання дадзенага функцыяналу. Што яшчэ дапісваць у хук, ізноў жа, залежыць ад задачы: калі ў лісце прыходзяць адразу файлы, то можна спампоўваць прыкладанні да ліста, калі дадзеныя прыходзяць у лісце, то трэба парсіць ліст і т.д. У маім выпадку ліст прыходзіць з адной спасылкай на архіў, які мне трэба пакласці ў пэўнае месца і запусціць далейшы працэс апрацоўкі.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Код просты, таму ці наўрад мае патрэбу ў дадатковых тлумачэннях. Раскажу толькі пра магічны радок imap_conn_id. Apache Airflow захоўвае параметры падлучэнняў (лагін, пароль, адрас і іншыя параметры), да якіх можна звяртацца па радковым ідэнтыфікатару. Візуальна кіраванне падлучэннямі выглядае вось так

ETL працэс атрымання дадзеных з электроннай пошты ў Apache Airflow

Сэнсар для чакання дадзеных

Паколькі мы ўжо ўмеем падлучацца і атрымліваць дадзеныя з пошты, зараз можам напісаць сэнсар для іх чакання. Напісаць адразу аператар, які будзе апрацоўваць дадзеныя, калі яны маюцца, у маім выпадку не атрымалася, бо на падставе атрыманых дадзеных з пошты працуюць і іншыя працэсы, у тым ліку, якія бяруць звязаныя дадзеныя з іншых крыніц (API, тэлефанія, вэб метрыкі і г.д.). Прывяду прыклад. У CRM сістэме з'явіўся новы карыстач, і мы яшчэ не ведаем пра яго UUID. Тады пры спробе атрымаць дадзеныя з SIP-тэлефаніі мы атрымаем званкі, прывязаныя да яго UUID, але карэктна захаваць і выкарыстоўваць іх не зможам. У такіх пытаннях важна мець на ўвазе залежнасць дадзеных, асабліва калі яны з розных крыніц. Гэта, вядома, недастатковыя меры захавання цэласнасці звестак, але ў некаторых выпадках неабходныя. Ды і ў халастую займаць рэсурсы таксама нерацыянальна.

Такім чынам, наш сэнсар будзе запускаць наступныя вяршыні графа, калі ёсць свежая інфармацыя на пошце, а таксама адзначаць неактуальнай папярэднюю інфармацыю.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Атрымліваем і выкарыстоўваем дадзеныя

Для атрымання і апрацоўкі даных можна напісаць асобны аператар, можна выкарыстоўваць гатовыя. Паколькі пакуль логіка трывіяльная - забраць дадзеныя з ліста, то для прыкладу прапаную стандартны PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Дарэчы, калі ваша карпаратыўная пошта таксама на mail.ru, то вам будзе недаступны пошук лістоў па тэме, адпраўніку і т.д. Яны яшчэ ў далёкім 2016 абяцалі ўвесці, але, відаць, перадумалі. Я вырашыў гэтую праблему, стварыўшы пад патрэбныя лісты асобную тэчку і наладзіўшы ў вэб-інтэрфейсе пошты фільтр на патрэбныя лісты. Такім чынам, у гэтую тэчку пападаюць толькі патрэбныя лісты і ўмовы для пошуку ў маім выпадку проста (UNSEEN).

Рэзюмуючы, мы маем наступную паслядоўнасць: правяраем, ці ёсць новыя лісты, якія адпавядаюць умовам, калі ёсць, то спампоўваем архіў па спасылцы з апошняга ліста.
Пад апошнімі шматкроп'ямі апушчана, што гэты архіў будзе распакаваны, дадзеныя з архіва ачышчаны і апрацаваны, і ў выніку ўся гэтая справа сыдзе далей на канвеер ETL працэсу, але гэта ўжо выходзіць за рамкі тэмы артыкула. Калі атрымалася цікава і карысна, то з радасцю працягну апісваць ETL рашэнні і іх часткі для Apache Airflow.

Крыніца: habr.com

Дадаць каментар