Proseso sa ETL alang sa pagkuha sa datos gikan sa email sa Apache Airflow

Proseso sa ETL alang sa pagkuha sa datos gikan sa email sa Apache Airflow

Bisan unsa pa kadaghan ang teknolohiya nga naugmad, ang usa ka hugpong sa mga karaan nga mga pamaagi kanunay nga nagsunod sa pag-uswag. Mahimong tungod kini sa usa ka hapsay nga pagbalhin, mga hinungdan sa tawo, mga panginahanglanon sa teknolohiya, o uban pa. Sa natad sa pagproseso sa datos, ang mga tinubdan sa datos mao ang labing nagpadayag niini nga bahin. Dili igsapayan kung unsa ka dako ang among damgo nga makuha kini, apan hangtod karon ang bahin sa datos gipadala sa mga instant messenger ug email, wala’y labot ang labi ka karaan nga mga format. Gidapit ko ikaw sa pag-disassemble sa usa sa mga kapilian alang sa Apache Airflow, nga naghulagway kung giunsa nimo pagkuha ang datos gikan sa mga email.

sa naunang kasaysayan

Daghang mga datos ang gibalhin gihapon pinaagi sa e-mail, gikan sa interpersonal nga komunikasyon hangtod sa mga sumbanan sa interaksyon tali sa mga kompanya. Maayo kung posible nga magsulat usa ka interface aron makakuha mga datos o ibutang ang mga tawo sa opisina nga mosulod niini nga kasayuran sa labi ka kombenyente nga mga gigikanan, apan sa kasagaran dili kini mahimo. Ang piho nga buluhaton nga akong giatubang mao ang pagkonektar sa bantog nga CRM nga sistema sa bodega sa datos, ug dayon sa sistema sa OLAP. Nahitabo kini sa kasaysayan nga alang sa among kompanya ang paggamit niini nga sistema kombenyente sa usa ka partikular nga lugar sa negosyo. Busa, ang tanan gusto gayud nga makahimo sa pag-operate uban sa data gikan niini nga ikatulo nga-partido nga sistema usab. Una sa tanan, siyempre, ang posibilidad sa pagkuha sa datos gikan sa usa ka bukas nga API gitun-an. Ikasubo, ang API wala maglakip sa pagkuha sa tanan nga gikinahanglan nga datos, ug, sa yano nga mga termino, kini sa daghang mga paagi hiwi, ug ang teknikal nga suporta dili gusto o dili makatagbo sa tunga-tunga sa paghatag og mas komprehensibo nga gamit. Apan kini nga sistema naghatag higayon nga makadawat matag karon ug unya sa nawala nga datos pinaagi sa koreo sa porma sa usa ka link alang sa pagdiskarga sa archive.

Kinahanglan nga hinumdoman nga dili lamang kini ang kaso diin gusto sa negosyo nga mangolekta mga datos gikan sa mga email o instant messenger. Bisan pa, sa kini nga kaso, dili kami makaimpluwensya sa usa ka kompanya sa ikatulo nga partido nga naghatag bahin sa datos lamang sa kini nga paagi.

apache airflow

Aron matukod ang mga proseso sa ETL, kanunay namong gigamit ang Apache Airflow. Aron ang usa ka magbabasa nga dili pamilyar sa kini nga teknolohiya mas masabtan kung unsa ang hitsura niini sa konteksto ug sa kinatibuk-an, akong ihulagway ang usa ka pares nga pasiuna.

Ang Apache Airflow usa ka libre nga plataporma nga gigamit sa pagtukod, pagpatuman ug pagmonitor sa mga proseso sa ETL (Extract-Transform-Loading) sa Python. Ang nag-unang konsepto sa Airflow usa ka direkta nga acyclic graph, diin ang mga vertices sa graph mga piho nga proseso, ug ang mga kilid sa graph mao ang dagan sa kontrol o impormasyon. Ang usa ka proseso mahimong yano nga motawag sa bisan unsang Python function, o kini adunay mas komplikado nga lohika gikan sa sunod-sunod nga pagtawag sa daghang mga function sa konteksto sa usa ka klase. Alang sa labing kanunay nga mga operasyon, adunay daghan na nga andam nga mga pag-uswag nga magamit ingon mga proseso. Ang maong mga kalamboan naglakip sa:

  • operators - alang sa pagbalhin sa data gikan sa usa ka dapit ngadto sa lain, alang sa panig-ingnan, gikan sa usa ka database lamesa ngadto sa usa ka data bodega;
  • mga sensor - alang sa paghulat sa panghitabo sa usa ka piho nga panghitabo ug pagdirekta sa dagan sa kontrol sa sunod nga mga vertices sa graph;
  • mga kaw-it - alang sa ubos nga lebel nga mga operasyon, pananglitan, aron makakuha og datos gikan sa lamesa sa database (gigamit sa mga pahayag);
  • ug uban pa.

Dili angay nga ihulagway ang Apache Airflow sa detalye niini nga artikulo. Ang mubu nga mga pasiuna mahimong tan-awon dinhi o dinhi.

