ETL process for getting data from email in Apache Airflow

No matter how technology develops, a trail of outdated approaches always follows behind progress. This may be due to the need for a smooth transition, human factors, technological requirements or something else. In data processing, data sources are the most telling example of this. However much we dream of getting rid of such approaches, part of the data is still sent through instant messengers and email, not to mention more archaic formats. I invite you to walk through one option for Apache Airflow that shows how data can be taken from emails.

Background

A lot of data is still transferred by email, from communication between people to established exchange formats between companies. It is good if you can write an interface for getting the data, or put people in the office who will enter this information into more convenient sources, but often this is simply not possible. The specific task I faced was connecting a well-known CRM system to the data warehouse, and from there to the OLAP system. Historically, at our company this system was convenient to use in one particular business area, so everyone really wanted to be able to work with the data from this third-party system as well. First of all, of course, we looked into getting the data through the public API. Unfortunately, the API did not provide access to all the data we needed and, to put it simply, was clumsy in many ways, and technical support either would not or could not meet us halfway and provide fuller functionality. However, the system did offer a way to receive the missing data by email, as a link for downloading an archive.

It should be noted that this was not the only case in which the business wanted to collect data from emails or instant messengers. In this case, however, we could not influence the third-party company, which provided part of the data only in this way.

Apache Airflow

To build ETL processes, we most often use Apache Airflow. So that readers unfamiliar with this technology can better understand how it fits in here and in general, I'll start with a short introduction.

Apache Airflow is a free platform for building, running and monitoring ETL (Extract-Transform-Load) processes in Python. The central concept in Airflow is the directed acyclic graph (DAG): the vertices of the graph are specific processes, and the edges are the flow of control or data. A process can simply call any Python function, or it can contain more complex logic that calls several functions in sequence within a class. For the most common operations, there are already many ready-made components that can be used as processes. They include:

  • operators: for moving data from one place to another, for example from a database table to a data warehouse;
  • sensors: for waiting until a certain event occurs and then passing control to the next vertices of the graph;
  • hooks: for lower-level operations, for example getting data from a database table (used inside operators);
  • and so on.
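To make the graph idea concrete without installing Airflow, here is a minimal sketch that uses only Python's standard `graphlib` module (Python 3.9+). It is not Airflow's API: the task names are hypothetical, and the sketch only demonstrates the property a DAG scheduler relies on, namely that in an acyclic graph every vertex can be ordered after all of its upstream vertices, while a cycle makes such an order impossible.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical tasks: each key maps to the set of tasks it depends on
# (its upstream vertices in the graph).
dependencies = {
    "check_new_emails": set(),
    "prepare_mail_data": {"check_new_emails"},
    "unpack_archive": {"prepare_mail_data"},
    "load_to_warehouse": {"unpack_archive"},
}


def execution_order(graph):
    """Return one valid run order; raise CycleError if the graph has a cycle."""
    return list(TopologicalSorter(graph).static_order())


order = execution_order(dependencies)
print(order)

# A cycle leaves no valid order, which is why the graph must be acyclic.
try:
    execution_order({"a": {"b"}, "b": {"a"}})
except CycleError:
    print("cycle detected")
```

In Airflow itself the same chain would be declared with the `>>` operator between tasks, as in the DAG file later in this article.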

It would be out of place to describe Apache Airflow in detail in this article. Brief introductions can be found here or here.

A hook for getting data

First of all, to solve the problem we need to write a hook that lets us:

  • connect to the mailbox,
  • find the right message,
  • get the data from the message.

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from bs4 import BeautifulSoup
import imaplib
import logging
import requests

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook for getting data from email

           :param imap_conn_id:       ID of the mail connection
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """
            Connect to the mailbox
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Get the ID of the most recent message
            that matches the search condition

            :param check_seen:      Mark the most recent message as read
            :type check_seen:       bool
            :param box:             Mailbox (folder) name
            :type box:              string
            :param condition:       IMAP search criteria
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("Found the following messages in the mailbox: " + str(mail_ids))

        if not mail_ids:
            logging.info("No new messages found")
            return None

        mail_id = mail_ids[0]

        # if there are several matching messages
        if len(mail_ids) > 1:
            # mark them all as read (the latest one is handled below)
            for msg_id in mail_ids:
                self.mail.store(msg_id, "+FLAGS", "\\Seen")

            # and return the most recent one
            mail_id = mail_ids[-1]

        # if check_seen is False, leave the most recent message unread
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\\Seen")

        return mail_id

The logic is as follows: we connect, find the most recent relevant message and, if there are others, ignore them. This approach works here because later messages contain all the data of the earlier ones. If that is not the case, you can return a list of all messages, or process the first one and leave the rest for the next pass. As always, it all depends on the task.
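The selection logic can be pulled out into a small pure function, which makes it testable without a mail server. This is a minimal sketch, assuming the `data` value returned by `imaplib.IMAP4.search()`; `split_search_result` is a hypothetical helper, not part of the hook above. It sorts the IDs numerically instead of trusting the server's order; since IMAP sequence numbers grow as messages are added, the highest one belongs to the most recently added message.

```python
from typing import List, Optional, Tuple


def split_search_result(data: List[Optional[bytes]]) -> Tuple[Optional[bytes], List[bytes]]:
    """Split the data returned by imaplib's search() into (latest_id, older_ids).

    imaplib returns the matching message sequence numbers as a single
    space-separated bytes string, e.g. [b"3 12 8"]. The IDs are sorted
    numerically, so the result does not depend on the server's order.
    """
    ids = data[0].split() if data and data[0] else []
    if not ids:
        return None, []
    ids = sorted(ids, key=int)  # int() accepts ASCII digits given as bytes
    return ids[-1], ids[:-1]


latest, older = split_search_result([b"3 12 8"])
print(latest, older)
```

In the hook, `older` would be the IDs to flag as `\Seen` and `latest` the one to return.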

We add two helper functions to the hook: one to download a file and one to download a file via a link from an email. These could also be moved into an operator, depending on how often the functionality is used. What else to add to the hook also depends on the task: if files arrive attached to the message, you can download the attachments; if the data arrives in the body of the message, you need to parse the message, and so on. In my case, the message contains a single link to an archive, which I need to save somewhere before starting further processing.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Download a file

            :param url:              Download URL
            :type url:               string
            :param path:             Where to save the file
            :type path:              string
            :param chunk_size:       How many bytes to write at a time
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        r.raise_for_status()  # fail on 4xx/5xx instead of saving an error page
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Download a file via the link contained in a message

            :param mail_id:         Message ID
            :type mail_id:          string
            :param path:            Where to save the file
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        # drop line breaks so that links are not split across lines
        raw_soup = raw_email.decode().replace("\r", "").replace("\n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        # keep the last <a> tag that has both an href and visible text
        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        if not link_text:
            raise AirflowException("No download link found in the message")

        self.download_from_url(link_text, path)
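For comparison, both cases mentioned above (a link in the HTML body and a file attached directly to the message) can be handled with the standard library's `email` package instead of stripping line breaks and feeding the raw message to BeautifulSoup. This is a swapped-in technique, not the author's code: `extract_links` and `save_attachments` are hypothetical helpers that take the raw bytes returned by `fetch(mail_id, "(RFC822)")`. Decoding through `get_content()` also undoes quoted-printable encoding, which otherwise leaves soft line breaks (`=` at the end of a line) and `=3D` sequences inside long URLs.

```python
import email
from email import policy
from html.parser import HTMLParser
from pathlib import Path
from typing import List


class _LinkCollector(HTMLParser):
    """Collects the href values of <a> tags in document order."""

    def __init__(self):
        super().__init__()
        self.links: List[str] = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            href = dict(attrs).get("href")
            if href:
                self.links.append(href)


def extract_links(raw_email: bytes) -> List[str]:
    """Return all links from the HTML part of a raw RFC 822 message."""
    msg = email.message_from_bytes(raw_email, policy=policy.default)
    body = msg.get_body(preferencelist=("html",))
    if body is None:
        return []
    parser = _LinkCollector()
    # get_content() decodes the transfer encoding and the charset
    parser.feed(body.get_content())
    return parser.links


def save_attachments(raw_email: bytes, target_dir: str) -> List[Path]:
    """Write every named attachment of a raw RFC 822 message into target_dir."""
    msg = email.message_from_bytes(raw_email, policy=policy.default)
    saved = []
    for part in msg.iter_attachments():
        name = part.get_filename()
        payload = part.get_payload(decode=True)
        if not name or payload is None:
            continue
        path = Path(target_dir) / Path(name).name  # drop any directory components
        path.write_bytes(payload)
        saved.append(path)
    return saved
```

With this approach the hook method would pass `extract_links(raw_email)[-1]` to `download_from_url`, keeping the "last link wins" behaviour of the original.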

The hook code is simple enough that it hardly needs further explanation. I'll only comment on the magic string imap_conn_id. Apache Airflow stores connection parameters (login, password, address and other parameters), which can be accessed by a string identifier. In the web interface, connection management looks like this:

[Figure: connection management in the Apache Airflow web interface]
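Besides the web interface, Airflow can also read a connection from an environment variable named `AIRFLOW_CONN_` followed by the upper-cased connection ID (for this article, `AIRFLOW_CONN_MAIL_CONN_ID`), whose value is a URI. The sketch below only illustrates how such a URI maps onto the `host`, `login` and `password` fields that `IMAPHook` reads; it uses `urllib.parse` rather than Airflow's own parser, and the address and credentials are made up.

```python
from urllib.parse import unquote, urlsplit


def describe_connection_uri(uri):
    """Split a connection URI into the fields IMAPHook reads.

    Illustration only: this uses urllib.parse, not Airflow's own parser.
    Percent-encoded characters (e.g. %40 for "@") are decoded.
    """
    parts = urlsplit(uri)
    return {
        "conn_type": parts.scheme,
        "host": parts.hostname,
        "login": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
        "port": parts.port,
    }


# Hypothetical value for an environment variable named AIRFLOW_CONN_MAIL_CONN_ID
uri = "imap://reports%40example.com:s3cret@imap.example.com:993"
fields = describe_connection_uri(uri)
print(fields)
```

Note that the hook passes only `host` to `imaplib.IMAP4_SSL`, so it always connects to the default IMAPS port 993, whatever port is stored in the connection.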

A sensor to wait for data

Now that we know how to connect to the mailbox and get data from it, we can write a sensor that waits for the data. In my case, it was not an option to write an operator that would process the data right away if any was present, because other processes depend on the data received by mail, including those that fetch related data from other sources (APIs, telephony, web metrics, etc.). Here is an example. A new user appears in the CRM system, and we do not yet know their UUID. Then, when we try to get data from SIP telephony, we receive calls tied to that UUID, but we cannot save and use them correctly. In such cases it is important to keep data dependencies in mind, especially when the data comes from different sources. Of course, these measures alone are not enough to guarantee data integrity, but in some cases they are necessary. Besides, running tasks idle just to occupy resources makes no sense either.

So our sensor triggers the next vertices of the graph if there is fresh information in the mailbox, and marks older information as no longer relevant.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True
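To see what `poke_interval`, `timeout` and `soft_fail` do together, here is a simplified, framework-free model of a sensor running in `poke` mode. It is an illustration, not Airflow's implementation (the real `BaseSensorOperator` also handles reschedule mode, retries and skipping downstream tasks); `run_sensor` and `SensorTimeout` are hypothetical names, and the clock and sleep functions are injectable only so the behaviour can be tested without waiting.

```python
import time
from typing import Callable


class SensorTimeout(Exception):
    """Raised when the condition is not met in time and soft_fail is off."""


def run_sensor(poke: Callable[[], bool], poke_interval: float, timeout: float,
               soft_fail: bool = False,
               clock: Callable[[], float] = time.monotonic,
               sleep: Callable[[float], None] = time.sleep) -> str:
    """Simplified model of a sensor in "poke" mode.

    Calls poke() until it returns True, sleeping poke_interval seconds
    between attempts. Once more than `timeout` seconds have elapsed it
    returns "skipped" if soft_fail is set, otherwise raises SensorTimeout.
    """
    started = clock()
    while not poke():
        if clock() - started > timeout:
            if soft_fail:
                return "skipped"
            raise SensorTimeout(f"condition not met within {timeout} s")
        sleep(poke_interval)
    return "success"
```

With `poke_interval=10` and `timeout=10`, as in the DAG below, the sensor gets only a few attempts per run, and with `soft_fail=True` a run without new mail ends up skipped rather than failed.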

Getting and processing the data

To get and process the data, you can write separate operators or use ready-made ones. Since the logic is still trivial (take the data from the message), I suggest the standard PythonOperator as an example.

from datetime import datetime, timedelta

from airflow.exceptions import AirflowException
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Standard DAG configuration
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["example@example.com"],  # placeholder address
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Define the sensor. check_seen=False leaves the newest matching message
# unread, so that prepare_mail below can still find it with (UNSEEN)
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    check_seen=False,
    dag=dag,
    mode="poke",
)

# Function that gets the data from the message
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    imap_hook.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable=prepare_mail)

# Description of the remaining graph vertices
...

# Define the edges of the graph
mail_check_sensor >> prepare_mail_data
prepare_mail_data >> ...
# Description of the remaining control flows

By the way, if your corporate mail is also hosted on mail.ru, you will not be able to search for messages by subject, sender and so on. Back in 2016 they promised to add this, but apparently changed their minds. I solved the problem by creating a separate folder for the messages I need and setting up a filter for them in the mail web interface. That way, only the required messages end up in this folder, and the search condition, in my case, is simply (UNSEEN).
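On servers whose IMAP SEARCH does support sender and subject filters, the folder workaround is not needed, and the `condition` argument of `get_last_mail` can carry the filter instead. This is a minimal sketch of building such a criteria string, assuming ASCII-only values; `build_search_criteria` and `quote_imap` are hypothetical helpers, and non-ASCII text would additionally require the `CHARSET` argument and IMAP literals, which this sketch deliberately rejects.

```python
from typing import Optional


def quote_imap(value: str) -> str:
    """Quote value as an IMAP quoted string, escaping backslashes and double quotes."""
    if not value.isascii():
        raise ValueError("non-ASCII criteria need CHARSET and literals; not covered here")
    return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'


def build_search_criteria(unseen: bool = True, sender: Optional[str] = None,
                          subject: Optional[str] = None) -> str:
    """Build a parenthesised IMAP SEARCH criteria string for imaplib."""
    keys = []
    if unseen:
        keys.append("UNSEEN")
    if sender:
        keys.append("FROM " + quote_imap(sender))
    if subject:
        keys.append("SUBJECT " + quote_imap(subject))
    # An empty list would be invalid, so fall back to matching everything.
    return "(" + " ".join(keys or ["ALL"]) + ")"


print(build_search_criteria(sender="reports@example.com"))
```

The result could then be passed as `get_last_mail(box="INBOX", condition=build_search_criteria(sender="reports@example.com"))`, where the sender address is a placeholder.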

To sum up, the sequence is as follows: we check whether there are new messages matching the condition and, if there are, we download the archive using the link from the most recent one.
What is left behind the final ellipsis is that this archive will be unpacked, the data from it cleaned and processed, and the whole thing will then move on down the ETL pipeline, but that is beyond the scope of this article. If it turns out to be interesting and useful, I will gladly continue describing ETL solutions and their components for Apache Airflow.

Source: www.habr.com
