Раванди ETL барои гирифтани маълумот аз почтаи электронӣ дар Apache Airflow

Раванди ETL барои гирифтани маълумот аз почтаи электронӣ дар Apache Airflow

Новобаста аз он ки технология чӣ қадар рушд мекунад, як қатор равишҳои кӯҳна ҳамеша дар паси рушд қарор мегиранд. Ин метавонад бо гузариши ҳамвор, омилҳои инсонӣ, ниёзҳои технологӣ ё чизи дигар бошад. Дар соҳаи коркарди додаҳо, манбаъҳои маълумот дар ин бахш бештар ошкор карда мешаванд. Новобаста аз он ки мо чӣ қадар орзу дорем, ки аз ин халос шавем, аммо то ҳол як қисми додаҳо дар паёмнависии фаврӣ ва почтаи электронӣ фиристода мешаванд, на аз форматҳои архаикӣ. Ман шуморо даъват мекунам, ки яке аз имконоти Apache Airflow -ро ҷудо кунед, ки чӣ гуна шумо метавонед маълумотро аз почтаи электронӣ гиред.

prehistory

Бисёр маълумот ҳоло ҳам тавассути почтаи электронӣ, аз муоширати байнишахсӣ то стандартҳои ҳамкории байни ширкатҳо интиқол дода мешавад. Хуб аст, ки агар имкони навиштани интерфейс барои ба даст овардани маълумот ё ҷойгир кардани одамоне дар офис вуҷуд дошта бошад, ки ин маълумотро ба манбаъҳои қулай ворид мекунанд, аммо аксар вақт ин имконнопазир аст. Вазифаи мушаххасе, ки ман дучор шудам, пайваст кардани системаи маъруфи CRM ба анбори додаҳо ва сипас ба системаи OLAP буд. Чунин рӯй дод, ки таърихан барои ширкати мо истифодаи ин система дар як соҳаи муайяни тиҷорат мувофиқ буд. Аз ин рӯ, ҳама воқеан мехостанд бо маълумот аз ин системаи тарафи сеюм низ кор кунанд. Пеш аз ҳама, албатта, имконияти гирифтани маълумот аз API кушода омӯхта шуд. Мутаассифона, API гирифтани ҳама маълумоти заруриро фаро нагирифт ва ба ибораи содда, он аз бисёр ҷиҳат каҷ буд ва дастгирии техникӣ намехост ё натавонист дар нисфи роҳ таъмин шавад, то функсияҳои ҳамаҷонибатарро таъмин кунад. Аммо ин система имкон дод, ки маълумоти норасидаро тавассути почта дар шакли истинод барои боркунии бойгонӣ мунтазам қабул кунад.

Бояд қайд кард, ки ин ягона ҳолат нест, ки дар он тиҷорат мехост маълумот аз почтаи электронӣ ё паёмнависии фаврӣ ҷамъоварӣ кунад. Аммо, дар ин ҳолат, мо натавонистем ба як ширкати сеюм, ки як қисми маълумотро танҳо бо ин роҳ таъмин мекунад, таъсир расонем.

Ҷараёни ҳавоии Apache

Барои сохтани равандҳои ETL, мо аксар вақт Apache Airflow-ро истифода мебарем. Барои он ки хонандае, ки бо ин технология ошно нест, беҳтар дарк кунад, ки он дар контекст ва дар маҷмӯъ чӣ гуна ба назар мерасад, ман як ҷуфти муқаддимаро тавсиф мекунам.

Apache Airflow як платформаи ройгонест, ки барои сохтан, иҷро кардан ва мониторинги равандҳои ETL (Extract-Transform-Loading) дар Python истифода мешавад. Мафҳуми асосӣ дар Airflow графи мутаҳаррикшудаи асикликӣ мебошад, ки дар он қуллаҳои график равандҳои мушаххас мебошанд ва кунҷҳои график ҷараёни назорат ё иттилоот мебошанд. Раванд метавонад ба ҳама гуна функсияи Python занг занад, ё он метавонад мантиқи мураккабтар аз даъвати пайдарпайи чанд функсия дар контексти синф дошта бошад. Барои амалиётҳои маъмултарин, аллакай бисёр таҳияҳои тайёр мавҷуданд, ки онҳоро ҳамчун раванд истифода бурдан мумкин аст. Чунин пешрафтҳо дар бар мегиранд:

  • операторҳо - барои интиқоли маълумот аз як ҷо ба ҷои дигар, масалан, аз ҷадвали пойгоҳи додаҳо ба анбори додаҳо;
  • датчикхо — барои мунтазири руй додани ходисаи муайян ва равон кардани чараёни идоракунй ба куллахои минбаъдаи график;
  • қалмоқҳо - барои амалиётҳои сатҳи поёнӣ, масалан, барои гирифтани маълумот аз ҷадвали пойгоҳи додаҳо (дар изҳорот истифода мешаванд);
  • ва ғайра.

Дар ин мақола ба таври муфассал тавсиф кардани Apache Airflow номувофиқ мебуд. Пешниҳоди мухтасарро дидан мумкин аст дар ин ҷо ё дар ин ҷо.

Хок барои гирифтани маълумот

Пеш аз ҳама, барои ҳалли мушкилот, мо бояд қалмоқеро нависем, ки бо он мо метавонем:

  • ба почтаи электронӣ пайваст шавед
  • ҳарфи дурустро пайдо кунед
  • аз мактуб маълумот гиранд.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Мантиқ ин аст: мо пайваст мешавем, охирин ҳарфи мувофиқро пайдо мекунем, агар дигарон бошанд, мо онҳоро сарфи назар мекунем. Ин функсия истифода мешавад, зеро ҳарфҳои баъдӣ тамоми маълумотҳои қаблиро дар бар мегиранд. Агар ин тавр набошад, шумо метавонед массиви ҳамаи ҳарфҳоро баргардонед, ё яки аввал ва боқимондаро дар гузаргоҳи навбатӣ коркард кунед. Умуман, ҳама чиз, чун ҳамеша, ба вазифа вобаста аст.

