ETL process for getting data from email in Apache Airflow


No matter how far technology advances, a trail of outdated approaches always follows it. This may be due to a gradual transition, human factors, technological necessity or something else. In data processing, data sources are the most telling example. However much we dream of getting rid of this, part of the data is still sent via instant messengers and email, not to mention more archaic formats. I invite you to look at one option for Apache Airflow that shows how you can pull data from emails.

Background

A lot of data is still transferred by email, from communication between people to data exchange between companies. It is good if you can write an interface for getting the data, or put people in the office to enter this information into more convenient sources, but often that is simply not possible. The specific task I faced was connecting a well-known CRM system to the data warehouse and then to an OLAP system. Historically, in our company this system had been convenient for one particular area of the business, so everyone really wanted to be able to work with the data from this third-party system as well. First, of course, we looked into getting the data from its public API. Unfortunately, the API did not cover all the data we needed and, to put it simply, was clumsy in many ways, and technical support either would not or could not meet us halfway to provide broader functionality. However, the system did offer a way to periodically receive the missing data by email, as a link for downloading an archive.

It should be noted that this was not the only case in which the business wanted to collect data from emails or instant messengers. In this case, however, we could not influence the third-party company, which supplies part of the data only in this way.

Apache Airflow

To build ETL processes, we usually use Apache Airflow. So that readers unfamiliar with this technology can better understand how it looks in context and in general, I will give a short introduction.

Apache Airflow is a free platform for building, running and monitoring ETL (Extract-Transform-Load) processes in Python. The central concept in Airflow is the directed acyclic graph, whose vertices are specific processes and whose edges are the flow of control or information. A process can simply call any Python function, or it can have much more complex logic, such as calling several functions in sequence in the context of a class. For the most common tasks there are already many ready-made components that can be used as processes. These include:

  • operators: move data from one place to another, for example from a database table to a data warehouse;
  • sensors: wait for a certain event to occur and pass the flow of control on to the subsequent vertices of the graph;
  • hooks: handle lower-level operations, for example fetching data from a database table (they are used inside operators);
  • and others.
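To make the DAG idea concrete without pulling in Airflow itself, here is a minimal stdlib-only sketch (it assumes Python 3.9+ for graphlib): vertices are tasks, edges say which task must finish first, and a topological sort yields a valid run order. The task names are hypothetical and only echo the pipeline in this article; Airflow's scheduler does far more than this.

```python
from graphlib import TopologicalSorter

# Each key is a task (vertex); its value is the set of tasks it depends on (incoming edges).
# Task names are hypothetical and only echo the pipeline described in this article.
pipeline = {
    "check_new_emails": set(),
    "prepare_mail_data": {"check_new_emails"},
    "unpack_archive": {"prepare_mail_data"},
    "load_to_warehouse": {"unpack_archive"},
}


def execution_order(graph):
    """Return one valid run order; raises graphlib.CycleError if the graph has a cycle."""
    return list(TopologicalSorter(graph).static_order())


print(execution_order(pipeline))
# ['check_new_emails', 'prepare_mail_data', 'unpack_archive', 'load_to_warehouse']
```

In Airflow itself the same dependencies are declared with the >> operator between tasks, as in the DAG at the end of this article.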

Describing Apache Airflow in detail would be out of place in this article. Brief introductions are easy to find elsewhere.

A hook for getting data

First, to solve the problem, we need to write a hook that lets us:

  • connect to the mailbox;
  • find the right email;
  • get the data from the email.

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from bs4 import BeautifulSoup
import imaplib
import logging
import requests

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook for getting data from email

           :param imap_conn_id:       Identifier of the mail connection
           :type imap_conn_id:        string
        """
        # host, login and password are read from the Airflow connection
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """
            Connect to the mailbox
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        # imaplib raises imaplib.IMAP4.error on bad credentials;
        # the status check below is an extra safeguard
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Get the identifier of the latest email
            that matches the search conditions

            :param check_seen:      If False, clear the Seen flag on the latest email
            :type check_seen:       bool
            :param box:             Mailbox (folder) name
            :type box:              string
            :param condition:       IMAP search conditions
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        # data[0] is a space-separated bytes string of message ids, e.g. b"3 7 12"
        mail_ids = data[0].split()
        logging.info("Found the following emails in the mailbox: " + str(mail_ids))

        if not mail_ids:
            logging.info("No new emails found")
            return None

        # the latest matching email is the last id in the list
        mail_id = mail_ids[-1]

        # if there are several such emails, mark the older ones as read
        for old_id in mail_ids[:-1]:
            self.mail.store(old_id, "+FLAGS", r"\Seen")

        # optionally keep the latest email unread; otherwise it is marked
        # as read when its body is fetched with (RFC822)
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", r"\Seen")

        return mail_id

The idea is this: we connect, find the most recent matching email and ignore any others. This approach works here because each newer email contains all the data from the earlier ones. If that is not your case, you can return a list of all the emails, or process the first one and leave the rest for the next runs. In general, as always, it all depends on the task.
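The selection rule described above is easy to isolate and test on its own. A minimal sketch with hypothetical helper names: the first function mirrors the hook (take the latest id, treat the rest as stale), the second is the "process the oldest now, the rest on later runs" alternative. It assumes the ids come from imaplib's search(), where data[0] is a space-separated bytes string, and that the server lists ids in ascending order, as servers typically do.

```python
def split_latest(mail_ids):
    """Split search results into (latest, older); (None, []) if nothing matched."""
    if not mail_ids:
        return None, []
    return mail_ids[-1], mail_ids[:-1]


def oldest_first(mail_ids):
    """Alternative: take the oldest message now and leave the rest for later runs."""
    if not mail_ids:
        return None, []
    return mail_ids[0], mail_ids[1:]


# data[0] from imaplib's search() looks like b"3 7 12"
ids = b"3 7 12".split()
print(split_latest(ids))   # (b'12', [b'3', b'7'])
print(oldest_first(ids))   # (b'3', [b'7', b'12'])
```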

We add two helper functions to the hook: one for downloading a file and one for downloading a file via a link from an email. By the way, they could be moved into an operator, depending on how often this functionality is used. What else to add to the hook also depends on the task: if files arrive right in the email, you can download the attachments; if the data is in the email body, you need to parse the email, and so on. In my case, the email comes with a single link to an archive, which I need to put somewhere and then start further processing.
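For the other cases mentioned above, Python's standard email package can do the parsing without BeautifulSoup, and it also undoes transfer encodings such as base64 or quoted-printable. A minimal sketch under that assumption: the function names are hypothetical, and raw_message is assumed to be the bytes that imaplib's fetch(mail_id, "(RFC822)") returns in data[0][1]. Unlike the BeautifulSoup version in the hook, extract_last_link does not require the link to have text.

```python
import email
from email import policy
from email.message import EmailMessage
from html.parser import HTMLParser


def extract_attachments(raw_message):
    """Return {filename: bytes} for every part marked as an attachment."""
    msg = email.message_from_bytes(raw_message, policy=policy.default)
    files = {}
    for part in msg.walk():
        if part.get_content_disposition() == "attachment" and part.get_filename():
            # decode=True undoes the base64 / quoted-printable transfer encoding
            files[part.get_filename()] = part.get_payload(decode=True)
    return files


class _LinkCollector(HTMLParser):
    """Collects href values of <a> tags in document order."""

    def __init__(self):
        super().__init__()
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            href = dict(attrs).get("href")
            if href:
                self.links.append(href)


def extract_last_link(raw_message):
    """Return the last <a href> in the HTML body, or None if there is none."""
    msg = email.message_from_bytes(raw_message, policy=policy.default)
    body = msg.get_body(preferencelist=("html",))
    if body is None:
        return None
    collector = _LinkCollector()
    collector.feed(body.get_content())
    return collector.links[-1] if collector.links else None


# Usage on a synthetic message instead of a real mailbox
sample = EmailMessage()
sample["Subject"] = "Daily export"
sample.set_content("Plain-text version")
sample.add_alternative('<p><a href="https://example.com/export.zip">Download</a></p>', subtype="html")
print(extract_last_link(sample.as_bytes()))  # https://example.com/export.zip
```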

    def download_from_url(self, url, path, chunk_size=128):
        """
            Download a file

            :param url:              Download address
            :type url:               string
            :param path:             Where to save the file
            :type path:              string
            :param chunk_size:       How many bytes to write at a time
            :type chunk_size:        int
        """
        with requests.get(url, stream=True) as r:
            # fail early on HTTP errors instead of saving an error page
            r.raise_for_status()
            with open(path, "wb") as fd:
                for chunk in r.iter_content(chunk_size=chunk_size):
                    fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Download a file via the link in an email

            :param mail_id:         Email identifier
            :type mail_id:          string
            :param path:            Where to save the file
            :type path:             string
        """
        # expects an authenticated connection with the mailbox selected (see get_last_mail)
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        # drop line breaks so the HTML is parsed as one string
        raw_soup = raw_email.decode().replace("\r", "").replace("\n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        # keep the last link in the email that has both an href and link text
        for a in parse_soup.find_all("a", href=True, string=True):
            link_text = a["href"]

        if not link_text:
            raise ValueError("No download link found in the email")

        self.download_from_url(link_text, path)

The code is simple, so it hardly needs further explanation. I will only comment on the magic imap_conn_id line. Apache Airflow stores connection parameters (login, password, host and other parameters), which can be accessed by a string identifier. In the web interface, connection management looks like this:

(Screenshot: the connection management page of the Apache Airflow web interface.)
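Besides the web interface, Airflow can also pick up a connection from an environment variable named AIRFLOW_CONN_ followed by the upper-cased conn_id and holding a URI, so for mail_conn_id that would be AIRFLOW_CONN_MAIL_CONN_ID. The sketch below is only a stdlib approximation of how such a URI splits into the host, login and password fields the hook reads; it is not Airflow's own parser, and parse_conn_uri and the credentials are made up for illustration.

```python
from urllib.parse import unquote, urlsplit


def parse_conn_uri(uri):
    """Split a connection URI into the fields IMAPHook reads (hypothetical helper)."""
    parts = urlsplit(uri)
    return {
        "conn_type": parts.scheme,
        "host": parts.hostname,
        # special characters such as "@" in the login must be percent-encoded in the URI
        "login": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
        "port": parts.port,
    }


# Hypothetical value of AIRFLOW_CONN_MAIL_CONN_ID
conn = parse_conn_uri("imap://robot%40example.com:s3cret@imap.example.com:993")
print(conn["host"], conn["login"], conn["port"])
```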

A sensor for waiting for data

Now that we know how to connect to the mailbox and get data from it, we can write a sensor that waits for that data. In my case, writing an operator that would process the data right away, if there was any, did not work, because other processes depend on the data received by email, including those that pull related data from other sources (APIs, telephony, web metrics, etc.). Here is an example. A new user appears in the CRM system, and we do not know their UUID yet. Then, when we try to get data from SIP telephony, we will receive calls tied to that UUID but will not be able to store and use them correctly. In such matters it is important to keep data dependencies in mind, especially when the data comes from different sources. These measures are, of course, not enough to guarantee data integrity, but in some cases they are necessary. Besides, idling while holding on to resources is wasteful too.

So our sensor will trigger the subsequent vertices of the graph if there is fresh information in the mailbox, and will also mark the previous information as no longer relevant.
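A simplified, Airflow-free model of poke mode may help when choosing poke_interval, timeout and soft_fail: poke() is called repeatedly, with a pause between calls, until it returns True; once the timeout has passed, the task fails, or is skipped if soft_fail is set. This is an approximation of the sensor's behaviour, not BaseSensorOperator's code: SensorSkipped is a hypothetical stand-in for Airflow's skip exception, and the clock and sleep parameters exist only so the example runs instantly.

```python
import time


class SensorSkipped(Exception):
    """Stand-in for Airflow skipping the task when soft_fail=True."""


def run_sensor(poke, poke_interval, timeout, soft_fail=False,
               clock=time.monotonic, sleep=time.sleep):
    """Call poke() until it returns True, pausing poke_interval seconds between calls.

    After more than `timeout` seconds, raise SensorSkipped if soft_fail is set,
    otherwise TimeoutError.
    """
    started = clock()
    while not poke():
        if clock() - started > timeout:
            if soft_fail:
                raise SensorSkipped("condition not met before timeout")
            raise TimeoutError("sensor timed out")
        sleep(poke_interval)
    return True


# A fake clock so the example runs instantly
fake_now = [0.0]


def fake_clock():
    return fake_now[0]


def fake_sleep(seconds):
    fake_now[0] += seconds


# Mail shows up on the third poke
answers = iter([False, False, True])
result = run_sensor(lambda: next(answers), poke_interval=10, timeout=60,
                    clock=fake_clock, sleep=fake_sleep)
first_elapsed = fake_now[0]
print(result, first_elapsed)  # True 20.0

# Empty mailbox with the settings used in the DAG below
fake_now[0] = 0.0
try:
    run_sensor(lambda: False, poke_interval=10, timeout=10, soft_fail=True,
               clock=fake_clock, sleep=fake_sleep)
    outcome = "success"
except SensorSkipped:
    outcome = "skipped"
print(outcome)  # skipped
```

With timeout equal to poke_interval, as in the DAG below, the sensor effectively checks the mailbox only a few times per run and then skips, leaving the next scheduled run to try again.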

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="INBOX", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        # Airflow calls poke() every poke_interval seconds until it returns True or the sensor times out
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        # True lets the downstream tasks start
        return mail_id is not None

Receiving and processing the data

To receive and process the data, you can write a separate operator or use a ready-made one. Since the logic is still trivial (for example, taking the data from the email), I suggest the standard PythonOperator:

from datetime import datetime, timedelta

from airflow.exceptions import AirflowException
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Standard graph configuration
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["example@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Define the sensor
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Function for getting data from the email
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    # the hook is already connected with my_box selected after get_last_mail()
    imap_hook.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable=prepare_mail)

# Description of the remaining graph vertices
...

# Define the links in the graph
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Description of the remaining control flows

By the way, if your corporate mail is also hosted on mail.ru, you will not be able to search emails by subject, sender and so on. They promised to add this back in 2016, but apparently changed their minds. I solved the problem by creating a separate folder for the emails I need and setting up a filter for them in the mail web interface. That way, only the right emails land in this folder, and the search condition in my case is simply (UNSEEN).
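On servers that do support the full IMAP SEARCH command, the condition argument of get_last_mail can narrow the search by itself. A minimal sketch of a hypothetical helper that combines standard RFC 3501 search keys: keys written side by side are ANDed, dates use English month abbreviations regardless of locale, and non-ASCII values would additionally need a CHARSET argument, which this sketch does not handle.

```python
from datetime import date

# RFC 3501 dates look like 4-Apr-2020 and always use English month names
_MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
           "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")


def _quote(value):
    """Quote an ASCII string for an IMAP SEARCH key, escaping backslashes and quotes."""
    return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'


def build_search_criteria(unseen=True, sender=None, subject=None, since=None):
    """Build a condition string for imaplib's search(None, condition)."""
    keys = []
    if unseen:
        keys.append("UNSEEN")
    if sender:
        keys.append("FROM " + _quote(sender))
    if subject:
        keys.append("SUBJECT " + _quote(subject))
    if since:
        keys.append("SINCE {}-{}-{}".format(since.day, _MONTHS[since.month - 1], since.year))
    return "(" + " ".join(keys) + ")" if keys else "ALL"


print(build_search_criteria(sender="reports@example.com", since=date(2020, 4, 4)))
# (UNSEEN FROM "reports@example.com" SINCE 4-Apr-2020)
```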

To sum up, the sequence is as follows: we check whether there are new emails that match the conditions and, if there are, download the archive using the link from the latest email.

Behind the final ellipsis, the archive gets unpacked, the data from it is cleaned and processed, and as a result everything moves further along the ETL pipeline, but that is beyond the scope of this article. If this turned out to be interesting and useful, I will gladly continue describing ETL solutions and their components for Apache Airflow.
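The unpacking step left out of scope above could start roughly like this. A minimal sketch using only zipfile, assuming the archive is a .zip saved as ./path.zip by prepare_mail; unpack_archive is a hypothetical name, archives from untrusted senders deserve a look at their member names first, and cleaning and loading the data are still not covered.

```python
import tempfile
import zipfile
from pathlib import Path


def unpack_archive(archive_path, target_dir):
    """Extract a .zip archive into target_dir and return the extracted file paths."""
    target = Path(target_dir)
    target.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(archive_path) as zf:
        zf.extractall(target)
        # directory entries end with "/" and are skipped
        names = [n for n in zf.namelist() if not n.endswith("/")]
    return sorted(target / n for n in names)


# Usage with a throwaway archive standing in for ./path.zip
with tempfile.TemporaryDirectory() as tmp:
    archive = Path(tmp) / "path.zip"
    with zipfile.ZipFile(archive, "w") as zf:
        zf.writestr("export/users.csv", "id,name\n1,Ann\n")
    extracted = unpack_archive(archive, Path(tmp) / "out")
    extracted_names = [p.relative_to(tmp).as_posix() for p in extracted]
    first_content = extracted[0].read_text()
print(extracted_names)  # ['out/export/users.csv']
```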

Source: www.habr.com
