Ilana ETL fun gbigba data lati imeeli ni Apache Airflow

Ilana ETL fun gbigba data lati imeeli ni Apache Airflow

Laibikita bawo ni imọ-ẹrọ ti ndagba, idagbasoke nigbagbogbo tẹle nipasẹ okun ti awọn isunmọ ti igba atijọ. Eyi le jẹ nitori iyipada didan, awọn ifosiwewe eniyan, awọn iwulo imọ-ẹrọ, tabi nkan miiran. Ni aaye ti sisẹ data, ifihan julọ julọ ni apakan yii jẹ awọn orisun data. Laibikita bawo ni a ti nireti lati yọkuro kuro ninu eyi, fun bayi diẹ ninu awọn data ni a firanṣẹ ni awọn ojiṣẹ lẹsẹkẹsẹ ati awọn apamọ, kii ṣe mẹnukan awọn ọna kika archaic diẹ sii. Mo pe ọ lati wo ọkan ninu awọn aṣayan fun Apache Airflow, n ṣe apejuwe bi o ṣe le gba data lati awọn imeeli.

prehistory

Pupọ data tun jẹ gbigbe nipasẹ imeeli, lati awọn ibaraẹnisọrọ ti ara ẹni si awọn iṣedede ibaraenisepo laarin awọn ile-iṣẹ. O dara ti o ba le kọ wiwo kan lati gba data tabi fi awọn eniyan sinu ọfiisi ti yoo tẹ alaye yii sii sinu awọn orisun irọrun diẹ sii, ṣugbọn nigbagbogbo iṣeeṣe yii le jiroro ko si nibẹ. Iṣẹ-ṣiṣe pato ti Mo dojuko ni sisopọ eto CRM ti a mọ daradara si ile-itaja data, ati lẹhinna si eto OLAP kan. Itan-akọọlẹ, fun ile-iṣẹ wa, lilo eto yii rọrun ni agbegbe kan pato ti iṣowo. Nitorinaa, gbogbo eniyan fẹ gaan lati ni anfani lati ṣiṣẹ pẹlu data lati inu eto ẹnikẹta daradara. Ni akọkọ, nitorinaa, o ṣeeṣe lati gba data lati API ṣiṣi ti ṣawari. Laanu, API ko ni aabo gbigba gbogbo data pataki, ati, ni awọn ofin ti o rọrun, o jẹ wiwọ pupọ, ati atilẹyin imọ-ẹrọ ko fẹ tabi ko lagbara lati pade ni agbedemeji lati pese iṣẹ ṣiṣe ti o peye diẹ sii. Ṣugbọn eto yii pese aye lati gba data ti o padanu lorekore nipasẹ imeeli ni irisi ọna asopọ lati ṣe igbasilẹ iwe-ipamọ naa.

O yẹ ki o ṣe akiyesi pe eyi kii ṣe ọran nikan ninu eyiti iṣowo kan fẹ lati gba data lati awọn imeeli tabi awọn ojiṣẹ lẹsẹkẹsẹ. Sibẹsibẹ, ninu ọran yii, a ko le ni agba ile-iṣẹ ẹnikẹta ti o pese apakan ti data nikan ni ọna yii.

Afun Afẹfẹ

Lati kọ awọn ilana ETL, a nigbagbogbo lo Apache Airflow. Ni ibere fun oluka ti ko mọ pẹlu imọ-ẹrọ yii lati ni oye daradara bi o ṣe n wo ni ọrọ ati ni apapọ, Emi yoo ṣe apejuwe awọn meji ti awọn ifarahan.

