Pwosesis ETL pou jwenn done ki soti nan imel nan Apache Airflow

Pwosesis ETL pou jwenn done ki soti nan imel nan Apache Airflow

Kèlkeswa kantite teknoloji devlope, yon seri apwòch demode toujou dèyè devlopman. Sa a ka akòz yon tranzisyon lis, faktè imen, bezwen teknolojik, oswa yon lòt bagay. Nan jaden an nan pwosesis done, sous done yo se pi revele nan pati sa a. Kèlkeswa kantite lajan nou rèv debarase m de sa a, men jiskaprezan yon pati nan done yo voye nan mesaje enstantane ak imèl, nou pa mansyone fòma plis akayik. Mwen envite ou demonte youn nan opsyon yo pou Apache Airflow, ilistre ki jan ou ka pran done nan imèl.

pre-istwa

Yon anpil nan done yo toujou transfere atravè imel, soti nan kominikasyon entèpèsonèl nan estanda nan entèraksyon ant konpayi yo. Li bon si li posib pou ekri yon koòdone pou jwenn done oswa mete moun nan biwo a ki pral antre enfòmasyon sa a nan sous ki pi pratik, men souvan sa a ka tou senpleman pa posib. Travay espesifik ke mwen te fè fas a te konekte sistèm nan CRM notwa nan depo done a, ak Lè sa a, nan sistèm nan OLAP. Li te rive istorikman ke pou konpayi nou an itilize nan sistèm sa a te pratik nan yon zòn patikilye nan biznis. Se poutèt sa, tout moun reyèlman te vle pou kapab opere ak done ki soti nan sistèm twazyèm pati sa a tou. Premye a tout, nan kou, yo te etidye posibilite pou jwenn done ki sòti nan yon API louvri. Malerezman, API a pa t ' kouvri jwenn tout done ki nesesè yo, epi, an tèm senp, li te nan plizyè fason kwochi, ak sipò teknik pa t 'vle oswa pa t 'kapab rankontre mwatye nan bay plis fonctionnalités konplè. Men, sistèm sa a te bay opòtinite pou detanzantan resevwa done ki manke yo pa lapòs nan fòm lan nan yon lyen pou dechaje achiv la.

Li ta dwe remake ke sa a pa te ka a sèlman nan ki biznis la te vle kolekte done ki soti nan imèl oswa mesaje enstantane. Sepandan, nan ka sa a, nou pa t 'kapab enfliyanse yon konpayi twazyèm-pati ki bay yon pati nan done yo sèlman nan fason sa a.

Apache airflow

Pou konstwi pwosesis ETL, nou pi souvan itilize Apache Airflow. Nan lòd pou yon lektè ki pa abitye ak teknoloji sa a pi byen konprann ki jan li sanble nan kontèks la ak an jeneral, mwen pral dekri yon koup nan entwodiksyon.

Apache Airflow se yon platfòm gratis ki itilize pou konstwi, egzekite ak kontwole pwosesis ETL (Extract-Transform-Loading) nan Python. Konsèp prensipal la nan Airflow se yon graf asilik dirije, kote somè graf la se pwosesis espesifik, ak bor graf la se koule nan kontwòl oswa enfòmasyon. Yon pwosesis ka tou senpleman rele nenpòt fonksyon Python, oswa li ka gen lojik pi konplèks nan sekans rele plizyè fonksyon nan kontèks yon klas. Pou operasyon ki pi souvan yo, deja gen anpil devlopman pare ki ka itilize kòm pwosesis. Devlopman sa yo enkli:

  • operatè - pou transfere done soti nan yon kote nan yon lòt, pou egzanp, soti nan yon tab baz done nan yon depo done;
  • detèktè - pou tann ensidan an nan yon sèten evènman ak dirije koule nan kontwòl nan somè ki vin apre nan graf la;
  • kwòk - pou operasyon nivo pi ba yo, pou egzanp, jwenn done ki sòti nan yon tab baz done (yo itilize nan deklarasyon);
  • elatriye

Li ta apwopriye pou dekri Apache Airflow an detay nan atik sa a. Entwodiksyon kout ka wè isit la oswa isit la.

Hook pou jwenn done

Premye a tout, pou rezoud pwoblèm nan, nou bezwen ekri yon zen ak ki nou ta ka:

  • konekte nan imèl
  • jwenn bon lèt la
  • resevwa done nan lèt la.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Lojik la se sa a: nou konekte, jwenn dènye lèt ki pi enpòtan an, si gen lòt, nou inyore yo. Yo itilize fonksyon sa a, paske lèt ki vin apre yo gen tout done yo nan lèt anvan yo. Si sa a se pa ka a, Lè sa a, ou ka retounen yon etalaj de tout lèt, oswa trete premye a, ak rès la sou pwochen pas la. An jeneral, tout bagay, kòm toujou, depann sou travay la.