Hook alang sa pagkuha sa datos

Una sa tanan, aron masulbad ang problema, kinahanglan namon nga magsulat usa ka kaw-it diin mahimo namon:

  • sumpay sa email
  • pangitaa ang husto nga letra
  • makadawat og datos gikan sa sulat.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получСния Π΄Π°Π½Π½Ρ‹Ρ… с элСктронной ΠΏΠΎΡ‡Ρ‚Ρ‹

           :param imap_conn_id:       Π˜Π΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€ ΠΏΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΡ ΠΊ ΠΏΠΎΡ‡Ρ‚Π΅
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            ΠŸΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π°Π΅ΠΌΡΡ ΠΊ ΠΏΠΎΡ‡Ρ‚Π΅
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            ΠœΠ΅Ρ‚ΠΎΠ΄ для получСния ΠΈΠ΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€Π° послСднСго письма, 
            ΡƒΠ΄ΠΎΠ²Π»Π΅Ρ‚Π²ΠΎΡ€Π°ΡΡŽΡ‰Π΅Π³ΠΎ условиям поиска

            :param check_seen:      ΠžΡ‚ΠΌΠ΅Ρ‡Π°Ρ‚ΡŒ послСднСС письмо ΠΊΠ°ΠΊ ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Π½Π½ΠΎΠ΅
            :type check_seen:       bool
            :param box:             НаимСнования ящика
            :type box:              string
            :param condition:       Условия поиска писСм
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("Π’ ящикС Π½Π°ΠΉΠ΄Π΅Π½Ρ‹ ΡΠ»Π΅Π΄ΡƒΡŽΡ‰ΠΈΠ΅ письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("НС Π½Π°ΠΉΠ΄Π΅Π½ΠΎ Π½ΠΎΠ²Ρ‹Ρ… писСм")
            return None

        mail_id = mail_ids[0]

        # Ссли Ρ‚Π°ΠΊΠΈΡ… писСм нСсколько
        if len(mail_ids) > 1:
            # ΠΎΡ‚ΠΌΠ΅Ρ‡Π°Π΅ΠΌ ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Π΅ ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Π½Π½Ρ‹ΠΌΠΈ
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°Π΅ΠΌ послСднСС
            mail_id = mail_ids[-1]

        # Π½ΡƒΠΆΠ½ΠΎ Π»ΠΈ ΠΎΡ‚ΠΌΠ΅Ρ‚ΠΈΡ‚ΡŒ послСднСС ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Π½Π½Ρ‹ΠΌ
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Ang lohika mao kini: nagkonektar kami, pangitaa ang katapusan nga labing may kalabutan nga letra, kung adunay uban, wala namon sila gibalewala. Gigamit kini nga function, tungod kay ang ulahi nga mga letra naglangkob sa tanan nga datos sa mga nauna. Kung dili kini ang kahimtang, mahimo nimong ibalik ang usa ka han-ay sa tanan nga mga letra, o iproseso ang una, ug ang nahabilin sa sunod nga pass. Sa kinatibuk-an, ang tanan, sama sa kanunay, nagdepende sa buluhaton.

Nagdugang kami og duha ka auxiliary function sa hook: alang sa pag-download sa usa ka file ug alang sa pag-download sa usa ka file gamit ang usa ka link gikan sa usa ka email. Pinaagi sa dalan, mahimo silang ibalhin sa operator, nagdepende kini sa kadaghan sa paggamit niini nga gamit. Unsa pa ang idugang sa kaw-it, pag-usab, nagdepende sa buluhaton: kung ang mga file madawat dayon sa sulat, nan mahimo nimong i-download ang mga attachment sa sulat, kung ang datos nadawat sa sulat, nan kinahanglan nimo nga i-parse ang sulat, ug uban pa. Sa akong kaso, ang sulat adunay usa ka link sa archive, nga kinahanglan nako ibutang sa usa ka lugar ug magsugod sa dugang nga proseso sa pagproseso.

    def download_from_url(self, url, path, chunk_size=128):
        """
            ΠœΠ΅Ρ‚ΠΎΠ΄ для скачивания Ρ„Π°ΠΉΠ»Π°

            :param url:              АдрСс Π·Π°Π³Ρ€ΡƒΠ·ΠΊΠΈ
            :type url:               string
            :param path:             ΠšΡƒΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡ‚ΡŒ Ρ„Π°ΠΉΠ»
            :type path:              string
            :param chunk_size:       По сколько Π±Π°ΠΉΡ‚ΠΎΠ² ΠΏΠΈΡΠ°Ρ‚ΡŒ
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            ΠœΠ΅Ρ‚ΠΎΠ΄ для скачивания Ρ„Π°ΠΉΠ»Π° ΠΏΠΎ ссылкС ΠΈΠ· письма

            :param mail_id:         Π˜Π΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€ письма
            :type mail_id:          string
            :param path:            ΠšΡƒΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡ‚ΡŒ Ρ„Π°ΠΉΠ»
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Simple ra ang code, mao nga halos wala na kini magkinahanglan ug dugang nga katin-awan. Sultihan ko lang ka bahin sa magic line nga imap_conn_id. Ang Apache Airflow nagtipig sa mga parameter sa koneksyon (login, password, address, ug uban pang mga parameter) nga mahimong ma-access pinaagi sa usa ka string identifier. Sa tan-aw, ang pagdumala sa koneksyon ingon niini

Proseso sa ETL alang sa pagkuha sa datos gikan sa email sa Apache Airflow

Sensor nga maghulat alang sa datos

Tungod kay nahibal-an na namon kung giunsa ang pagkonektar ug pagdawat mga datos gikan sa mail, mahimo na namon magsulat usa ka sensor aron maghulat alang kanila. Sa akong kaso, wala kini nagtrabaho sa pagsulat dayon sa usa ka operator nga magproseso sa datos, kung naa man, tungod kay ang ubang mga proseso nagtrabaho base sa datos nga nadawat gikan sa mail, lakip ang mga nagkuha sa mga may kalabutan nga datos gikan sa ubang mga gigikanan (API, telephony , web metrics, etc.). etc.). Hatagan ko ikaw usa ka pananglitan. Usa ka bag-ong user ang mitungha sa CRM system, ug wala gihapon mi kahibalo bahin sa iyang UUID. Unya, sa dihang mosulay sa pagdawat ug data gikan sa SIP telephony, makadawat kami ug mga tawag nga nahigot sa UUID niini, apan dili namo kini matipigan ug magamit sa husto. Sa ingon nga mga butang, hinungdanon nga hinumdoman ang pagsalig sa datos, labi na kung kini gikan sa lainlaing mga gigikanan. Kini, siyempre, dili igo nga mga lakang aron mapreserbar ang integridad sa datos, apan sa pipila ka mga kaso kini gikinahanglan. Oo, ug ang pag-undang sa pag-okupar sa mga kahinguhaan dili usab makatarunganon.

Sa ingon, ang among sensor maglansad sa sunod nga mga vertices sa graph kung adunay bag-ong kasayuran sa mail, ug markahan usab ang miaging kasayuran nga wala’y kalabotan.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Kami nakadawat ug naggamit sa datos

Aron makadawat ug pagproseso sa datos, mahimo ka magsulat sa usa ka bulag nga operator, mahimo nimong gamiton ang mga andam na. Tungod kay hangtod karon ang lohika wala’y hinungdan - aron makuha ang datos gikan sa sulat, pananglitan, gisugyot nako ang sukaranan nga PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Π‘Ρ‚Π°Π½Π΄Π°Ρ€Ρ‚Π½ΠΎΠ΅ ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€ΠΈΡ€ΠΎΠ²Π°Π½ΠΈΠ΅ Π³Ρ€Π°Ρ„Π°
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# ΠžΠΏΡ€Π΅Π΄Π΅Π»ΡΠ΅ΠΌ сСнсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Ѐункция для получСния Π΄Π°Π½Π½Ρ‹Ρ… ΠΈΠ· письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# ОписаниС ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Ρ… Π²Π΅Ρ€ΡˆΠΈΠ½ Π³Ρ€Π°Ρ„Π°
...

# Π—Π°Π΄Π°Π΅ΠΌ связь Π½Π° Π³Ρ€Π°Ρ„Π΅
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ОписаниС ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Ρ… ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ² управлСния

Pinaagi sa dalan, kung ang imong corporate mail naa usab sa mail.ru, nan dili ka makapangita sa mga sulat pinaagi sa hilisgutan, nagpadala, ug uban pa. Balik sa 2016, sila misaad sa pagpaila niini, apan dayag nga nausab ang ilang mga hunahuna. Gisulbad nako kini nga problema pinaagi sa paghimo og usa ka separado nga folder alang sa gikinahanglan nga mga letra ug pag-set up og filter alang sa gikinahanglan nga mga letra sa mail web interface. Busa, ang gikinahanglan nga mga letra ug mga kondisyon alang sa pagpangita, sa akong kaso, yano (UNSEEN) nga makasulod niini nga folder.

Sa pag-summarize, kami adunay mosunod nga han-ay: among gisusi kung adunay bag-ong mga letra nga nagtagbo sa mga kondisyon, kung adunay, unya among i-download ang archive gamit ang link gikan sa katapusang sulat.
Ubos sa katapusan nga mga tuldok, wala'y mahimo nga kini nga archive ma-unpack, ang datos gikan sa archive malimpyohan ug maproseso, ug isip resulta, ang tibuok nga butang moadto sa pipeline sa proseso sa ETL, apan kini labaw pa. ang gilapdon sa artikulo. Kung kini nahimo nga makapaikag ug mapuslanon, nan malipayon kong magpadayon sa paghulagway sa mga solusyon sa ETL ug sa ilang mga bahin alang sa Apache Airflow.

Source: www.habr.com

Idugang sa usa ka comment