Apache Airflow-da e-poçtdan məlumat əldə etmək üçün ETL prosesi

Apache Airflow-da e-poçtdan məlumat əldə etmək üçün ETL prosesi

Texnologiya nə qədər inkişaf etməsindən asılı olmayaraq, bir sıra köhnəlmiş yanaşmalar həmişə inkişafın arxasında dayanır. Bu, hamar keçid, insan faktorları, texnoloji ehtiyaclar və ya başqa bir şeylə bağlı ola bilər. Məlumatların emalı sahəsində məlumat mənbələri bu hissədə ən çox açıqlananlardır. Bundan qurtulmağı nə qədər xəyal etsək də, indiyə qədər məlumatların bir hissəsi daha çox arxaik formatlardan bəhs etməyən ani messencerlərdə və e-poçtlarda göndərilir. Sizi e-poçtlardan necə məlumat ala biləcəyinizi göstərən Apache Airflow variantlarından birini sökməyə dəvət edirəm.

Prehistorya

Bir çox məlumat hələ də şəxsiyyətlərarası ünsiyyətdən şirkətlər arasında qarşılıqlı əlaqə standartlarına qədər e-poçt vasitəsilə ötürülür. Məlumat əldə etmək üçün interfeys yazmaq və ya bu məlumatları daha rahat mənbələrə daxil edəcək insanları ofisə yerləşdirmək mümkün olsa yaxşıdır, lakin çox vaxt bu, sadəcə olaraq mümkün olmaya bilər. Qarşılaşdığım xüsusi tapşırıq bədnam CRM sistemini məlumat anbarına, sonra isə OLAP sisteminə qoşmaq idi. Tarixən belə oldu ki, şirkətimiz üçün bu sistemin istifadəsi müəyyən bir iş sahəsində əlverişli idi. Buna görə də, hər kəs həqiqətən də bu üçüncü tərəf sisteminin məlumatları ilə işləyə bilmək istəyirdi. İlk növbədə, təbii ki, açıq API-dən məlumatların əldə edilməsi imkanları öyrənilib. Təəssüf ki, API bütün lazımi məlumatları əldə etməyi əhatə etmədi və sadə dillə desək, bir çox cəhətdən əyri idi və texniki dəstək daha əhatəli funksionallıq təmin etmək üçün yarı yolda istəmirdi və ya cavab verə bilmədi. Lakin bu sistem, arxivin boşaldılması üçün bir keçid şəklində poçtla çatışmayan məlumatları vaxtaşırı almaq imkanı verdi.

Qeyd etmək lazımdır ki, bu, biznesin e-poçt və ya ani messencerlərdən məlumat toplamaq istədiyi yeganə hal deyildi. Bununla belə, bu halda, məlumatların bir hissəsini təmin edən üçüncü şirkətə yalnız bu şəkildə təsir edə bilməzdik.

Apache hava axını

ETL proseslərini qurmaq üçün biz ən çox Apache Airflow istifadə edirik. Bu texnologiya ilə tanış olmayan bir oxucunun kontekstdə və ümumiyyətlə necə göründüyünü daha yaxşı başa düşməsi üçün bir neçə girişi təsvir edəcəyəm.

Apache Airflow, Python-da ETL (Extract-Transform-Loading) proseslərini qurmaq, icra etmək və izləmək üçün istifadə edilən pulsuz platformadır. Hava axınında əsas konsepsiya istiqamətlənmiş asiklik qrafikdir, burada qrafikin təpələri xüsusi proseslər, qrafikin kənarları isə idarəetmə və ya məlumat axınıdır. Proses sadəcə olaraq istənilən Python funksiyasını çağıra bilər və ya bir sinif kontekstində bir neçə funksiyanı ardıcıl çağırmaqdan daha mürəkkəb məntiqə malik ola bilər. Ən tez-tez görülən əməliyyatlar üçün artıq proses kimi istifadə edilə bilən bir çox hazır inkişaf var. Belə inkişaflara aşağıdakılar daxildir:

  • operatorlar - verilənlərin bir yerdən digər yerə, məsələn, verilənlər bazası cədvəlindən məlumat anbarına ötürülməsi üçün;
  • sensorlar - müəyyən hadisənin baş verməsini gözləmək və idarəetmə axınını qrafikin sonrakı təpələrinə yönəltmək üçün;
  • qarmaqlar - aşağı səviyyəli əməliyyatlar üçün, məsələn, verilənlər bazası cədvəlindən məlumat almaq üçün (ifadələrdə istifadə olunur);
  • və s.

Bu məqalədə Apache Airflow-u ətraflı təsvir etmək yersiz olardı. Qısa təqdimatlara baxmaq olar burada və ya burada.

Məlumat əldə etmək üçün qarmaq

Əvvəla, problemi həll etmək üçün biz edə biləcəyimiz bir çəngəl yazmalıyıq:

  • e-poçta qoşulun
  • düzgün məktubu tapın
  • məktubdan məlumat almaq.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Məntiq belədir: əlaqə saxlayırıq, sonuncu ən uyğun hərfi tapırıq, başqaları varsa, onlara məhəl qoymuruq. Bu funksiyadan istifadə olunur, çünki sonrakı hərflər əvvəlkilərin bütün məlumatlarını ehtiva edir. Əgər belə deyilsə, onda siz bütün hərflərin bir sırasını qaytara və ya birincisini, qalanını isə növbəti keçiddə emal edə bilərsiniz. Ümumiyyətlə, hər şey, həmişə olduğu kimi, vəzifədən asılıdır.

