Mchakato wa ETL wa kupata data kutoka kwa barua pepe katika Apache Airflow

Mchakato wa ETL wa kupata data kutoka kwa barua pepe katika Apache Airflow

Haijalishi ni teknolojia ngapi inakua, mlolongo wa mbinu za kizamani daima hufuata maendeleo. Hii inaweza kuwa kutokana na mabadiliko ya laini, mambo ya kibinadamu, mahitaji ya kiteknolojia, au kitu kingine. Katika uwanja wa usindikaji wa data, vyanzo vya data ndivyo vinavyofichua zaidi katika sehemu hii. Haijalishi ni kiasi gani tunaota ya kuondokana na hili, lakini hadi sasa sehemu ya data inatumwa kwa wajumbe wa papo hapo na barua pepe, bila kutaja fomati zaidi za kizamani. Ninakualika utenganishe moja ya chaguo za Apache Airflow, nikionyesha jinsi unavyoweza kuchukua data kutoka kwa barua pepe.

kabla ya historia

Data nyingi bado huhamishwa kupitia barua pepe, kutoka kwa mawasiliano baina ya watu hadi viwango vya mwingiliano kati ya makampuni. Ni vizuri ikiwa inawezekana kuandika kiolesura cha kupata data au kuweka watu ofisini ambao wataingiza habari hii katika vyanzo rahisi zaidi, lakini mara nyingi hii inaweza kuwa haiwezekani. Jukumu mahususi ambalo nilikabili lilikuwa kuunganisha mfumo wa CRM maarufu kwenye ghala la data, na kisha kwa mfumo wa OLAP. Ilifanyika kihistoria kwamba kwa kampuni yetu matumizi ya mfumo huu yalikuwa rahisi katika eneo fulani la biashara. Kwa hivyo, kila mtu alitaka sana kuweza kufanya kazi na data kutoka kwa mfumo huu wa watu wengine pia. Awali ya yote, bila shaka, uwezekano wa kupata data kutoka kwa API wazi ilisomwa. Kwa bahati mbaya, API haikushughulikia kupata data zote muhimu, na, kwa maneno rahisi, ilikuwa imepotoshwa kwa njia nyingi, na msaada wa kiufundi haukutaka au haukuweza kufikia nusu ili kutoa utendakazi wa kina zaidi. Lakini mfumo huu ulitoa fursa ya kupokea mara kwa mara data iliyokosekana kwa njia ya barua kwa njia ya kiunga cha kupakua kumbukumbu.

Ikumbukwe kwamba hii haikuwa kesi pekee ambayo biashara ilitaka kukusanya data kutoka kwa barua pepe au wajumbe wa papo hapo. Hata hivyo, katika kesi hii, hatukuweza kushawishi kampuni ya tatu ambayo hutoa sehemu ya data kwa njia hii tu.

mtiririko wa hewa wa apache

Ili kuunda michakato ya ETL, mara nyingi sisi hutumia Apache Airflow. Ili msomaji ambaye hajui teknolojia hii kuelewa vizuri jinsi inavyoonekana katika muktadha na kwa ujumla, nitaelezea michache ya utangulizi.

Apache Airflow ni jukwaa lisilolipishwa ambalo hutumika kujenga, kutekeleza na kufuatilia michakato ya ETL (Extract-Transform-Loading) katika Python. Dhana kuu katika Airflow ni grafu ya acyclic iliyoelekezwa, ambapo vipeo vya grafu ni michakato maalum, na kingo za grafu ni mtiririko wa udhibiti au habari. Mchakato unaweza tu kuita kazi yoyote ya Python, au inaweza kuwa na mantiki ngumu zaidi kutoka kwa kuita kazi kadhaa mfululizo katika muktadha wa darasa. Kwa shughuli za mara kwa mara, tayari kuna maendeleo mengi yaliyotengenezwa tayari ambayo yanaweza kutumika kama michakato. Maendeleo kama haya ni pamoja na:

  • waendeshaji - kwa kuhamisha data kutoka sehemu moja hadi nyingine, kwa mfano, kutoka kwa meza ya database hadi ghala la data;
  • sensorer - kwa kusubiri tukio la tukio fulani na kuelekeza mtiririko wa udhibiti kwa wima zifuatazo za grafu;
  • ndoano - kwa shughuli za kiwango cha chini, kwa mfano, kupata data kutoka kwa meza ya hifadhidata (kutumika katika taarifa);
  • nk

Haitakuwa sawa kuelezea Apache Airflow kwa undani katika nakala hii. Utangulizi mfupi unaweza kutazamwa hapa au hapa.

Hook kwa kupata data

