ETL գործընթաց՝ Apache Airflow-ում էլփոստից տվյալներ ստանալու համար

ETL գործընթաց՝ Apache Airflow-ում էլփոստից տվյալներ ստանալու համար

Անկախ նրանից, թե որքանով է զարգանում տեխնոլոգիան, մի շարք հնացած մոտեցումներ միշտ հետևում են զարգացմանը: Դա կարող է պայմանավորված լինել սահուն անցումով, մարդկային գործոններով, տեխնոլոգիական կարիքներով կամ այլ բանով: Տվյալների մշակման ոլորտում տվյալների աղբյուրներն ամենաբացահայտումն են այս մասում։ Անկախ նրանից, թե որքան ենք մենք երազում ազատվել սրանից, բայց առայժմ տվյալների մի մասն ուղարկվում է ակնթարթային մեսենջերներով և էլեկտրոնային նամակներով, էլ չեմ խոսում ավելի հնացած ձևաչափերի մասին: Ես հրավիրում եմ ձեզ ապամոնտաժել Apache Airflow-ի տարբերակներից մեկը՝ ցույց տալով, թե ինչպես կարող եք տվյալներ վերցնել նամակներից:

նախապատմությանը

Էլեկտրոնային փոստի միջոցով դեռ շատ տվյալներ են փոխանցվում՝ միջանձնային հաղորդակցությունից մինչև ընկերությունների միջև փոխգործակցության չափանիշներ: Լավ է, եթե հնարավոր լինի գրել ինտերֆեյս՝ տվյալներ ձեռք բերելու համար կամ գրասենյակում տեղադրել մարդկանց, ովքեր այս տեղեկատվությունը մուտքագրելու են ավելի հարմար աղբյուրներ, բայց հաճախ դա պարզապես հնարավոր չէ: Հատուկ խնդիրը, որին ես բախվեցի, տխրահռչակ CRM համակարգը տվյալների պահեստին միացնելն էր, այնուհետև OLAP համակարգին: Պատմականորեն այնպես է պատահել, որ մեր ընկերության համար այս համակարգի օգտագործումը հարմար էր բիզնեսի որոշակի ոլորտում: Հետևաբար, բոլորն իսկապես ցանկանում էին, որ կարողանան աշխատել նաև այս երրորդ կողմի համակարգի տվյալների հետ: Առաջին հերթին, իհարկե, ուսումնասիրվել է բաց API-ից տվյալներ ստանալու հնարավորությունը։ Ցավոք, API-ն չէր ծածկում բոլոր անհրաժեշտ տվյալների ստացումը, և, պարզ ասած, այն շատ առումներով խեղաթյուրված էր, և տեխնիկական աջակցությունը չէր ցանկանում կամ չէր կարող հասնել կես ճանապարհին ավելի համապարփակ գործառույթ ապահովելու համար: Բայց այս համակարգը հնարավորություն էր տալիս պարբերաբար փոստով ստանալ բացակայող տվյալները՝ արխիվը բեռնաթափելու հղման տեսքով։

Հարկ է նշել, որ սա միակ դեպքը չէ, երբ բիզնեսը ցանկացել է տվյալներ հավաքել էլեկտրոնային նամակներից կամ ակնթարթային մեսենջերներից։ Սակայն այս դեպքում մենք չէինք կարող ազդել երրորդ կողմի ընկերության վրա, որը միայն այս կերպ է տրամադրում տվյալների մի մասը։

apache օդային հոսք

ETL գործընթացներ կառուցելու համար մենք ամենից հաճախ օգտագործում ենք Apache Airflow: Որպեսզի ընթերցողը, ով անծանոթ է այս տեխնոլոգիային, ավելի լավ հասկանա, թե ինչպես է այն համատեքստում և ընդհանրապես, ես նկարագրելու եմ մի քանի ներածական:

Apache Airflow-ը անվճար հարթակ է, որն օգտագործվում է Python-ում ETL (Extract-Transform-Loading) գործընթացները կառուցելու, գործարկելու և վերահսկելու համար: Օդային հոսքի հիմնական հայեցակարգը ուղղորդված ացիկլիկ գրաֆիկն է, որտեղ գրաֆիկի գագաթները կոնկրետ գործընթացներ են, իսկ գրաֆիկի եզրերը՝ հսկողության կամ տեղեկատվության հոսք: Գործընթացը կարող է պարզապես զանգահարել Python-ի ցանկացած ֆունկցիա, կամ կարող է ունենալ ավելի բարդ տրամաբանություն՝ դասի համատեքստում մի քանի ֆունկցիաներ հաջորդաբար կանչելուց: Ամենահաճախակի գործողությունների համար արդեն կան բազմաթիվ պատրաստի մշակումներ, որոնք կարող են օգտագործվել որպես գործընթացներ: Նման զարգացումները ներառում են.

  • օպերատորներ - տվյալների մի վայրից մյուսը փոխանցելու համար, օրինակ, տվյալների բազայի աղյուսակից տվյալների պահեստ.
  • սենսորներ - որոշակի իրադարձության առաջացմանը սպասելու և հսկողության հոսքը գրաֆիկի հաջորդ գագաթներին ուղղելու համար.
  • կեռիկներ - ավելի ցածր մակարդակի գործողությունների համար, օրինակ, տվյալների բազայի աղյուսակից տվյալներ ստանալու համար (օգտագործվում է հայտարարություններում);
  • եւ այլն:

Անպատշաճ կլիներ այս հոդվածում մանրամասն նկարագրել Apache Airflow-ը: Կարճ ներածությունները կարելի է դիտել այստեղ կամ այստեղ.

Կեռիկ՝ տվյալներ ստանալու համար

Նախ և առաջ խնդիրը լուծելու համար պետք է գրել կեռիկ, որով կարող ենք.

  • միանալ էլփոստին
  • գտնել ճիշտ տառը
  • ստանալ տվյալներ նամակից.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Տրամաբանությունը սա է՝ մենք կապում ենք, գտնում ենք ամենաարդիական վերջին տառը, եթե կան ուրիշներ՝ անտեսում ենք։ Այս ֆունկցիան օգտագործվում է, քանի որ հետագա տառերը պարունակում են ավելի վաղ տառերի բոլոր տվյալները։ Եթե ​​դա այդպես չէ, ապա կարող եք վերադարձնել բոլոր տառերի զանգվածը կամ մշակել առաջինը, իսկ մնացածը հաջորդ անցումով: Ընդհանուր առմամբ, ամեն ինչ, ինչպես միշտ, կախված է առաջադրանքից։

Կեռիկին ավելացնում ենք երկու օժանդակ գործառույթ՝ ֆայլ ներբեռնելու և ֆայլ ներբեռնելու համար՝ օգտագործելով էլփոստի հղումը: Ի դեպ, դրանք կարող են տեղափոխվել օպերատոր, դա կախված է այս ֆունկցիոնալության օգտագործման հաճախականությունից: Ուրիշ ինչ ավելացնել մանգաղին, կրկին կախված է առաջադրանքից. եթե ֆայլերը ստացվում են անմիջապես նամակում, ապա կարող եք ներբեռնել նամակի հավելվածները, եթե տվյալները ստացվել են նամակում, ապա պետք է վերլուծել նամակը, և այլն: Իմ դեպքում նամակը գալիս է արխիվի մեկ հղումով, որը պետք է տեղադրեմ որոշակի տեղում և սկսեմ հետագա մշակման գործընթացը։

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Կոդը պարզ է, ուստի այն հազիվ թե լրացուցիչ բացատրության կարիք ունենա: Ես ձեզ պարզապես կասեմ imap_conn_id կախարդական գծի մասին: Apache Airflow-ը պահպանում է կապի պարամետրերը (մուտք, գաղտնաբառ, հասցե և այլ պարամետրեր), որոնց հասանելի է լարային նույնացուցիչը: Տեսողականորեն կապի կառավարումն այսպիսի տեսք ունի

ETL գործընթաց՝ Apache Airflow-ում էլփոստից տվյալներ ստանալու համար