Çəngələ iki köməkçi funksiya əlavə edirik: faylı yükləmək və e-poçtdan bir keçiddən istifadə edərək faylı yükləmək üçün. Yeri gəlmişkən, onlar operatora köçürülə bilər, bu, bu funksionallıqdan istifadə tezliyindən asılıdır. Çəngələ başqa nə əlavə etmək, yenə də vəzifədən asılıdır: əgər fayllar məktubda dərhal qəbul edilərsə, məktuba əlavələri yükləyə bilərsiniz, əgər məlumat məktubda qəbul edilərsə, məktubu təhlil etməlisiniz, və s. Mənim vəziyyətimdə məktub arxivə bir keçidlə gəlir, onu müəyyən bir yerə qoymalı və sonrakı emal prosesinə başlamalıyam.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Kod sadədir, ona görə də əlavə izahata ehtiyac yoxdur. Mən sizə sadəcə imap_conn_id sehrli xətti haqqında məlumat verəcəyəm. Apache Airflow sətir identifikatoru ilə əldə edilə bilən əlaqə parametrlərini (giriş, parol, ünvan və digər parametrlər) saxlayır. Vizual olaraq, əlaqə idarəetməsi belə görünür

Apache Airflow-da e-poçtdan məlumat əldə etmək üçün ETL prosesi

Məlumat gözləmək üçün sensor

Biz artıq poçta necə qoşulacağımızı və məlumat alacağımızı bildiyimiz üçün indi onları gözləmək üçün sensor yaza bilərik. Mənim vəziyyətimdə, əgər varsa, məlumatları emal edəcək operatoru dərhal yazmaq işləmədi, çünki digər proseslər poçtdan alınan məlumatlara, o cümlədən digər mənbələrdən (API, telefoniya) əlaqəli məlumatları götürənlər əsasında işləyir. , veb ölçüləri və s.) və s.). Mən sizə bir nümunə verim. CRM sistemində yeni istifadəçi peyda oldu və onun UUID-i haqqında hələ də məlumatımız yoxdur. Sonra, SIP telefoniyasından məlumat almağa çalışarkən, biz onun UUID ilə əlaqəli zəngləri qəbul edəcəyik, lakin biz onları saxlaya və düzgün istifadə edə bilməyəcəyik. Bu cür məsələlərdə, xüsusən də müxtəlif mənbələrdən olan məlumatların asılılığını nəzərə almaq vacibdir. Bunlar, əlbəttə ki, məlumatların bütövlüyünü qorumaq üçün qeyri-kafi tədbirlərdir, lakin bəzi hallarda zəruridir. Bəli və resursları zəbt etmək üçün boş dayanmaq da məntiqsizdir.

Beləliklə, poçtda təzə məlumat varsa, sensorumuz qrafikin sonrakı təpələrini işə salacaq, həmçinin əvvəlki məlumatları əhəmiyyətsiz kimi qeyd edəcəkdir.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Biz məlumatları qəbul edirik və istifadə edirik

Məlumatları qəbul etmək və emal etmək üçün ayrıca operator yaza bilərsiniz, hazır olanlardan istifadə edə bilərsiniz. Məntiq hələ də əhəmiyyətsiz olduğundan - məktubdan məlumat almaq üçün, məsələn, standart PythonOperator təklif edirəm.

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Yeri gəlmişkən, əgər sizin korporativ poçtunuz da mail.ru-dadırsa, o zaman məktubları mövzu, göndərici və s. üzrə axtara bilməyəcəksiniz. Hələ 2016-cı ildə onlar bunu təqdim edəcəklərinə söz vermişdilər, lakin yəqin ki, fikirlərini dəyişiblər. Mən bu problemi lazımi hərflər üçün ayrıca qovluq yaradaraq və poçt veb interfeysində lazımi hərflər üçün filtr quraraq həll etdim. Beləliklə, yalnız axtarış üçün lazım olan hərflər və şərtlər, mənim vəziyyətimdə, sadəcə (GÖRÜNMƏYƏN) bu qovluğa daxil olur.

Xülasə edərək, aşağıdakı ardıcıllığa sahibik: şərtlərə cavab verən yeni məktubların olub olmadığını yoxlayırıq, əgər varsa, arxivi sonuncu məktubdakı keçiddən istifadə edərək yükləyirik.
Son nöqtələr altında, bu arxivin açılacağı, arxiv məlumatlarının təmizlənəcəyi və işlənəcəyi nəzərə alınmır və nəticədə hər şey ETL prosesinin boru xəttinə gedəcək, lakin bu, artıq kənardadır. məqalənin əhatə dairəsi. Maraqlı və faydalı olarsa, mən məmnuniyyətlə Apache Airflow üçün ETL həllərini və onların hissələrini təsvir etməyə davam edəcəyəm.

Mənbə: www.habr.com

Добавить комментарий