Proċess ETL biex tinkiseb data mill-email f'Apache Airflow

Proċess ETL biex tinkiseb data mill-email f'Apache Airflow

Ma jimpurtax kemm tiżviluppa t-teknoloġija, sensiela ta 'approċċi antikwati dejjem tmur wara l-iżvilupp. Dan jista 'jkun minħabba transizzjoni bla xkiel, fatturi umani, ħtiġijiet teknoloġiċi, jew xi ħaġa oħra. Fil-qasam tal-ipproċessar tad-dejta, is-sorsi tad-dejta huma l-aktar li jiżvelaw f'din il-parti. Ma jimpurtax kemm noħolmu li neħilsu minn dan, iżda s'issa parti mid-dejta tintbagħat f'messaġġi istantanji u emails, biex ma nsemmux formati aktar arkajċi. Nistednek biex tiżżarma waħda mill-għażliet għall-Apache Airflow, li turi kif tista 'tieħu data mill-emails.

preistorja

Ħafna dejta għadha tiġi trasferita permezz tal-posta elettronika, minn komunikazzjonijiet interpersonali għal standards ta’ interazzjoni bejn il-kumpaniji. Huwa tajjeb jekk ikun possibbli li tikteb interface biex tinkiseb data jew tpoġġi nies fl-uffiċċju li se jdaħħlu din l-informazzjoni f'sorsi aktar konvenjenti, iżda ħafna drabi dan jista 'sempliċement ma jkunx possibbli. Il-kompitu speċifiku li ffaċċjajt kien li jgħaqqad is-sistema CRM notorja mal-maħżen tad-dejta, u mbagħad mas-sistema OLAP. Storikament ġara li għall-kumpanija tagħna l-użu ta 'din is-sistema kien konvenjenti f'qasam partikolari tan-negozju. Għalhekk, kulħadd verament ried li jkun jista 'jopera b'dejta minn din is-sistema ta' parti terza wkoll. L-ewwelnett, ovvjament, ġiet studjata l-possibbiltà li tinkiseb data minn API miftuħa. Sfortunatament, l-API ma koprietx il-kisba tad-dejta kollha meħtieġa, u, f'termini sempliċi, kienet f'ħafna modi mgħawweġ, u l-appoġġ tekniku ma riedx jew ma setax jiltaqa 'nofs triq biex jipprovdi funzjonalità aktar komprensiva. Iżda din is-sistema pprovdiet l-opportunità li perjodikament tirċievi d-dejta nieqsa bil-posta fil-forma ta 'link għall-ħatt tal-arkivju.

Ta’ min jinnota li dan ma kienx l-uniku każ li fih in-negozju ried jiġbor data minn emails jew instant messengers. Madankollu, f'dan il-każ, ma nistgħux ninfluwenzaw kumpanija ta' parti terza li tipprovdi parti mid-dejta b'dan il-mod biss.

fluss ta 'arja apache

Biex nibnu proċessi ETL, ħafna drabi nużaw Apache Airflow. Sabiex qarrej li ma jkunx familjari ma 'din it-teknoloġija jifhem aħjar kif tidher fil-kuntest u b'mod ġenerali, se niddeskrivi ftit ta' introduzzjoni.

