ETL-procezo por ricevi datumojn de retpoŝto en Apache Airflow

ETL-procezo por ricevi datumojn de retpoŝto en Apache Airflow

Kiom ajn teknologio evoluas, vico da malmodernaj aliroj ĉiam iras malantaŭ evoluo. Ĉi tio povas esti pro glata transiro, homaj faktoroj, teknologiaj bezonoj aŭ io alia. En la kampo de datumtraktado, datumfontoj estas la plej malkaŝaj en ĉi tiu parto. Kiom ajn ni revas forigi ĉi tion, sed ĝis nun parto de la datumoj estas senditaj en tujmesaĝiloj kaj retpoŝtoj, sen mencii pli arkaikaj formatoj. Mi invitas vin malmunti unu el la opcioj por Apache Airflow, ilustrante kiel vi povas preni datumojn de retpoŝtoj.

antaŭhistorio

Multaj datumoj ankoraŭ estas transdonitaj per retpoŝto, de interhomaj komunikadoj ĝis normoj de interagado inter kompanioj. Estas bone, se eblas verki interfacon por akiri datumojn aŭ meti homojn en la oficejon, kiuj enmetos ĉi tiujn informojn en pli oportunajn fontojn, sed ofte tio simple ne eblas. La specifa tasko, kiun mi alfrontis, estis konekti la konatan CRM-sistemon al la datumstokejo, kaj poste al la OLAP-sistemo. Okazis historie, ke por nia kompanio la uzo de ĉi tiu sistemo estis oportuna en aparta areo de komerco. Tial ĉiuj vere volis povi funkcii kun datumoj ankaŭ de ĉi tiu triaparta sistemo. Unue, kompreneble, oni studis la eblecon akiri datumojn de malfermita API. Bedaŭrinde, la API ne kovris ricevi ĉiujn necesajn datumojn, kaj, en simplaj terminoj, ĝi estis multmaniere malrekta, kaj teknika subteno ne volis aŭ ne povis renkonti duonvoje provizi pli ampleksan funkciecon. Sed ĉi tiu sistemo donis la ŝancon periode ricevi la mankantajn datumojn per poŝto en formo de ligilo por malŝarĝo de la arkivo.

Oni devas rimarki, ke ĉi tio ne estis la sola kazo, en kiu la komerco volis kolekti datumojn de retpoŝtoj aŭ tujmesaĝiloj. Tamen, en ĉi tiu kazo, ni ne povus influi trian kompanion, kiu provizas parton de la datumoj nur tiamaniere.

apaĉa aerfluo

Por konstrui ETL-procezojn, ni plej ofte uzas Apache Airflow. Por ke leganto, kiu ne konas ĉi tiun teknologion, pli bone komprenu kiel ĝi aspektas en la kunteksto kaj ĝenerale, mi priskribos kelkajn enkondukajn.

Apache Airflow estas senpaga platformo, kiu estas uzata por konstrui, ekzekuti kaj monitori ETL (Extract-Transform-Loading) procezojn en Python. La ĉefkoncepto en Airflow estas direktita acikla grafeo, kie la verticoj de la grafeo estas specifaj procezoj, kaj la randoj de la grafeo estas la fluo de kontrolo aŭ informoj. Procezo povas simple voki ajnan Python-funkcion, aŭ ĝi povas havi pli kompleksan logikon de sinsekve vokado de pluraj funkcioj en la kunteksto de klaso. Por la plej oftaj operacioj, jam ekzistas multaj pretaj evoluoj, kiuj povas esti uzataj kiel procezoj. Tiaj evoluoj inkluzivas:

  • operatoroj - por translokigi datumojn de unu loko al alia, ekzemple de datumbaza tabelo al datuma stokejo;
  • sensiloj - por atendi la okazon de certa evento kaj direkti la fluon de kontrolo al la postaj verticoj de la grafeo;
  • hokoj - por malsupernivelaj operacioj, ekzemple, por ricevi datumojn de datumbaza tabelo (uzata en deklaroj);
  • kaj tiel plu.

Estus malkonvene priskribi Apache Airflow detale en ĉi tiu artikolo. Mallongaj enkondukoj estas videblaj tietie.

Hoko por akiri datumojn

Antaŭ ĉio, por solvi la problemon, ni devas skribi hokon per kiu ni povus:

  • konekti al retpoŝto
  • trovi la ĝustan literon
  • ricevi datumojn de la letero.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

