Apache Airflow-en posta elektronikoko datuak lortzeko ETL prozesua

Apache Airflow-en posta elektronikoko datuak lortzeko ETL prozesua

Teknologia zenbat garatzen den edozein dela ere, ikuspegi zaharkituen kate bat beti dago garapenaren atzean. Trantsizio leun baten, giza faktoreen, behar teknologikoen edo beste zerbaiten ondorioz izan daiteke hori. Datuen tratamenduaren alorrean, datu-iturriak dira zati honetan adierazgarrienak. Hori kentzearekin zenbat amesten dugun, baina orain arte datuen zati bat berehalako mezulari eta posta elektronikoetan bidaltzen da, formatu arkaikoagoak ahaztu gabe. Apache Airflow-en aukeretako bat desmuntatzera gonbidatzen zaitut, mezu elektronikoetatik datuak nola hartu ditzakezun ilustratuz.

historiaurrea

Datu asko oraindik posta elektroniko bidez transferitzen dira, pertsonen arteko komunikazioetatik hasi eta enpresen arteko elkarrekintza estandaretaraino. Ona da datuak lortzeko interfaze bat idaztea edo informazio hori iturri erosoagoetan sartuko duten pertsonak bulegoan jartzea posible bada, baina askotan hori ez da posible izango. Aurre egin nuen zeregin zehatza CRM sistema ezaguna datu biltegira konektatzea izan zen, eta gero OLAP sistemara. Gertatu zen historikoki gure enpresarentzat sistema hau erabiltzea komenigarria zela negozio-eremu jakin batean. Hori dela eta, denek nahi zuten hirugarren sistema honetako datuekin ere funtzionatu ahal izateko. Lehenik eta behin, noski, API ireki batetik datuak lortzeko aukera aztertu zen. Zoritxarrez, APIak ez zituen beharrezko datu guztiak eskuratzea estaltzen, eta, termino sinpleetan, modu askotan okerra zegoen, eta laguntza teknikoak ez zuen erdibidean bete nahi edo ezin izan zuen funtzionalitate zabalagoa eskaintzeko. Baina sistema honek aukera ematen zuen aldian-aldian falta ziren datuak postaz jasotzeko artxiboa deskargatzeko esteka moduan.

Kontuan izan behar da ez dela negozioak posta elektroniko edo berehalako mezularien datuak bildu nahi izan dituen kasu bakarra. Hala ere, kasu honetan, ezin izan dugu datuen zati bat horrela bakarrik ematen duen hirugarren enpresa batean eragin.

apache aire-fluxua

ETL prozesuak eraikitzeko, gehienetan Apache Airflow erabiltzen dugu. Teknologia hau ezagutzen ez duen irakurle batek testuinguruan eta orokorrean duen itxura hobeto uler dezan, sarrerako pare bat deskribatuko ditut.

Apache Airflow Python-en ETL (Extract-Transform-Loading) prozesuak eraikitzeko, exekutatzeko eta monitorizatzeko erabiltzen den doako plataforma bat da. Airflow-en kontzeptu nagusia grafiko azikliko zuzendua da, non grafikoaren erpinak prozesu zehatzak diren, eta grafikoaren ertzak kontrol edo informazio-fluxua diren. Prozesu batek Python-en edozein funtzio dei dezake, edo logika konplexuagoa izan dezake klase baten testuinguruan hainbat funtzio sekuentzialki deitzean. Eragiketarik ohikoenetarako, dagoeneko prozesu gisa erabil daitezkeen prest egindako garapen asko daude. Halako garapenak honako hauek dira:

  • operadoreak - datuak leku batetik bestera transferitzeko, adibidez, datu-baseko taula batetik datu biltegira;
  • sentsoreak - gertaera jakin bat gertatuko denaren zain eta kontrol-fluxua grafikoaren ondorengo erpinetara bideratzeko;
  • kakoak - behe-mailako eragiketetarako, adibidez, datu-baseko taula bateko datuak lortzeko (adierazpenetan erabiltzen dena);
  • eta abar.

Desegokia litzateke artikulu honetan Apache Airflow zehatz-mehatz deskribatzea. Sarrera laburrak ikus daitezke Hemen edo Hemen.

Datuak lortzeko kakoa