Apache Airflow hija pjattaforma b'xejn li tintuża biex tibni, tesegwixxi u timmonitorja proċessi ETL (Extract-Transform-Loading) f'Python. Il-kunċett ewlieni fl-Airflow huwa graff aċikliku dirett, fejn il-vertiċi tal-graff huma proċessi speċifiċi, u t-truf tal-graff huma l-fluss ta 'kontroll jew informazzjoni. Proċess jista 'sempliċement isejjaħ kwalunkwe funzjoni Python, jew jista' jkollu loġika aktar kumplessa milli jsejjaħ sekwenzjali diversi funzjonijiet fil-kuntest ta 'klassi. Għall-operazzjonijiet l-aktar frekwenti, diġà hemm ħafna żviluppi lesti li jistgħu jintużaw bħala proċessi. Żviluppi bħal dawn jinkludu:

  • operaturi - għat-trasferiment tad-dejta minn post għal ieħor, pereżempju, minn tabella tad-dejtabejż għal maħżen tad-dejta;
  • sensuri - biex tistenna l-okkorrenza ta 'ċertu avveniment u tidderieġi l-fluss tal-kontroll lejn il-vertiċi sussegwenti tal-graff;
  • ganċijiet - għal operazzjonijiet ta 'livell aktar baxx, pereżempju, biex tikseb dejta minn tabella ta' database (użata f'dikjarazzjonijiet);
  • eċċ

Ma jkunx xieraq li tiddeskrivi Apache Airflow fid-dettall f'dan l-artikolu. Introduzzjonijiet qosra jistgħu jitqiesu hawn jew hawn.

Hook biex tikseb id-data

L-ewwelnett, biex issolvi l-problema, għandna bżonn niktbu ganċ li bih nistgħu:

  • qabbad mal-email
  • issib l-ittra t-tajba
  • tirċievi data mill-ittra.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Il-loġika hija din: aħna nikkollegaw, insibu l-aħħar ittra l-aktar rilevanti, jekk hemm oħrajn, ninjorawhom. Din il-funzjoni tintuża, għaliex ittri ta 'wara fihom id-dejta kollha ta' dawk preċedenti. Jekk dan ma jkunx il-każ, allura tista 'tirritorna firxa ta' l-ittri kollha, jew tipproċessa l-ewwel waħda, u l-bqija fil-pass li jmiss. B'mod ġenerali, kollox, bħal dejjem, jiddependi fuq il-kompitu.

Aħna nżidu żewġ funzjonijiet awżiljarji mal-ganċ: biex iniżżlu fajl u biex iniżżlu fajl billi tuża link minn email. Mill-mod, jistgħu jiġu mċaqalqa lill-operatur, jiddependi fuq il-frekwenza tal-użu ta 'din il-funzjonalità. X'iktar għandek iżżid mal-ganċ, għal darb'oħra, jiddependi fuq il-kompitu: jekk il-fajls jiġu riċevuti immedjatament fl-ittra, allura tista 'tniżżel l-annessi mal-ittra, jekk id-data tasal fl-ittra, allura għandek bżonn teżamina l-ittra, eċċ. Fil-każ tiegħi, l-ittra tiġi b'rabta waħda għall-arkivju, li għandi bżonn inpoġġi f'ċertu post u nibda l-proċess ta 'proċessar ulterjuri.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Il-kodiċi huwa sempliċi, għalhekk bilkemm jeħtieġ aktar spjegazzjoni. Jien ngħidlek biss dwar il-linja magic imap_conn_id. Apache Airflow jaħżen il-parametri tal-konnessjoni (login, password, indirizz, u parametri oħra) li jistgħu jiġu aċċessati minn identifikatur ta 'string. Viżwalment, il-ġestjoni tal-konnessjoni tidher bħal din

Proċess ETL biex tinkiseb data mill-email f'Apache Airflow

Sensor biex jistenna d-data

Peress li diġà nafu kif nikkonnettjaw u nirċievu data mill-posta, issa nistgħu niktbu sensor biex nistennewhom. Fil-każ tiegħi, ma ħadimx li tikteb operatur minnufih li jipproċessa d-dejta, jekk ikun hemm, minħabba li proċessi oħra jaħdmu bbażati fuq id-dejta riċevuta mill-posta, inklużi dawk li jieħdu dejta relatata minn sorsi oħra (API, telefonija , metriċi tal-web, eċċ.). eċċ.). Nagħtik eżempju. Utent ġdid deher fis-sistema CRM, u għadna ma nafux dwar l-UUID tiegħu. Imbagħad, meta nippruvaw nirċievu data mit-telefonija SIP, aħna nirċievu sejħiet marbuta mal-UUID tagħha, iżda mhux se nkunu nistgħu nissejvjawhom u nużawhom b'mod korrett. Fi kwistjonijiet bħal dawn, huwa importanti li wieħed iżomm f'moħħu d-dipendenza tad-dejta, speċjalment jekk tkun minn sorsi differenti. Dawn huma, ovvjament, miżuri insuffiċjenti biex tiġi ppreservata l-integrità tad-dejta, iżda f'xi każijiet huma meħtieġa. Iva, u idling biex jokkupa r-riżorsi huwa wkoll irrazzjonali.

Għalhekk, is-senser tagħna se jniedi vertiċi sussegwenti tal-graff jekk ikun hemm informazzjoni ġdida fil-posta, u jimmarka wkoll l-informazzjoni preċedenti bħala irrilevanti.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Nirċievu u nużaw data

Biex tirċievi u tipproċessa d-dejta, tista 'tikteb operatur separat, tista' tuża dawk lesti. Peress li l-loġika għadha trivjali - biex tieħu data mill-ittra, pereżempju, nissuġġerixxi l-istandard PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Mill-mod, jekk il-posta korporattiva tiegħek tinsab ukoll fuq mail.ru, allura ma tkunx tista 'tfittex ittri skont is-suġġett, il-mittent, eċċ. Lura fl-2016, wegħdu li se jintroduċuha, iżda jidher li bidlu fehmhom. I solvut din il-problema billi ħoloq folder separat għall-ittri meħtieġa u waqqaf filtru għall-ittri meħtieġa fl-interface web tal-posta. Għalhekk, l-ittri u l-kundizzjonijiet meħtieġa biss għat-tfittxija, fil-każ tiegħi, sempliċement (UNSEEN) jidħlu f'dan il-folder.

Fil-qosor, għandna s-sekwenza li ġejja: niċċekkjaw jekk hemmx ittri ġodda li jissodisfaw il-kundizzjonijiet, jekk hemmx, imbagħad iniżżlu l-arkivju billi tuża l-link mill-aħħar ittra.
Taħt l-aħħar tikek, jitħalla barra li dan l-arkivju se jiġi żppakkjat, id-dejta mill-arkivju se tiġi kklerjata u pproċessata, u bħala riżultat, il-ħaġa sħiħa se tmur lil hinn mill-pipeline tal-proċess ETL, iżda dan huwa diġà lil hinn l-ambitu tal-artikolu. Jekk irriżulta interessanti u utli, allura bi pjaċir inkompli niddeskrivi s-soluzzjonijiet ETL u l-partijiet tagħhom għal Apache Airflow.

Sors: www.habr.com

Żid kumment