Տվյալների սպասման տվիչ

Քանի որ մենք արդեն գիտենք, թե ինչպես կապել և ստանալ տվյալներ փոստից, այժմ կարող ենք գրել սենսոր՝ նրանց սպասելու համար: Իմ դեպքում, չհաջողվեց անմիջապես գրել օպերատոր, որը կմշակի տվյալները, եթե այդպիսիք կան, քանի որ այլ գործընթացներ աշխատում են փոստից ստացված տվյալների հիման վրա, ներառյալ նրանք, որոնք առնչվում են այլ աղբյուրներից (API, հեռախոսակապ): , վեբ չափումներ և այլն) և այլն)։ Ես ձեզ օրինակ բերեմ. CRM համակարգում նոր օգտատեր է հայտնվել, և մենք դեռ չգիտենք նրա UUID-ի մասին։ Այնուհետև, երբ փորձում ենք տվյալներ ստանալ SIP հեռախոսակապից, մենք կստանանք զանգեր՝ կապված դրա UUID-ի հետ, բայց մենք չենք կարողանա պահպանել և ճիշտ օգտագործել դրանք: Նման հարցերում կարևոր է նկատի ունենալ տվյալների կախվածությունը, հատկապես, եթե դրանք տարբեր աղբյուրներից են։ Սրանք, իհարկե, անբավարար միջոցներ են տվյալների ամբողջականությունը պահպանելու համար, բայց որոշ դեպքերում դրանք անհրաժեշտ են: Այո, և ռեսուրսներ զբաղեցնելու համար պարապ մնալը նույնպես իռացիոնալ է։

Այսպիսով, մեր սենսորը կգործարկի գրաֆիկի հաջորդ գագաթները, եթե փոստում թարմ տեղեկատվություն լինի, ինչպես նաև կնշի նախորդ տեղեկատվությունը որպես անտեղի:

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Մենք ստանում և օգտագործում ենք տվյալներ

Տվյալներ ստանալու և մշակելու համար կարող եք գրել առանձին օպերատոր, կարող եք օգտագործել պատրաստի: Քանի որ մինչ այժմ տրամաբանությունը չնչին է. օրինակ վերցնել տառից տվյալներ, առաջարկում եմ ստանդարտ PythonOperator-ը:

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Ի դեպ, եթե ձեր կորպորատիվ փոստը նույնպես mail.ru-ում է, ապա դուք չեք կարողանա նամակներ որոնել ըստ թեմայի, ուղարկողի և այլն: Դեռ 2016 թվականին նրանք խոստացել էին ներկայացնել այն, սակայն, ըստ երեւույթին, մտափոխվեցին։ Ես լուծեցի այս խնդիրը՝ ստեղծելով առանձին թղթապանակ անհրաժեշտ տառերի համար և տեղադրելով ֆիլտր փոստի վեբ ինտերֆեյսում անհրաժեշտ տառերի համար։ Այսպիսով, միայն որոնման համար անհրաժեշտ տառերն ու պայմանները, իմ դեպքում, պարզապես (ԱՆՏԵՍՆԱԼ) մտնում են այս թղթապանակում:

Ամփոփելով՝ ունենք հետևյալ հաջորդականությունը՝ ստուգում ենք, արդյոք կան պայմաններին համապատասխանող նոր տառեր, եթե կան, ապա ներբեռնում ենք արխիվը՝ օգտագործելով վերջին տառի հղումը։
Վերջին կետերի տակ բաց է թողնվում, որ այս արխիվը կբացվի, արխիվի տվյալները կմաքրվեն և կմշակվեն, և արդյունքում ամբողջը կգնա դեպի ETL գործընթացի խողովակաշարը, բայց սա արդեն այն կողմ է: հոդվածի շրջանակը։ Եթե ​​հետաքրքիր և օգտակար ստացվեց, ապա ես սիրով կշարունակեմ նկարագրել ETL լուծումները և դրանց մասերը Apache Airflow-ի համար։

Source: www.habr.com

Добавить комментарий