Мо ба қалмоқ ду функсияи ёрирасон илова мекунем: барои зеркашии файл ва зеркашии файл бо истифода аз истинод аз почтаи электронӣ. Дар омади гап, онҳо метавонанд ба оператор интиқол дода шаванд, он аз басомади истифодаи ин функсия вобаста аст. Боз чӣ илова кардан ба қалмоқ, боз аз вазифа вобаста аст: агар файлҳо фавран дар нома гирифта шаванд, шумо метавонед замимаҳоро ба мактуб зеркашӣ кунед, агар маълумот дар мактуб гирифта шуда бошад, пас шумо бояд номаро таҳлил кунед, ва гайра. Дар ҳолати ман, мактуб бо як пайванд ба бойгонӣ меояд, ки ман бояд онро дар ҷои муайян гузошта, раванди коркарди минбаъдаро оғоз кунам.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Рамз оддӣ аст, бинобар ин ба шарҳи иловагӣ ниёз надорад. Ман танҳо ба шумо дар бораи хати ҷодугарӣ imap_conn_id мегӯям. Apache Airflow параметрҳои пайвастшавиро (логин, парол, суроға ва дигар параметрҳо), ки тавассути идентификатори сатр дастрас кардан мумкин аст, нигоҳ медорад. Ба таври визуалӣ, идоракунии пайвастшавӣ чунин менамояд

Раванди ETL барои гирифтани маълумот аз почтаи электронӣ дар Apache Airflow

Сенсор барои интизории маълумот

Азбаски мо аллакай медонем, ки чӣ гуна пайваст шудан ва гирифтани маълумотро аз почта, мо ҳоло метавонем сенсор нависем, то онҳоро интизор шавем. Дар ҳолати ман, дарҳол навиштани операторе кор накард, ки маълумотро коркард мекунад, агар мавҷуд бошад, зеро равандҳои дигар дар асоси маълумоти аз почта гирифташуда кор мекунанд, аз ҷумла онҳое, ки аз дигар манбаъҳо маълумот мегиранд (API, телефония). , метрикаи веб ва ғайра). ва ғайра). Ман ба шумо мисол меорам. Дар системаи CRM корбари нав пайдо шуд ва мо то ҳол дар бораи UUID-и ӯ намедонем. Сипас, ҳангоми кӯшиши гирифтани маълумот аз телефонияи SIP, мо зангҳои ба UUID алоқамандро қабул мекунем, аммо мо онҳоро дуруст нигоҳ дошта наметавонем ва онҳоро дуруст истифода барем. Дар чунин масъалаҳо вобастагии маълумотро дар назар доштан муҳим аст, хусусан агар онҳо аз сарчашмаҳои гуногун бошанд. Инҳо, албатта, барои нигоҳ доштани тамомияти маълумот чораҳои нокифоя мебошанд, аммо дар баъзе ҳолатҳо онҳо заруранд. Бале, бекор истодан барои ишғоли захираҳо низ беақл аст.

Ҳамин тариқ, сенсори мо қуллаҳои минбаъдаи графикро оғоз мекунад, агар дар почта маълумоти тоза мавҷуд бошад ва инчунин маълумоти қаблиро ҳамчун номатлуб қайд мекунад.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Мо маълумот мегирем ва истифода мебарем

Барои қабул ва коркарди маълумот шумо метавонед як оператори алоҳида нависед, шумо метавонед операторҳои тайёрро истифода баред. Азбаски мантиқ ҳанӯз ночиз аст - барои гирифтани маълумот аз ҳарф, масалан, ман пешниҳод мекунам, ки стандарти PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Дар омади гап, агар почтаи корпоративии шумо низ дар mail.ru бошад, пас шумо наметавонед мактубҳоро аз рӯи мавзӯъ, ирсолкунанда ва ғайра ҷустуҷӯ кунед. Ҳанӯз соли 2016 онҳо ваъда дода буданд, ки онро ҷорӣ мекунанд, аммо зоҳиран ақидаашонро дигар кардаанд. Ман ин мушкилотро тавассути сохтани папкаи алоҳида барои ҳарфҳои зарурӣ ва насб кардани филтр барои ҳарфҳои зарурӣ дар веб-интерфейси почта ҳал кардам. Ҳамин тариқ, танҳо ҳарфҳо ва шартҳои зарурӣ барои ҷустуҷӯ, дар ҳолати ман, ба ин ҷузвдон танҳо (НАЙЁН) ворид мешаванд.

Хулоса, мо пайдарпайии зеринро дорем: мо тафтиш мекунем, ки оё ҳарфҳои наве, ки ба шартҳо мувофиқанд, мавҷуд бошанд, пас архивро бо истиноди ҳарфи охирин зеркашӣ мекунем.
Дар зери нуқтаҳои охирин, ин бойгонӣ кушода мешавад, маълумот аз бойгонӣ тоза ва коркард карда мешавад ва дар натиҷа, ҳама чиз ба лӯлаи раванди ETL меравад, аммо ин аллакай фаротар аст. доираи мақола. Агар он ҷолиб ва муфид бошад, ман бо хушнудӣ тавсифи ҳалли ETL ва қисмҳои онҳоро барои Apache Airflow идома медиҳам.

Манбаъ: will.com

Илова Эзоҳ