Procesi ETL për marrjen e të dhënave nga emaili në Apache Airflow

Procesi ETL për marrjen e të dhënave nga emaili në Apache Airflow

Pavarësisht se sa shumë zhvillohet teknologjia, një varg qasjesh të vjetruara mbeten gjithmonë pas zhvillimit. Kjo mund të jetë për shkak të një tranzicioni të qetë, faktorëve njerëzorë, nevojave teknologjike ose diçka tjetër. Në fushën e përpunimit të të dhënave, burimet e të dhënave janë më zbuluesit në këtë pjesë. Pavarësisht se sa shumë ëndërrojmë ta heqim qafe këtë, por deri më tani një pjesë e të dhënave dërgohen në mesazhe të çastit dhe email, për të mos përmendur formate më arkaike. Ju ftoj të çmontoni një nga opsionet për Apache Airflow, duke ilustruar se si mund të merrni të dhëna nga emailet.

parahistorinë

Shumë të dhëna ende transferohen përmes postës elektronike, nga komunikimet ndërpersonale te standardet e ndërveprimit midis kompanive. Është mirë nëse është e mundur të shkruash një ndërfaqe për të marrë të dhëna ose të vendosësh njerëz në zyrë që do ta fusin këtë informacion në burime më të përshtatshme, por shpesh kjo thjesht mund të mos jetë e mundur. Detyra specifike me të cilën u përballa ishte lidhja e sistemit famëkeq CRM me depon e të dhënave dhe më pas me sistemin OLAP. Kështu ndodhi historikisht që për kompaninë tonë përdorimi i këtij sistemi ishte i përshtatshëm në një fushë të veçantë të biznesit. Prandaj, të gjithë donin vërtet të ishin në gjendje të operonin edhe me të dhëna nga ky sistem i palëve të treta. Para së gjithash, natyrisht, u studiua mundësia e marrjes së të dhënave nga një API e hapur. Fatkeqësisht, API nuk mbuloi marrjen e të gjitha të dhënave të nevojshme dhe, me fjalë të thjeshta, ishte në shumë mënyra të shtrembër, dhe mbështetja teknike nuk donte ose nuk mund të arrinte në gjysmë të rrugës për të ofruar funksionalitet më gjithëpërfshirës. Por ky sistem dha mundësinë për të marrë periodikisht të dhënat e munguara me postë në formën e një lidhjeje për shkarkimin e arkivit.

Duhet të theksohet se ky nuk ishte rasti i vetëm në të cilin biznesi donte të mblidhte të dhëna nga emailet ose mesazhet e çastit. Megjithatë, në këtë rast, ne nuk mund të ndikonim në një kompani të palës së tretë që siguron një pjesë të të dhënave vetëm në këtë mënyrë.

Rrjedha e ajrit Apache

Për të ndërtuar procese ETL, ne më së shpeshti përdorim Apache Airflow. Në mënyrë që një lexues që nuk është i njohur me këtë teknologji të kuptojë më mirë se si duket në kontekst dhe në përgjithësi, unë do të përshkruaj disa nga ato hyrëse.

Apache Airflow është një platformë falas që përdoret për të ndërtuar, ekzekutuar dhe monitoruar proceset ETL (Extract-Transform-Loading) në Python. Koncepti kryesor në Airflow është një grafik jociklik i drejtuar, ku kulmet e grafikut janë procese specifike, dhe skajet e grafikut janë rrjedha e kontrollit ose informacionit. Një proces thjesht mund të thërrasë çdo funksion Python, ose mund të ketë logjikë më komplekse nga thirrja e njëpasnjëshme e disa funksioneve në kontekstin e një klase. Për operacionet më të shpeshta, tashmë ka shumë zhvillime të gatshme që mund të përdoren si procese. Zhvillime të tilla përfshijnë:

  • operatorët - për transferimin e të dhënave nga një vend në tjetrin, për shembull, nga një tabelë e bazës së të dhënave në një depo të dhënash;
  • sensorë - për pritjen e ndodhjes së një ngjarje të caktuar dhe drejtimin e rrjedhës së kontrollit në kulmet pasuese të grafikut;
  • grepa - për operacione të nivelit më të ulët, për shembull, për të marrë të dhëna nga një tabelë e bazës së të dhënave (përdoret në deklarata);
  • etj

Do të ishte e papërshtatshme të përshkruanim në detaje Apache Airflow në këtë artikull. Prezantimet e shkurtra mund të shihen këtu ose këtu.

Hook për marrjen e të dhënave

Para së gjithash, për të zgjidhur problemin, duhet të shkruajmë një goditje me të cilën mund të:

  • lidheni me email
  • gjeni shkronjën e duhur
  • marrin të dhëna nga letra.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Logjika është kjo: lidhemi, gjejmë shkronjën e fundit më të rëndësishme, nëse ka të tjera, i shpërfillim. Ky funksion përdoret, sepse shkronjat e mëvonshme përmbajnë të gjitha të dhënat e atyre të mëparshme. Nëse nuk është kështu, atëherë mund të ktheni një grup të të gjitha shkronjave, ose të përpunoni të parën, dhe pjesën tjetër në kalimin tjetër. Në përgjithësi, gjithçka, si gjithmonë, varet nga detyra.

