ETL process for getting data from email in Apache Airflow


No matter how far technology advances, a trail of outdated approaches always follows it. This may be due to a gradual transition, human factors, technological necessity or something else. In data processing, data sources are the most telling example of this. However much we dream of getting rid of it, part of the data is still sent through instant messengers and email, not to mention more archaic formats. I invite you to look at one option for Apache Airflow that shows how data can be collected from emails.

Background

A lot of data is still transferred by email, from interpersonal communication to standards of interaction between companies. It is good if you can write an interface for receiving the data, or put people in the office who will enter this information into more convenient sources, but often this is simply not possible. The specific task I faced was connecting a well-known CRM system to the data warehouse and then to an OLAP system. Historically, using this system was convenient for our company in one particular area of the business, so everyone wanted to be able to work with the data from this third-party system as well. First of all, of course, we explored getting the data through the public API. Unfortunately, the API did not cover all the data we needed and, to put it simply, was crooked in many ways, while technical support either would not or could not meet us halfway and provide more complete functionality. The system did, however, offer a way to periodically receive the missing data by email, as a link for downloading an archive.

It should be noted that this was not the only case in which the business wanted to collect data from emails or instant messengers. In this case, however, we could not influence the third-party company that supplies part of the data only this way.

Apache Airflow

We usually build ETL processes with Apache Airflow. So that a reader unfamiliar with this technology can better understand how it looks both in this article and in practice, I will describe a few introductory points.

Apache Airflow is a free platform for building, running and monitoring ETL (Extract-Transform-Load) processes in Python. The central concept in Airflow is the directed acyclic graph (DAG), whose vertices are the individual tasks and whose edges are the flow of control or information. A task can simply call any Python function, or it can contain more complex logic that calls several functions in sequence within a class. For the most common operations there are already many ready-made components that can be used as tasks. They include:

  • operators - for moving data from one place to another, for example from a database table to a data warehouse;
  • sensors - for waiting for a certain event to occur and then passing control to the subsequent vertices of the graph;
  • hooks - for lower-level operations, for example getting data from a database table (used in operators);
  • and so on.
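
To make the graph idea concrete without installing Airflow, here is a toy model in plain Python. The `Task` class is hypothetical and only imitates Airflow's `a >> b` dependency syntax; the standard library's `graphlib.TopologicalSorter` (Python 3.9+) then produces an order in which every task runs after its upstream tasks, and refuses a graph that contains a cycle. The first two task ids match the DAG built later in this article; `unpack_archive` is a hypothetical next step.

```python
from graphlib import CycleError, TopologicalSorter


class Task:
    """Hypothetical stand-in for an Airflow task: an id plus downstream links."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = set()

    def __rshift__(self, other):
        # Imitates Airflow's `a >> b` ("b runs after a"). Returning `other`
        # makes chains such as `a >> b >> c` work.
        self.downstream.add(other)
        return other


def execution_order(tasks):
    """Return task ids in an order that respects every edge of the graph."""
    # TopologicalSorter wants {node: predecessors}, so invert the edges.
    predecessors = {task.task_id: set() for task in tasks}
    for task in tasks:
        for child in task.downstream:
            predecessors[child.task_id].add(task.task_id)
    # static_order() raises CycleError if the graph is not acyclic.
    return list(TopologicalSorter(predecessors).static_order())


sensor = Task("check_new_emails")
download = Task("prepare_mail_data")
unpack = Task("unpack_archive")  # hypothetical next step
sensor >> download >> unpack
print(execution_order([sensor, download, unpack]))
```

Adding an edge such as `unpack >> sensor` would close a loop, and `execution_order` would then raise `graphlib.CycleError` instead of returning an order.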

It would be out of place to describe Apache Airflow in detail in this article; brief introductions can be found in other articles.

Hook for getting data

First of all, to solve the problem we need to write a hook that can:

  • connect to the mailbox;
  • find the required letter;
  • get data from the letter.

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook for getting data from email

           :param imap_conn_id:       Mail connection identifier
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """
            Connect to the mailbox
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Get the identifier of the latest letter
            that matches the search criteria

            :param check_seen:      If False, explicitly clear the Seen flag on the latest letter
            :type check_seen:       bool
            :param box:             Mailbox name
            :type box:              string
            :param condition:       Search criteria
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("Letters found in the mailbox: " + str(mail_ids))

        if not mail_ids:
            logging.info("No new letters found")
            return None

        # the latest letter is the last one in the search results
        mail_id = mail_ids[-1]

        # if there are several such letters, mark all but the latest as read
        for other_id in mail_ids[:-1]:
            self.mail.store(other_id, "+FLAGS", r"\Seen")

        # optionally make sure the latest letter stays unread
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", r"\Seen")

        return mail_id

The idea is this: we connect, find the latest matching letter and ignore any others. This approach works here because later letters contain all the data of the earlier ones. If that is not the case, you can return a list of all matching letters, or process the first one and leave the rest for the next run. As always, everything depends on the task.
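
The choice between these strategies can be separated from IMAP entirely. A minimal sketch, assuming (as `get_last_mail` does) that the search returns ids oldest first; the `select_mail_ids` helper and its `strategy` names are hypothetical and not part of the hook above:

```python
def select_mail_ids(mail_ids, strategy="last"):
    """Split IMAP search results into (ids_to_process, ids_to_mark_seen).

    Hypothetical helper. Assumes mail_ids are ordered oldest first,
    as get_last_mail also assumes.
    """
    ids = list(mail_ids)
    if not ids:
        return [], []
    if strategy == "last":
        # Later letters contain everything from earlier ones:
        # process the newest, mark the rest as read.
        return [ids[-1]], ids[:-1]
    if strategy == "all":
        # Every letter matters: process all of them, oldest first.
        return ids, []
    if strategy == "first":
        # Process the oldest now; the rest stay unread for the next run.
        return [ids[0]], []
    raise ValueError(f"unknown strategy: {strategy!r}")


# IMAP search returns a space-separated byte string such as b"3 7 9".
found = b"3 7 9".split()
print(select_mail_ids(found))           # newest only
print(select_mail_ids(found, "first"))  # oldest first, one per run
```

The hook above corresponds to `strategy="last"`: the first list is what gets processed, the second is what gets flagged as read.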

We add two helper functions to the hook: downloading a file, and downloading a file via a link from a letter. By the way, they could be moved into the operator; that depends on how often this functionality is used. What else to add to the hook again depends on the task: if files arrive directly in the letter, you can download the attachments; if the data arrives in the body of the letter, you need to parse the letter, and so on. In my case, the letter comes with a single link to an archive, which I need to put in a certain place and then start further processing.

    # These two methods also belong to IMAPHook; the module additionally
    # needs "import requests" and "from bs4 import BeautifulSoup" at the top.
    def download_from_url(self, url, path, chunk_size=128):
        """
            Download a file

            :param url:              Download URL
            :type url:               string
            :param path:             Where to save the file
            :type path:              string
            :param chunk_size:       How many bytes to write at a time
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        r.raise_for_status()
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Download a file using the link from a letter

            :param mail_id:         Letter identifier
            :type mail_id:          string
            :param path:            Where to save the file
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        # strip CR/LF from the raw message before parsing the HTML
        raw_soup = raw_email.decode().replace("\r", "").replace("\n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        # keep the href of the last <a> tag that has text
        for a in parse_soup.find_all("a", href=True, string=True):
            link_text = a["href"]

        if not link_text:
            raise AirflowException("No download link found in the letter")

        self.download_from_url(link_text, path)
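
For comparison, here is a sketch of both cases mentioned above using only the standard library instead of requests and BeautifulSoup: `extract_attachments` for letters that carry files as attachments, and `last_link` for letters that carry a link. Both function names are hypothetical. Rather than stripping line breaks from the raw message, it lets the `email` package decode the transfer encoding first; this can matter because quoted-printable bodies wrap long lines with a trailing `=` and escape `=` itself as `=3D`, which removing line breaks does not undo. The input is the raw bytes that `fetch(mail_id, "(RFC822)")` returns in `data[0][1]`.

```python
import email
from email import policy
from email.message import EmailMessage
from html.parser import HTMLParser


def extract_attachments(raw_email):
    """Return {filename: bytes} for every attachment of an RFC 822 message."""
    msg = email.message_from_bytes(raw_email, policy=policy.default)
    files = {}
    for part in msg.iter_attachments():
        name = part.get_filename()
        if name:
            # decode=True undoes base64 / quoted-printable transfer encoding.
            files[name] = part.get_payload(decode=True)
    return files


class _LinkCollector(HTMLParser):
    """Collects the href of every <a> tag that contains visible text."""

    def __init__(self):
        super().__init__()
        self.links = []
        self._href = None
        self._has_text = False

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            self._href = dict(attrs).get("href")
            self._has_text = False

    def handle_data(self, data):
        if self._href and data.strip():
            self._has_text = True

    def handle_endtag(self, tag):
        if tag == "a":
            if self._href and self._has_text:
                self.links.append(self._href)
            self._href = None


def last_link(raw_email):
    """Return the last link in the HTML part of the message, or None."""
    msg = email.message_from_bytes(raw_email, policy=policy.default)
    body = msg.get_body(preferencelist=("html",))
    if body is None:
        return None
    collector = _LinkCollector()
    # get_content() decodes the transfer encoding and the charset.
    collector.feed(body.get_content())
    collector.close()
    return collector.links[-1] if collector.links else None


# Usage with locally built messages instead of a real mailbox:
with_file = EmailMessage()
with_file.set_content("Export attached.")
with_file.add_attachment(b"id,value\n1,42\n", maintype="application",
                         subtype="octet-stream", filename="export.csv")
print(extract_attachments(with_file.as_bytes()))

with_link = EmailMessage()
# long enough to exceed the 78-character line limit of the message body
url = "https://example.com/exports/" + "x" * 80 + ".zip"
with_link.set_content(f'<p>Your export: <a href="{url}">download</a></p>', subtype="html")
print(last_link(with_link.as_bytes()) == url)
```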

The code is simple, so it hardly needs explanation. I will only say a few words about the magic imap_conn_id line. Apache Airflow stores connection parameters (login, password, address and other parameters), which can be accessed by a string identifier. In the web interface, connections are managed under Admin → Connections.
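
Connections do not have to be created through the UI: Airflow can also read them from environment variables named `AIRFLOW_CONN_<CONN_ID>` (the id in upper case) whose value is a URI. The sketch below only illustrates how such a URI maps onto the `host`, `login` and `password` fields the hook reads; it is a simplified stand-in written with `urllib.parse`, not Airflow's own parser, and the address and credentials are made up.

```python
import os
from urllib.parse import unquote, urlsplit


def env_var_name(conn_id):
    """Name of the environment variable Airflow checks for a connection id."""
    return "AIRFLOW_CONN_" + conn_id.upper()


def parse_conn_uri(uri):
    """Map a connection URI onto the fields IMAPHook reads.

    Simplified illustration only: Airflow's own parser also handles the
    schema, extra parameters and other details.
    """
    parts = urlsplit(uri)
    return {
        "conn_type": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,
        # Special characters in credentials must be percent-encoded in the URI.
        "login": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
    }


# Made-up credentials; "@" in the login is written as %40.
os.environ[env_var_name("mail_conn_id")] = (
    "imap://robot%40example.com:s3cret@imap.example.com:993"
)
conn = parse_conn_uri(os.environ["AIRFLOW_CONN_MAIL_CONN_ID"])
print(conn["host"], conn["login"])
```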


Sensor for waiting for data

Since we already know how to connect to the mailbox and get data from it, we can now write a sensor that waits for new data. In my case, writing an operator right away that would process the data, if any, was not an option, because other processes also depend on the data received by mail, including ones that pull related data from other sources (API, telephony, web metrics, etc.). Here is an example. A new user appears in the CRM system, and we do not know his UUID yet. When we then try to get data from SIP telephony, we receive calls tied to his UUID, but we cannot save and use them correctly. In such matters it is important to keep data dependencies in mind, especially when the data comes from different sources. These are, of course, insufficient measures for preserving data integrity, but in some cases they are necessary. Besides, idle runs that just tie up resources are wasteful.

So our sensor will trigger the subsequent vertices of the graph when there is fresh information in the mailbox, and will also mark earlier letters as no longer relevant.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        # Called every poke_interval seconds until it returns True or the sensor times out
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        # True lets the downstream tasks run
        return mail_id is not None
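
How often `poke` is called is governed by the sensor arguments used in the DAG below (`poke_interval`, `timeout`, `soft_fail`). The following is a simplified, Airflow-free model of that loop in poke mode, written only to show how those three parameters interact; retries, reschedule mode and Airflow's real exception types are left out, and `run_sensor` and `FakeClock` are hypothetical names.

```python
import time


def run_sensor(poke, poke_interval, timeout, soft_fail=False,
               clock=time.monotonic, sleep=time.sleep):
    """Simplified model of a sensor in poke mode (not Airflow's own code)."""
    started = clock()
    while not poke():
        if clock() - started > timeout:
            if soft_fail:
                # Airflow would mark the task as skipped instead of failed.
                return "skipped"
            # Airflow would fail the task with a sensor timeout.
            raise TimeoutError("sensor timed out")
        sleep(poke_interval)
    return "success"


class FakeClock:
    """Deterministic clock so the example runs instantly."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


clock = FakeClock()
poke_times = []


def empty_mailbox():
    poke_times.append(clock.now)
    return False  # no new letters


result = run_sensor(empty_mailbox, poke_interval=10, timeout=10,
                    soft_fail=True, clock=clock.time, sleep=clock.sleep)
print(result, poke_times)
```

In this model, with `poke_interval=10` and `timeout=10` as in the DAG below, a mailbox without new letters is checked three times (at 0, 10 and 20 seconds) before the run ends as skipped rather than failed.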

Receiving and using the data

To receive and process the data, you can write a separate operator or use ready-made ones. Since the logic is still trivial (for example, taking the data from the letter), I suggest the standard PythonOperator.

from datetime import datetime, timedelta

from airflow.exceptions import AirflowException
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Standard graph configuration
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Define the sensor
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Function for getting data from the letter
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    imap_hook.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable=prepare_mail)

# Description of the remaining graph vertices
...

# Define the edges of the graph
mail_check_sensor >> prepare_mail_data
prepare_mail_data >> ...
# Description of the remaining control flows

By the way, if your corporate mail is also hosted on mail.ru, you will not be able to search letters by subject, sender and so on. Back in 2016 they promised to add this, but apparently changed their minds. I solved the problem by creating a separate folder for the required letters and setting up a filter for them in the mail web interface. As a result, only the required letters end up in this folder, and the search condition in my case is simply (UNSEEN).
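
On servers that do support full search, this filtering could instead go into the `condition` argument of `get_last_mail`. A sketch of a hypothetical helper that builds such a criteria string from standard IMAP4rev1 search keys (`UNSEEN`, `SUBJECT`, `FROM`, `SINCE`, as defined in RFC 3501); it assumes ASCII-only values, since non-ASCII text would also need a `CHARSET` argument to `search`.

```python
import datetime

# IMAP dates use English month abbreviations regardless of the locale.
_MONTHS = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
           "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]


def _quote(value):
    """Quote a string for IMAP, escaping backslashes and double quotes."""
    return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'


def build_search_criteria(unseen=True, subject=None, sender=None, since=None):
    """Build an IMAP SEARCH criteria string usable as `condition`."""
    keys = []
    if unseen:
        keys.append("UNSEEN")
    if subject:
        keys.append("SUBJECT " + _quote(subject))
    if sender:
        keys.append("FROM " + _quote(sender))
    if since:
        keys.append(f"SINCE {since.day:02d}-{_MONTHS[since.month - 1]}-{since.year}")
    # With no filters at all, match every letter in the mailbox.
    return "(" + " ".join(keys) + ")" if keys else "ALL"


print(build_search_criteria(subject="Daily export", sender="crm@example.com",
                            since=datetime.date(2020, 4, 4)))
```

Called with no arguments, the helper returns `(UNSEEN)`, the same default condition the hook and sensor use.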

To sum up, the sequence is: we check whether there are new letters matching the conditions, and if there are, we download the archive using the link from the latest letter.
The last ellipsis hides the steps in which the archive is unpacked, the data from it is cleaned and processed, and the result moves further along the ETL pipeline, but that is beyond the scope of this article. If this turned out to be interesting and useful, I will gladly continue describing ETL solutions and their components for Apache Airflow.
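
The unpacking step hidden behind the ellipsis is not covered by the article. Purely as an illustration, assuming the download is a ZIP file saved as `./path.zip` by `prepare_mail`, a minimal standard-library version might look like this; `unpack_archive` is a hypothetical name and the target directory is your choice.

```python
import tempfile
import zipfile
from pathlib import Path


def unpack_archive(zip_path, dest_dir):
    """Extract a ZIP archive and return the paths of the extracted files."""
    dest = Path(dest_dir)
    dest.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path) as archive:
        # testzip() returns the name of the first corrupt member, or None.
        broken = archive.testzip()
        if broken is not None:
            raise zipfile.BadZipFile(f"corrupt member in archive: {broken}")
        archive.extractall(dest)
        names = [name for name in archive.namelist() if not name.endswith("/")]
    return [dest / name for name in names]


# Usage with a throwaway archive standing in for ./path.zip:
with tempfile.TemporaryDirectory() as tmp:
    archive_path = Path(tmp) / "path.zip"
    with zipfile.ZipFile(archive_path, "w") as zf:
        zf.writestr("export.csv", "id,value\n1,42\n")
    for extracted in unpack_archive(archive_path, Path(tmp) / "unpacked"):
        print(extracted.name, extracted.read_text())
```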

Source: www.habr.com