Lehenik eta behin, arazoa konpontzeko, kako bat idatzi behar dugu, eta horrekin:

  • konektatu posta elektronikora
  • aurkitu letra egokia
  • gutunetik datuak jaso.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получСния Π΄Π°Π½Π½Ρ‹Ρ… с элСктронной ΠΏΠΎΡ‡Ρ‚Ρ‹

           :param imap_conn_id:       Π˜Π΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€ ΠΏΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΡ ΠΊ ΠΏΠΎΡ‡Ρ‚Π΅
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            ΠŸΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π°Π΅ΠΌΡΡ ΠΊ ΠΏΠΎΡ‡Ρ‚Π΅
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            ΠœΠ΅Ρ‚ΠΎΠ΄ для получСния ΠΈΠ΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€Π° послСднСго письма, 
            ΡƒΠ΄ΠΎΠ²Π»Π΅Ρ‚Π²ΠΎΡ€Π°ΡΡŽΡ‰Π΅Π³ΠΎ условиям поиска

            :param check_seen:      ΠžΡ‚ΠΌΠ΅Ρ‡Π°Ρ‚ΡŒ послСднСС письмо ΠΊΠ°ΠΊ ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Π½Π½ΠΎΠ΅
            :type check_seen:       bool
            :param box:             НаимСнования ящика
            :type box:              string
            :param condition:       Условия поиска писСм
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("Π’ ящикС Π½Π°ΠΉΠ΄Π΅Π½Ρ‹ ΡΠ»Π΅Π΄ΡƒΡŽΡ‰ΠΈΠ΅ письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("НС Π½Π°ΠΉΠ΄Π΅Π½ΠΎ Π½ΠΎΠ²Ρ‹Ρ… писСм")
            return None

        mail_id = mail_ids[0]

        # Ссли Ρ‚Π°ΠΊΠΈΡ… писСм нСсколько
        if len(mail_ids) > 1:
            # ΠΎΡ‚ΠΌΠ΅Ρ‡Π°Π΅ΠΌ ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Π΅ ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Π½Π½Ρ‹ΠΌΠΈ
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°Π΅ΠΌ послСднСС
            mail_id = mail_ids[-1]

        # Π½ΡƒΠΆΠ½ΠΎ Π»ΠΈ ΠΎΡ‚ΠΌΠ΅Ρ‚ΠΈΡ‚ΡŒ послСднСС ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Π½Π½Ρ‹ΠΌ
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Logika hau da: konektatu egiten dugu, azken letrarik garrantzitsuena aurkitzen dugu, beste batzuk badaude, alde batera uzten ditugu. Funtzio hau erabiltzen da, ondorengo letrek aurrekoen datu guztiak baitituzte. Horrela ez bada, letra guztien array bat itzuli dezakezu edo lehenengoa prozesatu eta gainerakoak hurrengo pasean. Oro har, dena, beti bezala, zereginaren araberakoa da.

Bi funtzio osagarri gehitzen dizkiogu kakoari: fitxategi bat deskargatzeko eta fitxategi bat deskargatzeko e-mail bateko esteka erabiliz. Bide batez, operadorearengana eraman daitezke, funtzionalitate hau erabiltzeko maiztasunaren araberakoa da. Zer gehiago gehitu amuari, berriro ere, zereginaren araberakoa da: fitxategiak gutunean berehala jasotzen badira, gutunaren eranskinak deskargatu ditzakezu, datuak gutunean jasotzen badira, gutuna analizatu behar duzu, etab. Nire kasuan, gutuna artxiborako esteka batekin dator, leku jakin batean jarri eta prozesatzeko prozesuari ekin behar diot.

    def download_from_url(self, url, path, chunk_size=128):
        """
            ΠœΠ΅Ρ‚ΠΎΠ΄ для скачивания Ρ„Π°ΠΉΠ»Π°

            :param url:              АдрСс Π·Π°Π³Ρ€ΡƒΠ·ΠΊΠΈ
            :type url:               string
            :param path:             ΠšΡƒΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡ‚ΡŒ Ρ„Π°ΠΉΠ»
            :type path:              string
            :param chunk_size:       По сколько Π±Π°ΠΉΡ‚ΠΎΠ² ΠΏΠΈΡΠ°Ρ‚ΡŒ
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            ΠœΠ΅Ρ‚ΠΎΠ΄ для скачивания Ρ„Π°ΠΉΠ»Π° ΠΏΠΎ ссылкС ΠΈΠ· письма

            :param mail_id:         Π˜Π΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€ письма
            :type mail_id:          string
            :param path:            ΠšΡƒΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡ‚ΡŒ Ρ„Π°ΠΉΠ»
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Kodea sinplea da, beraz, ez du ia azalpen gehiago behar. Imap_conn_id lerro magikoaren berri emango dizut. Apache Airflow-ek konexio-parametroak gordetzen ditu (saioa, pasahitza, helbidea eta beste parametro batzuk) kate-identifikatzaile baten bidez atzi daitezkeen. Ikusmenean, konexioen kudeaketak itxura hau du

Apache Airflow-en posta elektronikoko datuak lortzeko ETL prozesua

Datuen zain egoteko sentsorea

Dagoeneko postatik datuak konektatu eta jasotzen dakigunez, orain sentsore bat idatz dezakegu haien zain egoteko. Nire kasuan, ez du funtzionatu datuak prozesatuko dituen operadore bat idaztea berehala, halakorik balego, beste prozesu batzuek postatik jasotako datuetan oinarrituta funtzionatzen dutelako, beste iturri batzuetatik erlazionatutako datuak hartzen dituztenak barne (APIa, telefonia). , web-neurriak, etab.) etab.). Adibide bat jarriko dizut. Erabiltzaile berri bat agertu da CRM sisteman, eta oraindik ez dakigu bere UUIDaren berri. Gero, SIP telefoniatik datuak jasotzen saiatzean, bere UUIDari lotuta deiak jasoko ditugu, baina ezin izango ditugu behar bezala gorde eta erabili. Horrelako gaietan, kontuan izan behar da datuen menpekotasuna, batez ere iturri ezberdinetakoak badira. Horiek, noski, datuen osotasuna zaintzeko neurri nahikoak ez dira, baina kasu batzuetan beharrezkoak dira. Bai, eta baliabideak okupatzeko alfer-jartzea ere irrazionala da.

Horrela, gure sentsoreak grafikoaren ondorengo erpinak abiaraziko ditu posta elektronikoan informazio freskoa badago, eta aurreko informazioa garrantzirik gabekotzat ere markatuko du.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Datuak jasotzen eta erabiltzen ditugu

Datuak jaso eta prozesatzeko, beste operadore bat idatz dezakezu, prest egindakoak erabil ditzakezu. Logika oraindik hutsala denez - gutunetik datuak hartzeko, adibidez, PythonOperator estandarra proposatzen dut

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Π‘Ρ‚Π°Π½Π΄Π°Ρ€Ρ‚Π½ΠΎΠ΅ ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€ΠΈΡ€ΠΎΠ²Π°Π½ΠΈΠ΅ Π³Ρ€Π°Ρ„Π°
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# ΠžΠΏΡ€Π΅Π΄Π΅Π»ΡΠ΅ΠΌ сСнсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Ѐункция для получСния Π΄Π°Π½Π½Ρ‹Ρ… ΠΈΠ· письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# ОписаниС ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Ρ… Π²Π΅Ρ€ΡˆΠΈΠ½ Π³Ρ€Π°Ρ„Π°
...

# Π—Π°Π΄Π°Π΅ΠΌ связь Π½Π° Π³Ρ€Π°Ρ„Π΅
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ОписаниС ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Ρ… ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ² управлСния

Bide batez, zure posta korporatiboa mail.ru-n ere badago, orduan ezin izango dituzu gutunak bilatu gaiaren, bidaltzailearen, etab. 2016an, aurkeztuko zutela agindu zuten, baina itxuraz iritziz aldatu zuten. Arazo hau konpondu nuen beharrezkoak diren letren karpeta bereizi bat sortuz eta beharrezko letren iragazki bat ezarriz posta web interfazean. Horrela, bilaketarako beharrezkoak diren letrak eta baldintzak bakarrik, nire kasuan, besterik gabe (IKUSITA) karpeta honetan sartzen dira.

Laburbilduz, ondoko sekuentzia dugu: baldintzak betetzen dituzten letra berririk ba ote dagoen egiaztatzen dugu, badaude, gero artxiboa deskargatuko dugu azken letrako esteka erabiliz.
Azken puntuen azpian, artxibo hau deskonprimituko dela, artxiboko datuak garbitu eta prozesatu egingo dira eta, ondorioz, guztia ETL prozesuaren kanalizaziora joango da, baina hori dagoeneko haratago dago. artikuluaren esparrua. Interesgarria eta erabilgarria izan balitz, pozik jarraituko dut Apache Airflow-erako ETL soluzioak eta haien zatiak deskribatzen.

Iturria: www.habr.com

Gehitu iruzkin berria