Ne shtojmë dy funksione ndihmëse në grep: për shkarkimin e një skedari dhe për shkarkimin e një skedari duke përdorur një lidhje nga një email. Nga rruga, ato mund të zhvendosen te operatori, kjo varet nga frekuenca e përdorimit të këtij funksioni. Çfarë tjetër të shtoni në grep, përsëri, varet nga detyra: nëse skedarët merren menjëherë në letër, atëherë mund të shkarkoni bashkëngjitjet në letër, nëse të dhënat merren në letër, atëherë duhet të analizoni letrën, etj. Në rastin tim, letra vjen me një lidhje të arkivit, të cilën më duhet ta vendos në një vend të caktuar dhe të filloj procesin e mëtejshëm të përpunimit.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Kodi është i thjeshtë, kështu që vështirë se ka nevojë për shpjegime të mëtejshme. Unë do t'ju tregoj vetëm për linjën magjike imap_conn_id. Apache Airflow ruan parametrat e lidhjes (identifikimi, fjalëkalimi, adresa dhe parametra të tjerë) që mund të arrihen nga një identifikues i vargut. Vizualisht, menaxhimi i lidhjes duket kështu

Procesi ETL për marrjen e të dhënave nga emaili në Apache Airflow

Sensori për të pritur të dhënat

Meqenëse ne tashmë dimë se si të lidhemi dhe të marrim të dhëna nga posta, tani mund të shkruajmë një sensor për t'i pritur ato. Në rastin tim, nuk funksionoi të shkruaj menjëherë një operator që do të përpunojë të dhënat, nëse ka, sepse proceset e tjera funksionojnë bazuar në të dhënat e marra nga posta, duke përfshirë ato që marrin të dhëna të lidhura nga burime të tjera (API, telefonia , matjet e uebit, etj.) etj.). Unë do t'ju jap një shembull. Një përdorues i ri është shfaqur në sistemin CRM dhe ne ende nuk dimë për UUID-në e tij. Më pas, kur përpiqemi të marrim të dhëna nga telefonia SIP, do të marrim telefonata të lidhura me UUID-në e saj, por nuk do të jemi në gjendje t'i ruajmë dhe t'i përdorim ato siç duhet. Në çështje të tilla, është e rëndësishme të kihet parasysh varësia e të dhënave, veçanërisht nëse ato janë nga burime të ndryshme. Këto janë, natyrisht, masa të pamjaftueshme për të ruajtur integritetin e të dhënave, por në disa raste ato janë të nevojshme. Po, dhe përtacia për të zënë burime është gjithashtu irracionale.

Kështu, sensori ynë do të nisë kulmet pasuese të grafikut nëse ka informacion të freskët në postë, dhe gjithashtu do të shënojë informacionin e mëparshëm si të parëndësishëm.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Ne marrim dhe përdorim të dhëna

Për të marrë dhe përpunuar të dhëna, mund të shkruani një operator të veçantë, mund të përdorni ato të gatshme. Meqenëse logjika është ende e parëndësishme - për të marrë të dhëna nga letra, për shembull, unë sugjeroj PythonOperatorin standard

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Nga rruga, nëse posta juaj e korporatës është gjithashtu në mail.ru, atëherë nuk do të jeni në gjendje të kërkoni letra sipas subjektit, dërguesit, etj. Në vitin 2016, ata premtuan ta prezantonin, por mesa duket ndryshuan mendje. E zgjidha këtë problem duke krijuar një dosje të veçantë për letrat e nevojshme dhe duke vendosur një filtër për shkronjat e nevojshme në ndërfaqen e internetit të postës. Kështu, vetëm shkronjat dhe kushtet e nevojshme për kërkimin, në rastin tim, thjesht (të PAPASUR) futen në këtë dosje.

Duke përmbledhur, kemi sekuencën e mëposhtme: kontrollojmë nëse ka shkronja të reja që plotësojnë kushtet, nëse ka, atëherë shkarkojmë arkivin duke përdorur lidhjen nga shkronja e fundit.
Në pikat e fundit, është lënë jashtë që ky arkiv do të zbërthehet, të dhënat nga arkivi do të pastrohen dhe përpunohen dhe si rezultat, e gjithë puna do të shkojë më tej në tubacionin e procesit ETL, por kjo tashmë është përtej. qëllimi i artikullit. Nëse doli interesante dhe e dobishme, atëherë me kënaqësi do të vazhdoj të përshkruaj zgjidhjet ETL dhe pjesët e tyre për Apache Airflow.

Burimi: www.habr.com

Shto një koment