La logiko estas jena: ni konektas, trovas la lastan plej gravan literon, se estas aliaj, ni ignoras ilin. Ĉi tiu funkcio estas uzata, ĉar postaj literoj enhavas ĉiujn datumojn de pli fruaj. Se ĉi tio ne estas la kazo, tiam vi povas redoni tabelon de ĉiuj literoj, aŭ prilabori la unuan, kaj la ceterajn en la sekva paŝo. Ĝenerale, ĉio, kiel ĉiam, dependas de la tasko.

Ni aldonas du helpajn funkciojn al la hoko: por elŝuti dosieron kaj por elŝuti dosieron per ligilo de retpoŝto. Cetere, ili povas esti movitaj al la funkciigisto, ĝi dependas de la ofteco de uzado de ĉi tiu funkcio. Kion alian aldoni al la hoko, denove, dependas de la tasko: se dosieroj estas ricevitaj tuj en la letero, tiam vi povas elŝuti aldonaĵojn al la letero, se la datumoj estas ricevitaj en la letero, tiam vi devas analizi la leteron, ktp. En mia kazo, la letero venas kun unu ligilo al la arkivo, kiun mi bezonas meti en certan lokon kaj komenci la pluan prilaboradon.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

La kodo estas simpla, do ĝi apenaŭ bezonas plian klarigon. Mi nur rakontos al vi pri la magia linio imap_conn_id. Apache Airflow stokas konektajn parametrojn (ensaluton, pasvorton, adreson kaj aliajn parametrojn) alireblajn per ĉenidentigilo. Vide, konektadministrado aspektas tiel

ETL-procezo por ricevi datumojn de retpoŝto en Apache Airflow

Sensilo por atendi datumojn

Ĉar ni jam scias kiel konekti kaj ricevi datumojn de poŝto, ni nun povas skribi sensilon por atendi ilin. En mia kazo, ne funkciis tuj skribi operatoron, kiu prilaboros la datumojn, se ekzistas, ĉar aliaj procezoj funkcias surbaze de la datumoj ricevitaj de la poŝto, inkluzive de tiuj, kiuj prenas rilatajn datumojn de aliaj fontoj (API, telefonio). , retaj metrikoj, ktp.). ktp.). Mi donos al vi ekzemplon. Nova uzanto aperis en la CRM-sistemo, kaj ni ankoraŭ ne scias pri lia UUID. Tiam, kiam ni provos ricevi datumojn de SIP-telefonio, ni ricevos vokojn ligitajn al ĝia UUID, sed ni ne povos konservi kaj uzi ilin ĝuste. En tiaj aferoj, estas grave memori la dependecon de la datumoj, precipe se ili estas de malsamaj fontoj. Ĉi tiuj estas, kompreneble, nesufiĉaj mezuroj por konservi datuman integrecon, sed en iuj kazoj ili estas necesaj. Jes, kaj senlabori por okupi rimedojn ankaŭ estas neracia.

Tiel, nia sensilo lanĉos postajn verticojn de la grafeo se estas freŝaj informoj en la poŝto, kaj ankaŭ markos la antaŭajn informojn kiel senrilatajn.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Ni ricevas kaj uzas datumojn

Por ricevi kaj prilabori datumojn, vi povas skribi apartan funkciigiston, vi povas uzi pretajn. Ĉar ĝis nun la logiko estas bagatela - por preni datumojn de la letero, ekzemple, mi proponas la norman PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Cetere, se via kompania poŝto ankaŭ estas ĉe mail.ru, tiam vi ne povos serĉi leterojn laŭ temo, sendinto ktp. Reen en 2016, ili promesis enkonduki ĝin, sed ŝajne ŝanĝis opinion. Mi solvis ĉi tiun problemon kreante apartan dosierujon por la necesaj literoj kaj starigante filtrilon por la necesaj literoj en la retinterfaco de poŝto. Tiel, nur la necesaj literoj kaj kondiĉoj por la serĉo, en mia kazo, simple (NEVIDE) eniras ĉi tiun dosierujon.

Resumante, ni havas la jenan sinsekvon: ni kontrolas ĉu estas novaj literoj, kiuj plenumas la kondiĉojn, se ekzistas, tiam ni elŝutas la arkivon per la ligilo de la lasta litero.
Sub la lastaj punktoj, estas preterlasita, ke ĉi tiu arkivo estos malpakita, la datumoj de la arkivo estos purigitaj kaj prilaboritaj, kaj kiel rezulto, la tuta afero iros plu al la dukto de la ETL-procezo, sed ĉi tio jam estas pretere. la amplekso de la artikolo. Se ĝi rezultis interesa kaj utila, tiam mi volonte daŭre priskribos ETL-solvojn kaj iliajn partojn por Apache Airflow.

fonto: www.habr.com

Aldoni komenton