ETL process datu iegūšanai no e-pasta pakalpojumā Apache Airflow

ETL process datu iegūšanai no e-pasta pakalpojumā Apache Airflow

Neatkarīgi no tā, cik daudz tehnoloģijas attīstās, attīstībai vienmēr seko virkne novecojušu pieeju. Tas var būt saistīts ar vienmērīgu pāreju, cilvēka faktoriem, tehnoloģiskām vajadzībām vai kaut ko citu. Datu apstrādes jomā atklājošākie šajā daļā ir datu avoti. Neatkarīgi no tā, kā mēs sapņojam no tā atbrīvoties, pagaidām daži dati tiek nosūtīti tūlītējos ziņojumos un e-pastos, nemaz nerunājot par arhaiskākiem formātiem. Es aicinu jūs apskatīt vienu no Apache Airflow opcijām, kas ilustrē, kā varat vākt datus no e-pastiem.

Aizvēsture

Daudz datu joprojām tiek pārsūtīts pa e-pastu, sākot no starppersonu komunikācijas līdz uzņēmumu mijiedarbības standartiem. Ir labi, ja ir iespējams uzrakstīt interfeisu datu iegūšanai vai ievietot birojā cilvēkus, kuri ievadīs šo informāciju ērtākos avotos, taču bieži vien tas var vienkārši nebūt iespējams. Īpašais uzdevums, ar kuru saskāros, bija bēdīgi slavenās CRM sistēmas savienošana ar datu noliktavu un pēc tam ar OLAP sistēmu. Vēsturiski ir sanācis, ka mūsu uzņēmumam šīs sistēmas izmantošana bija ērta noteiktā uzņēmējdarbības jomā. Tāpēc visi ļoti vēlējās, lai varētu darboties arī ar datiem no šīs trešās puses sistēmas. Vispirms, protams, tika pētīta iespēja iegūt datus no atvērtas API. Diemžēl API neaptvēra visu nepieciešamo datu iegūšanu, un, vienkāršāk sakot, tā daudzējādā ziņā bija greiza, un tehniskais atbalsts nevēlējās vai nevarēja sasniegt pusceļā, lai nodrošinātu visaptverošāku funkcionalitāti. Bet šī sistēma nodrošināja iespēju periodiski saņemt trūkstošos datus pa pastu saites veidā arhīva izkraušanai.

Jāpiebilst, ka šis nebija vienīgais gadījums, kad kāds bizness vēlējās ievākt datus no e-pastiem vai tūlītējiem kurjeriem. Taču šajā gadījumā mēs nevarējām ietekmēt trešās puses uzņēmumu, kas sniedz daļu datu tikai šādā veidā.

Apache gaisa plūsma

Lai izveidotu ETL procesus, mēs visbiežāk izmantojam Apache Airflow. Lai lasītājs, kuram šī tehnoloģija ir sveša, labāk saprastu, kā tā izskatās kontekstā un kopumā, aprakstīšu pāris ievadvārdus.

Apache Airflow ir bezmaksas platforma, kas tiek izmantota, lai izveidotu, palaistu un pārraudzītu ETL (Extract-Transform-Loading) procesus Python. Gaisa plūsmas galvenais jēdziens ir virzīts aciklisks grafs, kur grafa virsotnes ir konkrēti procesi, bet grafa malas ir vadības vai informācijas plūsma. Process var vienkārši izsaukt jebkuru Python funkciju, vai arī tam var būt sarežģītāka loģika, kas secīgi izsauc vairākas funkcijas klases kontekstā. Visbiežāk sastopamajām operācijām jau ir daudz gatavu izstrādņu, ko var izmantot kā procesus. Šādi notikumi ietver:

  • operatori - datu pārvietošanai no vienas vietas uz citu, piemēram, no datu bāzes tabulas uz datu noliktavu;
  • sensori - gaidīt noteikta notikuma iestāšanos un vadīt vadības plūsmu uz nākamajām grafa virsotnēm;
  • āķi - zemāka līmeņa operācijām, piemēram, datu iegūšanai no datu bāzes tabulas (izmanto paziņojumos);
  • uc

Būtu nepiemēroti šajā rakstā sīki aprakstīt Apache Airflow. Var apskatīt īsus ievadus šeit vai šeit.

Āķis datu iegūšanai

Pirmkārt, lai atrisinātu problēmu, mums ir jāuzraksta āķis, ar kuru mēs varētu:

  • izveidot savienojumu ar e-pastu;
  • atrast pareizo burtu
  • saņemt datus no vēstules.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Loģika ir šāda: mēs savienojam, atrodam pēdējo atbilstošāko burtu, ja ir citi, mēs tos ignorējam. Šī funkcija tiek izmantota, jo vēlākie burti satur visus iepriekšējos datus. Ja tas tā nav, varat atgriezt visu burtu masīvu vai apstrādāt pirmo, bet pārējos nākamajā piegājienā. Kopumā viss, kā vienmēr, ir atkarīgs no uzdevuma.

