An ETL process for fetching data from email in Apache Airflow


No matter how far technology advances, a trail of outdated approaches always follows it. This may come from a gradual transition, human factors, technical necessity or something else. In data processing, data sources show this most clearly. However much we dream of getting rid of it, part of the data is still sent through instant messengers and email, not to mention more archaic formats. I invite you to look at one option for Apache Airflow that shows how you can take data from emails.

Background

A lot of data is still transferred by email, from communication between individuals to interaction standards between companies. It is good if you can write an interface for getting the data, or put people in the office who will enter this information into more convenient sources, but often neither is possible. The specific task I faced was connecting a certain well-known CRM system to the data warehouse, and from there to an OLAP system. Historically, this system had been convenient for one particular area of our company's business, so everyone really wanted to be able to work with the data from this third-party system as well. The first option we studied, of course, was getting the data from the open API. Unfortunately, the API did not cover all the data we needed, it was clumsy in many respects, and the vendor's technical support either would not or could not meet us halfway and provide broader functionality. However, the system could periodically send the missing data by email as a link for downloading an archive.

It should be noted that this was not the only case where the business wanted to collect data from emails or instant messengers. In this case, however, we could not influence the third-party company, which provides part of the data only in this way.

Apache Airflow

We most often use Apache Airflow to build ETL processes. To help readers unfamiliar with this technology understand how it looks in context and in general, I will give a short introduction.

Apache Airflow is a free platform for building, running and monitoring ETL (Extract-Transform-Load) processes in Python. The central concept in Airflow is the directed acyclic graph (DAG): the vertices of the graph are specific processes, and its edges are the flow of control or data. A process can simply call a single Python function, or it can contain more complex logic, such as calling several functions in sequence within a class. For the most common operations there are already many ready-made components that can be used as processes. They include:

  • operators, which move data from one place to another, for example from a database table to the data warehouse;
  • sensors, which wait for a specific event to happen and then pass control to the next vertices of the graph;
  • hooks, for lower-level operations, for example fetching data from a database table (hooks are used inside operators);
  • and so on.
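The following toy sketch makes the DAG idea concrete without installing Airflow. It models a pipeline like the one in this article as a plain dictionary and uses Python's standard `graphlib` module (3.9+) to get a valid execution order. It only illustrates the idea that vertices are processes and edges are control flow; Airflow's own scheduler does not work this way. The task names `load_to_warehouse` and `refresh_olap` are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical pipeline: each key is a task, each value is the set of tasks it waits for.
PIPELINE = {
    "check_new_emails": set(),
    "prepare_mail_data": {"check_new_emails"},
    "load_to_warehouse": {"prepare_mail_data"},
    "refresh_olap": {"load_to_warehouse"},
}


def execution_order(graph):
    """Return one order in which the tasks can run without breaking a dependency."""
    return list(TopologicalSorter(graph).static_order())


def is_acyclic(graph):
    """True if the graph has no cycles, i.e. it really is a DAG."""
    try:
        TopologicalSorter(graph).prepare()  # prepare() raises CycleError on a cycle
    except CycleError:
        return False
    return True


if __name__ == "__main__":
    print(execution_order(PIPELINE))
    print(is_acyclic({"a": {"b"}, "b": {"a"}}))  # two tasks waiting on each other
```

A graph in which two tasks wait on each other is reported as cyclic, and such a graph could never be scheduled.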

Describing Apache Airflow in detail is beyond the scope of this article. Short introductions can be found here or here.

A hook for fetching data

First of all, to solve the problem we need to write a hook that can:

  • connect to the mailbox;
  • find the right email;
  • extract the data from that email.
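The third step depends on how the data arrives. Assume the data comes as a file attached to the message rather than as a download link. In that case, the raw bytes returned by an IMAP `FETCH ... (RFC822)` can be parsed with the standard `email` package, as in the minimal sketch below. `extract_attachments` and `sample_message` are hypothetical helpers and are not part of the hook.

```python
from email import policy
from email.message import EmailMessage
from email.parser import BytesParser


def extract_attachments(raw_bytes):
    """Return {filename: decoded bytes} for every attachment of a raw RFC 822 message."""
    msg = BytesParser(policy=policy.default).parsebytes(raw_bytes)
    files = {}
    for part in msg.iter_attachments():
        name = part.get_filename() or "unnamed"
        # decode=True undoes the transfer encoding (usually base64)
        files[name] = part.get_payload(decode=True)
    return files


def sample_message():
    """Build a raw message locally instead of fetching one over IMAP."""
    msg = EmailMessage()
    msg["Subject"] = "Daily export"
    msg.set_content("The export is attached.")
    msg.add_attachment(b"id,name\n1,Alice\n", maintype="application",
                       subtype="octet-stream", filename="export.csv")
    return msg.as_bytes()


if __name__ == "__main__":
    print(extract_attachments(sample_message()))
```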

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from bs4 import BeautifulSoup  # used by download_mail_href_attachment below
import imaplib
import logging
import requests  # used by download_from_url below

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook for fetching data from a mailbox

           :param imap_conn_id:       ID of the mail connection stored in Airflow
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """
            Connect and log in to the mailbox
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Return the ID of the last email that matches the search criteria

            :param check_seen:      Mark the last email as read
            :type check_seen:       bool
            :param box:             Mailbox (folder) name
            :type box:              string
            :param condition:       IMAP search criteria
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("Emails found in the mailbox: " + str(mail_ids))

        if not mail_ids:
            logging.info("No new emails found")
            return None

        mail_id = mail_ids[0]

        # if several emails match
        if len(mail_ids) > 1:
            # mark them all as read (the last one is unmarked below if check_seen is False)
            for msg_id in mail_ids:
                self.mail.store(msg_id, "+FLAGS", r"\Seen")

            # and return the last one
            mail_id = mail_ids[-1]

        # if the last email should stay unread, remove its \Seen flag
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", r"\Seen")

        return mail_id

The logic is this: we connect, find the last matching email and ignore any others. This works here because later emails contain all the data of the earlier ones. If that is not the case for you, you can return a list of all the emails, or process the first one now and the rest on the next pass. In general, as always, everything depends on the task.
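The selection rule can be separated from IMAP entirely. The sketch below assumes, as the hook does, that the IDs returned by `SEARCH` grow in arrival order. `pick_mail` is a hypothetical helper that returns the chosen ID plus the remaining ones. Its `newest` switch covers the alternative of processing the oldest email first and leaving the rest for the next pass.

```python
def pick_mail(search_data, newest=True):
    """Split an imaplib SEARCH result such as [b"3 7 12"] into (chosen_id, other_ids).

    newest=True keeps the latest email (later emails contain all earlier data);
    newest=False takes the oldest one and leaves the rest for the next run.
    """
    ids = search_data[0].split() if search_data and search_data[0] else []
    if not ids:
        return None, []
    if newest:
        return ids[-1], ids[:-1]
    return ids[0], ids[1:]


if __name__ == "__main__":
    print(pick_mail([b"3 7 12"]))
    print(pick_mail([b"3 7 12"], newest=False))
    print(pick_mail([b""]))
```

With `newest=True`, the other IDs correspond to the emails the hook marks as read. With `newest=False`, they would stay unread for the next run.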

We add two helper methods to the hook: one downloads a file, and the other downloads a file from a link in an email. Depending on how often this functionality is used, they could also be moved into the operator. What else to add to the hook also depends on the task. If the files arrive directly in the email, you can download the attachments; if the data is in the body of the email, you need to parse it, and so on. In my case, the email contains a single link to an archive, which I need to save to a certain location before starting further processing.
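One thing to watch in the link case is that HTML bodies are often sent quoted-printable encoded, so a long URL may be split by soft line breaks (`=` at the end of a line). An alternative to running BeautifulSoup over the raw message is a standard-library-only sketch. The `email` package decodes the transfer encoding, and `html.parser` collects the `href` values. `LinkCollector`, `last_link` and `html_message` are hypothetical names.

```python
from email import policy
from email.message import EmailMessage
from email.parser import BytesParser
from html.parser import HTMLParser


class LinkCollector(HTMLParser):
    """Collect the href of every <a> tag, in document order."""

    def __init__(self):
        super().__init__()
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            href = dict(attrs).get("href")
            if href:
                self.links.append(href)


def last_link(raw_bytes):
    """Return the last link in the HTML body of a raw email, or None."""
    msg = BytesParser(policy=policy.default).parsebytes(raw_bytes)
    body = msg.get_body(preferencelist=("html",))
    if body is None:
        return None
    collector = LinkCollector()
    collector.feed(body.get_content())  # get_content() undoes base64/quoted-printable
    collector.close()
    return collector.links[-1] if collector.links else None


def html_message(html):
    """Build a raw HTML email locally, for trying last_link without a mail server."""
    msg = EmailMessage()
    msg["Subject"] = "Export ready"
    msg.set_content(html, subtype="html")
    return msg.as_bytes()


if __name__ == "__main__":
    raw = html_message('<p>Your export: <a href="https://example.com/export.zip">download</a></p>')
    print(last_link(raw))
```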

    # These methods belong to IMAPHook and rely on module-level imports of
    # requests, bs4.BeautifulSoup and airflow.exceptions.AirflowException.
    def download_from_url(self, url, path, chunk_size=128):
        """
            Download a file

            :param url:              Download URL
            :type url:               string
            :param path:             Where to save the file
            :type path:              string
            :param chunk_size:       How many bytes to write per chunk
            :type chunk_size:        int
        """
        with requests.get(url, stream=True) as r:
            r.raise_for_status()  # fail on HTTP errors instead of saving an error page
            with open(path, "wb") as fd:
                for chunk in r.iter_content(chunk_size=chunk_size):
                    fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Download the file linked from an email

            :param mail_id:         Email ID
            :type mail_id:          string
            :param path:            Where to save the file
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        # join the lines of the raw message before parsing it as HTML
        raw_soup = raw_email.decode(errors="replace").replace("\r", "").replace("\n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        # keep the href of the last link that has text
        for a in parse_soup.find_all("a", href=True, string=True):
            link_text = a["href"]

        if not link_text:
            raise AirflowException("No download link found in the email")

        self.download_from_url(link_text, path)

The code is simple, so it needs no further explanation. I will only say a word about the magic string imap_conn_id. Apache Airflow stores connection parameters (login, password, host and other parameters), which can be accessed by a string identifier. Visually, connection management looks like this:

[Figure: connection management in the Apache Airflow web UI]
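Besides the UI, Airflow can also pick up a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` (upper case) whose value is a URI. The sketch below imitates that lookup with `urllib.parse`, only to show which fields a conn_id resolves to. It is not Airflow's implementation; the real parser also handles extras. `ConnParams`, `parse_conn_uri` and `lookup_conn` are hypothetical names.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional
from urllib.parse import unquote, urlsplit


@dataclass
class ConnParams:
    conn_type: str
    host: Optional[str]
    login: Optional[str]
    password: Optional[str]
    port: Optional[int]


def parse_conn_uri(uri):
    """Split a connection URI like imap://user:pass@host:993 into its fields."""
    parts = urlsplit(uri)
    return ConnParams(
        conn_type=parts.scheme,
        host=parts.hostname,
        # special characters in the login or password must be percent-encoded in the URI
        login=unquote(parts.username) if parts.username else None,
        password=unquote(parts.password) if parts.password else None,
        port=parts.port,
    )


def lookup_conn(conn_id, environ: Mapping[str, str] = os.environ):
    """Resolve a conn_id the way an AIRFLOW_CONN_* variable would; None if unset."""
    uri = environ.get("AIRFLOW_CONN_" + conn_id.upper())
    return parse_conn_uri(uri) if uri else None


if __name__ == "__main__":
    env = {"AIRFLOW_CONN_MAIL_CONN_ID": "imap://robot%40example.com:s3cr%3Ft@imap.example.com:993"}
    print(lookup_conn("mail_conn_id", env))
```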

A sensor that waits for the data

Now that we know how to connect to the mailbox and get data from it, we can write a sensor that waits for the data. In my case, writing an operator that immediately processes the data (if there is any) did not work. Other processes depend on the data received by email, including processes that take related data from other sources (APIs, telephony, web analytics and so on). Here is an example. A new user appears in the CRM system, and we do not know their UUID yet. When we then fetch data from SIP telephony, we receive calls tied to that UUID, but we cannot store and use them correctly. In such cases it is important to keep data dependencies in mind, especially when the data comes from different sources. These measures alone are, of course, not enough to guarantee data integrity, but in some cases they are necessary. Besides, keeping tasks idle just to hold on to resources makes no sense either.
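A toy sketch of that dependency follows; the field names `user_uuid` and `id` are hypothetical. Telephony records can only be stored once the CRM export has delivered the user's UUID. Everything else has to wait for a later run rather than be saved with a dangling reference.

```python
def split_by_known_users(calls, known_uuids):
    """Split call records into (storable now, pending until the CRM data arrives)."""
    ready, pending = [], []
    for call in calls:
        if call["user_uuid"] in known_uuids:
            ready.append(call)
        else:
            pending.append(call)
    return ready, pending


if __name__ == "__main__":
    calls = [{"id": 1, "user_uuid": "a1"}, {"id": 2, "user_uuid": "b2"}]
    ready, pending = split_by_known_users(calls, known_uuids={"a1"})
    print("store now:", ready)
    print("retry later:", pending)
```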

So our sensor will trigger the next vertices of the graph if there is new information in the mailbox, and it will also mark the earlier information as no longer relevant.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        # succeed as soon as at least one email matches the search criteria
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
        return mail_id is not None
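The following is a simplified model of what `poke` plugs into: a sensor in `poke` mode. The sensor calls `poke` until it returns True and sleeps `poke_interval` seconds between attempts. Once more than `timeout` seconds have passed, it either fails or, with `soft_fail`, ends as skipped. The model is roughly based on the loop in Airflow 1.10's `BaseSensorOperator.execute` but leaves out retries, rescheduling and the task context. `run_sensor`, `FakeClock` and `simulate` are hypothetical names; the fake clock only exists so the loop can be exercised without real waiting.

```python
import time


class SensorTimeout(Exception):
    """Raised when the condition never became true and soft_fail is off."""


def run_sensor(poke, poke_interval, timeout, soft_fail=False,
               clock=time.monotonic, sleep=time.sleep):
    """Simplified poke-mode loop; returns "success" or "skipped"."""
    started = clock()
    while not poke():
        if clock() - started > timeout:
            if soft_fail:
                return "skipped"
            raise SensorTimeout(f"condition not met within {timeout} s")
        sleep(poke_interval)
    return "success"


class FakeClock:
    """Deterministic clock: sleep() just moves time forward."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


def simulate(poke, poke_interval, timeout, soft_fail=False):
    """Run the loop on a FakeClock and return (outcome, simulated seconds)."""
    fake = FakeClock()
    outcome = run_sensor(poke, poke_interval, timeout, soft_fail,
                         clock=fake.time, sleep=fake.sleep)
    return outcome, fake.now


if __name__ == "__main__":
    print(simulate(lambda: False, poke_interval=10, timeout=10, soft_fail=True))
```

The DAG below uses `poke_interval=10`, `timeout=10` and `soft_fail=True`. With those values, this model checks an empty mailbox three times over about 20 seconds and then skips it instead of failing the run.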

Getting and processing the data

To get and process the data, you can write a separate operator or use a ready-made one. Since the logic is still trivial for now (for example, take the data from the email), I suggest the standard PythonOperator:

from datetime import datetime, timedelta

from airflow.exceptions import AirflowException
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Standard DAG configuration
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["example@example.com"],  # placeholder address
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Define the sensor
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    check_seen=False,  # leave the newest email unread so prepare_mail can still find it
    dag=dag,
    mode="poke",
)

# Function that takes the data from the email
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    imap_hook.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable=prepare_mail)

# Definitions of the remaining graph vertices
...

# Wire up the graph
mail_check_sensor >> prepare_mail_data
# prepare_mail_data >> ...
# Definitions of the remaining control flows

By the way, if your corporate email is hosted on mail.ru, you will not be able to search for emails by subject, sender and so on. Back in 2016 they promised to add this, but apparently they changed their minds. I solved the problem by creating a separate folder for the emails I need and setting up a filter in the mail web interface that moves those emails there. As a result, only the needed emails end up in that folder, and the search criteria in my case are simply (UNSEEN).
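On servers that do support searching by sender or subject, the folder-and-filter workaround is unnecessary, and the hook's `condition` argument can carry the filter itself. Below is a sketch of a hypothetical `build_criteria` helper that composes RFC 3501 search keys (`UNSEEN`, `FROM`, `SUBJECT`, `SINCE`). It assumes ASCII values, since non-ASCII text would also need a `CHARSET` argument to `SEARCH`.

```python
from datetime import date

# RFC 3501 dates use English month abbreviations regardless of locale, e.g. 4-Apr-2020.
_MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
           "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")


def imap_quote(value):
    """Wrap a value in an IMAP quoted string, escaping backslashes and quotes."""
    return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'


def build_criteria(unseen=True, sender=None, subject=None, since=None):
    """Build an IMAP SEARCH criteria string; "ALL" when no filter is given."""
    keys = []
    if unseen:
        keys.append("UNSEEN")
    if sender:
        keys.append("FROM " + imap_quote(sender))
    if subject:
        keys.append("SUBJECT " + imap_quote(subject))
    if since:
        keys.append(f"SINCE {since.day}-{_MONTHS[since.month - 1]}-{since.year}")
    return "(" + " ".join(keys) + ")" if keys else "ALL"


if __name__ == "__main__":
    print(build_criteria(sender="crm@example.com", subject="Export", since=date(2020, 4, 4)))
```

The resulting string would be passed as `condition`, for example `get_last_mail(condition=build_criteria(sender="crm@example.com"))`, where the sender address is a placeholder.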

To sum up, the sequence is as follows: we check whether there are new emails that match the criteria, and if there are, we download the archive using the link in the last one.
The last ellipsis hides the remaining steps. The archive is unpacked, the data from it is cleaned and processed, and the result moves further along the ETL pipeline, but that is beyond the scope of this article. If you found this interesting and useful, I will gladly continue describing ETL solutions and their building blocks for Apache Airflow.

Source: www.habr.com