Kwanza kabisa, ili kutatua shida, tunahitaji kuandika ndoano ambayo tunaweza:

  • unganisha kwa barua pepe
  • tafuta barua sahihi
  • kupokea data kutoka kwa barua.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получСния Π΄Π°Π½Π½Ρ‹Ρ… с элСктронной ΠΏΠΎΡ‡Ρ‚Ρ‹

           :param imap_conn_id:       Π˜Π΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€ ΠΏΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΡ ΠΊ ΠΏΠΎΡ‡Ρ‚Π΅
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            ΠŸΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π°Π΅ΠΌΡΡ ΠΊ ΠΏΠΎΡ‡Ρ‚Π΅
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            ΠœΠ΅Ρ‚ΠΎΠ΄ для получСния ΠΈΠ΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€Π° послСднСго письма, 
            ΡƒΠ΄ΠΎΠ²Π»Π΅Ρ‚Π²ΠΎΡ€Π°ΡΡŽΡ‰Π΅Π³ΠΎ условиям поиска

            :param check_seen:      ΠžΡ‚ΠΌΠ΅Ρ‡Π°Ρ‚ΡŒ послСднСС письмо ΠΊΠ°ΠΊ ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Π½Π½ΠΎΠ΅
            :type check_seen:       bool
            :param box:             НаимСнования ящика
            :type box:              string
            :param condition:       Условия поиска писСм
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("Π’ ящикС Π½Π°ΠΉΠ΄Π΅Π½Ρ‹ ΡΠ»Π΅Π΄ΡƒΡŽΡ‰ΠΈΠ΅ письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("НС Π½Π°ΠΉΠ΄Π΅Π½ΠΎ Π½ΠΎΠ²Ρ‹Ρ… писСм")
            return None

        mail_id = mail_ids[0]

        # Ссли Ρ‚Π°ΠΊΠΈΡ… писСм нСсколько
        if len(mail_ids) > 1:
            # ΠΎΡ‚ΠΌΠ΅Ρ‡Π°Π΅ΠΌ ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Π΅ ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Π½Π½Ρ‹ΠΌΠΈ
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°Π΅ΠΌ послСднСС
            mail_id = mail_ids[-1]

        # Π½ΡƒΠΆΠ½ΠΎ Π»ΠΈ ΠΎΡ‚ΠΌΠ΅Ρ‚ΠΈΡ‚ΡŒ послСднСС ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Π½Π½Ρ‹ΠΌ
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Mantiki ni hii: tunaunganisha, kupata barua ya mwisho inayofaa zaidi, ikiwa kuna wengine, tunawapuuza. Kazi hii inatumiwa, kwa sababu barua za baadaye zina data zote za mapema. Ikiwa sivyo, basi unaweza kurudisha safu ya herufi zote, au kusindika ya kwanza, na iliyobaki kwenye kupita inayofuata. Kwa ujumla, kila kitu, kama kawaida, inategemea kazi.

Tunaongeza kazi mbili za msaidizi kwenye ndoano: kwa kupakua faili na kupakua faili kwa kutumia kiungo kutoka kwa barua pepe. Kwa njia, wanaweza kuhamishwa kwa operator, inategemea mzunguko wa kutumia utendaji huu. Nini kingine cha kuongeza kwenye ndoano, tena, inategemea kazi: ikiwa faili zinapokelewa mara moja katika barua, basi unaweza kupakua viambatisho kwa barua, ikiwa data imepokelewa katika barua, basi unahitaji kuchanganua barua, na kadhalika. Katika kesi yangu, barua inakuja na kiungo kimoja kwenye kumbukumbu, ambayo ninahitaji kuweka mahali fulani na kuanza mchakato zaidi wa usindikaji.

    def download_from_url(self, url, path, chunk_size=128):
        """
            ΠœΠ΅Ρ‚ΠΎΠ΄ для скачивания Ρ„Π°ΠΉΠ»Π°

            :param url:              АдрСс Π·Π°Π³Ρ€ΡƒΠ·ΠΊΠΈ
            :type url:               string
            :param path:             ΠšΡƒΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡ‚ΡŒ Ρ„Π°ΠΉΠ»
            :type path:              string
            :param chunk_size:       По сколько Π±Π°ΠΉΡ‚ΠΎΠ² ΠΏΠΈΡΠ°Ρ‚ΡŒ
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            ΠœΠ΅Ρ‚ΠΎΠ΄ для скачивания Ρ„Π°ΠΉΠ»Π° ΠΏΠΎ ссылкС ΠΈΠ· письма

            :param mail_id:         Π˜Π΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€ письма
            :type mail_id:          string
            :param path:            ΠšΡƒΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡ‚ΡŒ Ρ„Π°ΠΉΠ»
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Nambari ni rahisi, kwa hivyo haihitaji maelezo zaidi. Nitakuambia tu kuhusu mstari wa kichawi imap_conn_id. Apache Airflow huhifadhi vigezo vya muunganisho (kuingia, nenosiri, anwani, na vigezo vingine) vinavyoweza kufikiwa na kitambulishi cha kamba. Kwa kuibua, usimamizi wa muunganisho unaonekana kama hii

Mchakato wa ETL wa kupata data kutoka kwa barua pepe katika Apache Airflow

Kitambuzi cha kusubiri data

Kwa kuwa tayari tunajua jinsi ya kuunganisha na kupokea data kutoka kwa barua, sasa tunaweza kuandika kitambuzi ili kuwasubiri. Katika kesi yangu, haikufanya kazi kuandika opereta mara moja ambayo itashughulikia data, ikiwa ipo, kwa sababu michakato mingine inafanya kazi kulingana na data iliyopokelewa kutoka kwa barua, pamoja na zile zinazochukua data inayohusiana kutoka kwa vyanzo vingine (API, simu. , vipimo vya wavuti, n.k.) nk.). Nitakupa mfano. Mtumiaji mpya ameonekana katika mfumo wa CRM, na bado hatujui kuhusu UUID yake. Kisha, tunapojaribu kupokea data kutoka kwa simu ya SIP, tutapokea simu zilizounganishwa na UUID yake, lakini hatutaweza kuzihifadhi na kuzitumia kwa usahihi. Katika masuala hayo, ni muhimu kukumbuka utegemezi wa data, hasa ikiwa ni kutoka kwa vyanzo tofauti. Hizi ni, bila shaka, hatua za kutosha za kuhifadhi uadilifu wa data, lakini katika baadhi ya matukio ni muhimu. Ndio, na kuzembea kuchukua rasilimali pia ni jambo lisilo la busara.

Kwa hivyo, sensor yetu itazindua wima zinazofuata za grafu ikiwa kuna habari mpya kwenye barua, na pia alama habari ya hapo awali kuwa haina maana.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Tunapokea na kutumia data

Ili kupokea na kusindika data, unaweza kuandika opereta tofauti, unaweza kutumia zilizotengenezwa tayari. Kwa kuwa hadi sasa mantiki ni ndogo - kuchukua data kutoka kwa barua, kwa mfano, ninapendekeza PythonOperator ya kawaida.

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Π‘Ρ‚Π°Π½Π΄Π°Ρ€Ρ‚Π½ΠΎΠ΅ ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€ΠΈΡ€ΠΎΠ²Π°Π½ΠΈΠ΅ Π³Ρ€Π°Ρ„Π°
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# ΠžΠΏΡ€Π΅Π΄Π΅Π»ΡΠ΅ΠΌ сСнсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Ѐункция для получСния Π΄Π°Π½Π½Ρ‹Ρ… ΠΈΠ· письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# ОписаниС ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Ρ… Π²Π΅Ρ€ΡˆΠΈΠ½ Π³Ρ€Π°Ρ„Π°
...

# Π—Π°Π΄Π°Π΅ΠΌ связь Π½Π° Π³Ρ€Π°Ρ„Π΅
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ОписаниС ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Ρ… ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ² управлСния

Kwa njia, ikiwa barua yako ya ushirika pia iko kwenye mail.ru, basi hutaweza kutafuta barua kwa somo, mtumaji, nk. Huko nyuma mnamo 2016, waliahidi kuitambulisha, lakini inaonekana walibadilisha mawazo yao. Nilitatua tatizo hili kwa kuunda folda tofauti kwa barua muhimu na kuanzisha chujio kwa barua muhimu katika interface ya mtandao wa barua. Kwa hivyo, barua na masharti muhimu tu ya utaftaji, kwa upande wangu, kwa urahisi (isiyoonekana) huingia kwenye folda hii.

Kwa muhtasari, tuna mlolongo wafuatayo: tunaangalia ikiwa kuna barua mpya zinazokidhi masharti, ikiwa zipo, basi tunapakua kumbukumbu kwa kutumia kiungo kutoka kwa barua ya mwisho.
Chini ya dots za mwisho, imeachwa kuwa kumbukumbu hii itafunguliwa, data kutoka kwenye kumbukumbu itafutwa na kusindika, na kwa sababu hiyo, jambo lote litaenda zaidi kwenye bomba la mchakato wa ETL, lakini hii tayari imepita. upeo wa makala. Ikiwa iligeuka kuwa ya kufurahisha na muhimu, basi nitaendelea kuelezea suluhisho za ETL na sehemu zao za Apache Airflow.

Chanzo: mapenzi.com

Kuongeza maoni