ETL процес за добивање податоци од е-пошта во Apache Airflow

ETL процес за добивање податоци од е-пошта во Apache Airflow

Без разлика колку се развива технологијата, низа застарени пристапи секогаш следат зад развојот. Ова може да се должи на непречена транзиција, човечки фактори, технолошки потреби или нешто друго. Во областа на обработката на податоците, во овој дел најоткриваат изворите на податоци. Колку и да сонуваме да се ослободиме од ова, но досега дел од податоците се испраќаат во инстант-месинџери и мејлови, а да не зборуваме за поархаични формати. Ве поканувам да расклопите една од опциите за Apache Airflow, илустрирајќи како можете да земате податоци од е-пошта.

праисторијата

Многу податоци сè уште се пренесуваат преку е-пошта, од интерперсонални комуникации до стандарди за интеракција помеѓу компаниите. Добро е ако е можно да се напише интерфејс за да се добијат податоци или да се стават луѓе во канцеларијата кои ќе ги внесат овие информации во попогодни извори, но често тоа едноставно може да не е можно. Специфичната задача со која се соочив беше поврзување на озлогласениот CRM систем со складиштето на податоци, а потоа и со системот OLAP. Историски се случи дека за нашата компанија употребата на овој систем беше погодна во одредена деловна област. Затоа, секој навистина сакаше да може да работи со податоци и од овој систем од трета страна. Пред сè, се разбира, беше проучена можноста за добивање податоци од отворен API. За жал, API-то не го покрива добивањето на сите потребни податоци и, во едноставни термини, на многу начини беше криво, а техничката поддршка не сакаше или не можеше да се сретне на половина пат за да обезбеди посеопфатна функционалност. Но, овој систем даде можност периодично да ги прима податоците што недостасуваат по пошта во форма на врска за растоварување на архивата.

Треба да се напомене дека ова не беше единствениот случај во кој бизнисот сакаше да собира податоци од е-пошта или инстант-месинџери. Меѓутоа, во овој случај не можевме да влијаеме на трета компанија која обезбедува дел од податоците само на овој начин.

Апачи проток на воздух

За да изградиме ETL процеси, најчесто користиме Apache Airflow. Со цел читателот кој не е запознаен со оваа технологија подобро да разбере како изгледа во контекст и воопшто, ќе опишам неколку воведни.

Apache Airflow е бесплатна платформа која се користи за градење, извршување и следење на процесите ETL (Extract-Transform-Loading) во Python. Главниот концепт во Воздухот е насочен ацикличен график, каде што темињата на графикот се специфични процеси, а рабовите на графикот се протокот на контрола или информации. Процесот може едноставно да повика која било функција на Пајтон, или може да има посложена логика од секвенцијално повикување неколку функции во контекст на класа. За најчестите операции, веќе има многу готови развивања кои можат да се користат како процеси. Ваквите случувања вклучуваат:

  • оператори - за пренос на податоци од едно место на друго, на пример, од табела со база на податоци до складиште на податоци;
  • сензори - за чекање за појава на одреден настан и насочување на протокот на контрола кон следните темиња на графикот;
  • куки - за операции на пониско ниво, на пример, за добивање податоци од табела со база на податоци (се користи во изјавите);
  • итн

Би било несоодветно детално да се опише Apache Airflow во оваа статија. Може да се видат кратки воведи тука или тука.

Кука за добивање податоци

Најпрво, за да го решиме проблемот, треба да напишеме кука со која би можеле:

  • поврзете се на е-пошта
  • најдете ја вистинската буква
  • добиваат податоци од писмото.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Логиката е следна: се поврзуваме, ја наоѓаме последната најрелевантна буква, ако има други, ги игнорираме. Оваа функција се користи бидејќи подоцнежните букви ги содржат сите податоци од претходните. Ако ова не е случај, тогаш можете да вратите низа од сите букви или да ја обработите првата, а остатокот на следната пропусница. Во принцип, сè, како и секогаш, зависи од задачата.

Додаваме две помошни функции на куката: за преземање датотека и за преземање датотека користејќи врска од е-пошта. Патем, тие можат да се преместат кај операторот, тоа зависи од фреквенцијата на користење на оваа функционалност. Што друго да се додаде на куката, повторно, зависи од задачата: ако датотеките се примени веднаш во писмото, тогаш можете да преземете прилози на писмото, ако податоците се примени во писмото, тогаш треба да ја анализирате буквата, итн. Во мојот случај, писмото доаѓа со еден линк до архивата, кој треба да го ставам на одредено место и да го започнам процесот на понатамошна обработка.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Кодот е едноставен, така што едвај треба дополнително објаснување. Само ќе ви кажам за магичната линија imap_conn_id. Apache Airflow ги складира параметрите за поврзување (најава, лозинка, адреса и други параметри) до кои може да се пристапи со идентификатор на низа. Визуелно, управувањето со конекцијата изгледа вака

ETL процес за добивање податоци од е-пошта во Apache Airflow

Сензор да чека податоци

Бидејќи веќе знаеме како да се поврзуваме и примаме податоци од пошта, сега можеме да напишеме сензор за да ги чекаме. Во мојот случај, не успеа веднаш да напишам оператор кој ќе ги обработува податоците, доколку ги има, бидејќи другите процеси функционираат врз основа на податоците добиени од поштата, вклучувајќи ги и оние што преземаат поврзани податоци од други извори (API, телефонија , веб метрика, итн.) итн.). Ќе ти дадам пример. Во CRM системот се појави нов корисник, а сè уште не знаеме за неговиот UUID. Потоа, кога се обидуваме да примаме податоци од SIP телефонијата, ќе добиваме повици поврзани со нејзиниот UUID, но нема да можеме да ги зачуваме и користиме правилно. Во такви работи, важно е да се има предвид зависноста на податоците, особено ако тие се од различни извори. Ова се, се разбира, недоволни мерки за зачувување на интегритетот на податоците, но во некои случаи тие се неопходни. Да, и безделничењето за окупирање ресурси е исто така ирационално.

Така, нашиот сензор ќе ги лансира следните темиња на графикот ако има свежи информации во поштата, а исто така ќе ги означи претходните информации како ирелевантни.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Добиваме и користиме податоци

За да примате и обработувате податоци, можете да напишете посебен оператор, можете да користите готови. Бидејќи досега логиката е тривијална - да се земат податоци од буквата, на пример, го предлагам стандардниот PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Патем, ако вашата корпоративна пошта е исто така на mail.ru, тогаш нема да можете да пребарувате писма по тема, испраќач итн. Уште во 2016 година ветија дека ќе го воведат, но очигледно се предомислија. Го решив овој проблем со создавање посебна папка за потребните букви и поставување филтер за потребните букви во веб-интерфејсот за пошта. Така, само потребните букви и услови за пребарување, во мојот случај, едноставно (НЕВИДЕНИ) влегуваат во оваа папка.

Сумирајќи, ја имаме следната низа: проверуваме дали има нови букви кои ги исполнуваат условите, дали ги има, потоа ја преземаме архивата користејќи ја врската од последната буква.
Под последните точки, испуштено е дека оваа архива ќе биде отпакувана, податоците од архивата ќе бидат исчистени и обработени и како резултат на тоа, целата работа ќе оди подалеку до цевководот на процесот ETL, но ова е веќе подалеку. опсегот на статијата. Ако се покажа интересно и корисно, тогаш со задоволство ќе продолжам да ги опишувам ETL решенијата и нивните делови за Apache Airflow.

Извор: www.habr.com

Додадете коментар