Apache Airflow jẹ pẹpẹ ti o ni ọfẹ ti o lo lati kọ, ṣiṣẹ, ati atẹle awọn ilana ETL (Jade-Transform-Loading) ni Python. Agbekale akọkọ ni Airflow jẹ iwọn acyclic ti a darí, nibiti awọn igun ti iwọn jẹ awọn ilana kan pato, ati awọn egbegbe ti iwọn jẹ ṣiṣan iṣakoso tabi alaye. Ilana kan le jiroro ni pe eyikeyi iṣẹ Python, tabi o le ni imọ-jinlẹ eka diẹ sii ti pipe awọn iṣẹ lẹsẹsẹ ni agbegbe ti kilasi kan. Fun awọn iṣẹ ṣiṣe ti o wọpọ julọ, ọpọlọpọ awọn idagbasoke ti a ti ṣetan tẹlẹ ti wa ti o le ṣee lo bi awọn ilana. Iru awọn idagbasoke pẹlu:

  • awọn oniṣẹ - fun gbigbe data lati ibi kan si omiran, fun apẹẹrẹ, lati tabili data si ibi ipamọ data;
  • awọn sensosi - lati duro fun iṣẹlẹ ti iṣẹlẹ kan ati taara ṣiṣan iṣakoso si awọn inaro ti o tẹle ti iwọn;
  • ìkọ - fun awọn iṣẹ ipele-kekere, fun apẹẹrẹ, fun gbigba data pada lati tabili data data (ti a lo ninu awọn alaye);
  • ati bẹbẹ lọ.

Yoo jẹ aibojumu lati ṣapejuwe Apache Airflow ni awọn alaye ni nkan yii. Awọn ifihan kukuru ni a le wo nibi tabi nibi.

Kio fun gbigba data

Ni akọkọ, lati yanju iṣoro naa a nilo lati kọ kio kan pẹlu eyiti a le:

  • sopọ si imeeli;
  • ri lẹta ti o nilo;
  • gba data lati kan lẹta.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Awọn kannaa ni yi: a so, ri awọn ti o kẹhin julọ ti o yẹ lẹta, ti o ba ti nibẹ ni o wa miiran, a foju wọn. Iṣẹ yii ni a lo nitori awọn ifiranṣẹ nigbamii ni gbogbo data lati awọn iṣaaju ninu. Ti eyi ko ba jẹ ọran, lẹhinna o le da ọpọlọpọ awọn lẹta pada tabi ṣe ilana akọkọ ati iyokù lori iwe-iwọle atẹle. Ni gbogbogbo, bi nigbagbogbo, ohun gbogbo da lori iṣẹ-ṣiṣe.

A ṣafikun awọn iṣẹ oluranlọwọ meji si kio: fun gbigba faili kan ati gbigba faili kan nipa lilo ọna asopọ lati lẹta kan. Nipa ọna, wọn le wa ninu oniṣẹ ẹrọ, o da lori igbohunsafẹfẹ ti lilo iṣẹ yii. Kini ohun miiran lati fi kun si kio, lẹẹkansi, da lori iṣẹ-ṣiṣe naa: ti o ba gba awọn faili lẹsẹkẹsẹ ninu lẹta naa, lẹhinna o le ṣe igbasilẹ awọn asomọ si lẹta naa, ti data ba wa ninu lẹta naa, lẹhinna o nilo lati sọ lẹta naa, ati bẹbẹ lọ. Ninu ọran mi, lẹta naa de pẹlu ọna asopọ kan si ile-ipamọ, eyiti Mo nilo lati fi si aaye kan ki o bẹrẹ sisẹ siwaju sii.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Awọn koodu ni o rọrun, ki o fee nilo eyikeyi afikun alaye. Emi yoo kan sọ fun ọ nipa ila idan imap_conn_id. Apache Airflow tọju awọn paramita asopọ (iwọle, ọrọ igbaniwọle, adirẹsi ati awọn aye miiran), eyiti o le wọle nipasẹ idanimọ okun. Ni wiwo, iṣakoso asopọ dabi eyi

Ilana ETL fun gbigba data lati imeeli ni Apache Airflow

Sensọ fun nduro fun data