Mēs pievienojam āķim divas papildu funkcijas: faila lejupielādei un faila lejupielādei, izmantojot saiti no e-pasta. Starp citu, tos var pārvietot uz operatoru, tas ir atkarīgs no šīs funkcionalitātes izmantošanas biežuma. Ko vēl pievienot āķim, atkal ir atkarīgs no uzdevuma: ja faili tiek saņemti uzreiz vēstulē, tad varat lejupielādēt vēstules pielikumus, ja dati ir saņemti vēstulē, tad jums ir nepieciešams parsēt vēstuli, utt. Manā gadījumā vēstulei nāk viena saite uz arhīvu, kuru vajag ievietot noteiktā vietā un uzsākt tālāko apstrādes procesu.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Kods ir vienkāršs, tāpēc tam gandrīz nav nepieciešams papildu skaidrojums. Es tikai pastāstīšu par maģisko līniju imap_conn_id. Apache Airflow saglabā savienojuma parametrus (pieteikšanās vārdu, paroli, adresi un citus parametrus), kuriem var piekļūt, izmantojot virknes identifikatoru. Vizuāli savienojuma pārvaldība izskatās šādi

ETL process datu iegūšanai no e-pasta pakalpojumā Apache Airflow

Sensors, lai gaidītu datus

Tā kā mēs jau zinām, kā izveidot savienojumu un saņemt datus no pasta, tagad varam uzrakstīt sensoru, lai to gaidītu. Manā gadījumā neizdevās nekavējoties uzrakstīt operatoru, kurš apstrādās datus, ja tādi ir, jo citi procesi darbojas, pamatojoties uz datiem, kas saņemti no pasta, ieskaitot tos, kas ņem saistītos datus no citiem avotiem (API, telefonija, tīmekļa metrika, utt.) utt.). Es jums došu piemēru. CRM sistēmā ir parādījies jauns lietotājs, un mēs vēl nezinām par viņa UUID. Tad, mēģinot saņemt datus no SIP telefonijas, mēs saņemsim zvanus, kas saistīti ar tās UUID, taču nevarēsim tos pareizi saglabāt un izmantot. Šādos jautājumos ir svarīgi paturēt prātā datu atkarību, īpaši, ja tie ir no dažādiem avotiem. Tie, protams, ir nepietiekami pasākumi datu integritātes saglabāšanai, taču dažos gadījumos tie ir nepieciešami. Un resursu aizņemšanās dīkstāvē arī ir neracionāla.

Tādējādi mūsu sensors palaidīs nākamās grafika virsotnes, ja pastā būs jauna informācija, kā arī atzīmēs iepriekšējo informāciju kā neatbilstošu.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Mēs saņemam un izmantojam datus

Datu saņemšanai un apstrādei var rakstīt atsevišķu operatoru, var izmantot jau gatavus. Tā kā līdz šim loģika ir triviāla - ņemt datus, piemēram, no burta, iesaku standarta PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Starp citu, ja jūsu uzņēmuma pasts atrodas arī vietnē mail.ru, tad jūs nevarēsit meklēt vēstules pēc tēmas, sūtītāja utt. Viņi solīja to ieviest jau 2016. gadā, taču acīmredzot mainīja savas domas. Šo problēmu atrisināju, izveidojot atsevišķu mapi nepieciešamajiem burtiem un pasta tīmekļa saskarnē uzstādot filtru nepieciešamajiem burtiem. Tādējādi šajā mapē nokļūst tikai meklēšanai nepieciešamie burti un nosacījumi, manā gadījumā vienkārši (NEREDZĒTI).

Rezumējot, mums ir šāda secība: mēs pārbaudām, vai ir jauni burti, kas atbilst nosacījumiem; ja ir, tad mēs lejupielādējam arhīvu, izmantojot saiti no pēdējā burta.
Zem pēdējiem punktiem ir izlaists, ka šis arhīvs tiks izpakots, dati no arhīva tiks notīrīti un apstrādāti, kā rezultātā visa lieta nonāks tālāk ETL procesa konveijerā, bet tas jau ir ārpus raksta apjoms. Ja izrādījās interesanti un noderīgi, tad labprāt turpināšu aprakstīt ETL risinājumus un to daļas priekš Apache Airflow.

Avots: www.habr.com

Pievieno komentāru