Nou ajoute de fonksyon oksilyè nan zen an: pou telechaje yon fichye ak pou telechaje yon fichye lè l sèvi avèk yon lyen ki soti nan yon imèl. By wout la, yo ka deplase nan operatè a, sa depann de frekans nan lè l sèvi avèk fonksyonalite sa a. Ki lòt bagay pou ajoute nan zen an, ankò, depann sou travay la: si dosye yo resevwa imedyatman nan lèt la, Lè sa a, ou ka telechaje atachman nan lèt la, si done yo resevwa nan lèt la, Lè sa a, ou bezwen analize lèt la, elatriye. Nan ka mwen an, lèt la vini ak yon lyen nan achiv la, ki mwen bezwen mete nan yon sèten kote epi kòmanse pwosesis la plis pwosesis.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Kòd la senp, kidonk li pa bezwen plis eksplikasyon. Mwen pral jis pale w sou liy majik imap_conn_id la. Apache Airflow estoke paramèt koneksyon (login, modpas, adrès, ak lòt paramèt) ki ka jwenn aksè nan yon idantifyan fisèl. Vizyèlman, jesyon koneksyon sanble sa a

Pwosesis ETL pou jwenn done ki soti nan imel nan Apache Airflow

Capteur pou tann done

Depi nou deja konnen ki jan yo konekte ak resevwa done ki soti nan lapòs, kounye a nou ka ekri yon Capteur pou tann yo. Nan ka mwen an, li pa t travay pou ekri yon operatè touswit ki pral trete done yo, si genyen, paske lòt pwosesis travay ki baze sou done yo resevwa nan lapòs la, ki gen ladan sa yo ki pran done ki gen rapò ak lòt sous (API, telefòn). , metrik entènèt, elatriye). elatriye). Mwen pral ba ou yon egzanp. Yon nouvo itilizatè parèt nan sistèm CRM a, epi nou toujou pa konnen sou UUID li. Lè sa a, lè w ap eseye resevwa done ki soti nan telefòn SIP, nou pral resevwa apèl ki mare ak UUID li yo, men nou pa yo pral kapab sove epi sèvi ak yo kòrèkteman. Nan zafè sa yo, li enpòtan pou kenbe nan tèt ou depandans done yo, sitou si yo soti nan diferan sous. Sa yo se, nan kou, mezi ensifizan pou prezève entegrite done, men nan kèk ka yo nesesè. Wi, ak idling pou okipe resous tou irasyonèl.

Kidonk, Capteur nou an pral lanse somè ki vin apre nan graf la si gen enfòmasyon fre nan lapòs la, epi tou make enfòmasyon anvan an kòm petinan.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Nou resevwa epi itilize done yo

Pou resevwa ak trete done, ou ka ekri yon operatè separe, ou ka itilize moun ki pare yo. Depi lojik la toujou trivial - pran done nan lèt la, pou egzanp, mwen sijere estanda PythonOperator la.

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

By wout la, si lapòs antrepriz ou a tou sou mail.ru, Lè sa a, ou pa yo pral kapab fè rechèch pou lèt pa sijè, moun k ap voye, elatriye. Retounen nan 2016, yo te pwomèt prezante li, men aparamman chanje lide yo. Mwen rezoud pwoblèm sa a lè mwen kreye yon katab separe pou lèt ki nesesè yo epi mete yon filtè pou lèt ki nesesè yo nan koòdone entènèt lapòs la. Kidonk, sèlman lèt ki nesesè yo ak kondisyon pou rechèch la, nan ka mwen an, tou senpleman (UNSEEN) antre nan katab sa a.

Rezime, nou gen sekans sa a: nou tcheke si gen nouvo lèt ki satisfè kondisyon yo, si genyen, Lè sa a, nou telechaje achiv la lè l sèvi avèk lyen ki soti nan dènye lèt la.
Anba dènye pwen yo, li omisyon ke achiv sa a pral depake, done ki soti nan achiv la pral otorize ak trete, ak kòm yon rezilta, tout bagay la pral ale pi lwen nan tiyo a nan pwosesis ETL la, men sa a se deja pi lwen pase. sijè ki abòde lan atik la. Si li te tounen enteresan ak itil, Lè sa a, mwen pral kontan kontinye dekri solisyon ETL ak pati yo pou Apache Airflow.

Sous: www.habr.com

Add nouvo kòmantè