Niwọn bi a ti mọ tẹlẹ bi a ṣe le sopọ ati gba data lati meeli, a le kọ sensọ bayi lati duro de. Ninu ọran mi, kikọ lẹsẹkẹsẹ oniṣẹ ẹrọ kan ti yoo ṣe ilana data naa, ti eyikeyi, ko ṣiṣẹ, nitori awọn ilana miiran ṣiṣẹ da lori data ti o gba lati meeli, pẹlu awọn ti o gba data ti o jọmọ lati awọn orisun miiran (API, telephony, awọn metiriki wẹẹbu, ati be be lo) ati be be lo). Jẹ ki n fun ọ ni apẹẹrẹ. Olumulo tuntun ti han ninu eto CRM, ati pe a ko tii mọ nipa UUID rẹ. Lẹhinna, nigba ti a ba gbiyanju lati gba data lati foonu SIP, a yoo gba awọn ipe ti a so mọ UUID rẹ, ṣugbọn a kii yoo ni anfani lati fipamọ daradara ati lo wọn. Ni iru awọn ọrọ bẹẹ, o ṣe pataki lati tọju ni lokan igbẹkẹle ti data, paapaa ti wọn ba wa lati awọn orisun oriṣiriṣi. Iwọnyi jẹ, nitorinaa, awọn igbese ti ko to lati ṣetọju iduroṣinṣin data, ṣugbọn ni awọn ọran wọn jẹ pataki. Ati yiya awọn orisun lakoko ti ko ṣiṣẹ tun jẹ aibikita.

Nitorinaa, sensọ wa yoo ṣe ifilọlẹ awọn inaro ti o tẹle ti iwọn ti alaye tuntun ba wa ninu meeli, ati tun samisi alaye iṣaaju bi ko ṣe pataki.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Gbigba ati lilo data

Lati gba ati ṣiṣẹ data, o le kọ oniṣẹ ẹrọ lọtọ, tabi o le lo awọn ti a ti ṣetan. Niwọn igba ti ọgbọn naa tun jẹ bintin - lati gba data lati lẹta kan, lẹhinna bi apẹẹrẹ Mo daba pe PythonOperator boṣewa

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Nipa ọna, ti imeeli ile-iṣẹ rẹ tun wa lori mail.ru, lẹhinna o kii yoo ni anfani lati wa awọn lẹta nipasẹ koko-ọrọ, olufiranṣẹ, ati bẹbẹ lọ. Wọn ṣe ileri lati ṣafihan rẹ pada ni ọdun 2016, ṣugbọn o han gedegbe yi ọkan wọn pada. Mo yanju iṣoro yii nipa ṣiṣẹda folda lọtọ fun awọn lẹta pataki ati ṣeto àlẹmọ ni wiwo wẹẹbu meeli fun awọn lẹta pataki. Nitorinaa, awọn lẹta pataki nikan ati awọn ipo wiwa, ninu ọran mi, nirọrun (UNSEEN) lọ sinu folda yii.

Lati ṣe akopọ, a ni ọna atẹle: a ṣayẹwo boya awọn lẹta tuntun wa ti o pade awọn ipo; ti o ba wa, lẹhinna a ṣe igbasilẹ iwe-ipamọ naa nipa lilo ọna asopọ lati lẹta ti o kẹhin.
Labẹ awọn ellipses ti o kẹhin, o ti yọkuro pe iwe-ipamọ yii yoo jẹ ṣiṣi silẹ, data lati ile-ipamọ yoo di mimọ ati ṣiṣẹ, ati ni ipari gbogbo nkan yii yoo lọ siwaju si opo gigun ti epo ilana ETL, ṣugbọn eyi ti kọja opin ti nkan naa. Ti o ba jẹ ohun ti o nifẹ ati iwulo, lẹhinna Emi yoo ni idunnu lati tẹsiwaju apejuwe awọn solusan ETL ati awọn ẹya wọn fun Apache Airflow.

orisun: www.habr.com

Fi ọrọìwòye kun