Usoro ETL maka ịnweta data sitere na email na Apache Airflow

Usoro ETL maka ịnweta data sitere na email na Apache Airflow

N'agbanyeghị otú nkà na ụzụ na-esi etolite, ọtụtụ ụzọ ochie na-aga n'azụ mmepe. Nke a nwere ike ịbụ n'ihi mgbanwe dị nro, ihe mmadụ na-akpata, mkpa nkà na ụzụ, ma ọ bụ ihe ọzọ. N'ihe gbasara nhazi data, isi mmalite data bụ ihe kachasị ekpughe na akụkụ a. N'agbanyeghị otú anyị na-arọ nrọ ikpochapụ nke a, ma ka ọ dị ugbu a, a na-ezigara akụkụ nke data ahụ na ndị ozi ozugbo na ozi ịntanetị, ọ bụghị ịkọkwu ụdị archaic. M na-akpọ gị òkù ka ị kwasa otu n'ime nhọrọ maka Apache Airflow, na-egosi otu ị ga-esi nweta data site na ozi-e.

prehistory

A ka na-ebufe ọtụtụ data site na ozi-e, site na nkwukọrịta mmekọrịta mmadụ na ibe ya na ụkpụrụ mmekọrịta n'etiti ụlọ ọrụ. Ọ dị mma ma ọ bụrụ na ọ ga-ekwe omume ịde interface iji nweta data ma ọ bụ tinye ndị mmadụ n'ọfịs ndị ga-abanye ozi a n'ime isi mmalite ndị dị mma, mana ọtụtụ mgbe nke a nwere ike ọ gaghị ekwe omume. Ọrụ a kapịrị ọnụ m chere ihu bụ ijikọ sistemụ CRM ama ama na ụlọ nkwakọba ihe data, wee banye na sistemụ OLAP. Ọ mere akụkọ ihe mere eme na maka ụlọ ọrụ anyị iji usoro a dị mma na mpaghara azụmahịa. Ya mere, onye ọ bụla chọrọ n'ezie inwe ike iji data sitere na usoro nke atọ a rụọ ọrụ. Nke mbụ, n'ezie, a mụrụ ohere ịnweta data site na API mepere emepe. N'ụzọ dị mwute, API ekpuchighị ịnweta data niile dị mkpa, na, na okwu dị mfe, ọ bụ n'ọtụtụ ụzọ gbagọrọ agbagọ, na nkwado teknụzụ achọghị ma ọ bụ enweghị ike izute ọkara iji nye ọrụ zuru oke. Mana usoro a nyere ohere ịnweta data na-efu efu site na mail n'ụdị njikọ maka ibupu ebe nchekwa ahụ.

Ekwesiri ighota na nke a abughi nani ikpe nke ulo oru ahu choro ichikota data site na ozi-e ma obu ndi ozi ozugbo. Otú ọ dị, na nke a, anyị enweghị ike imetụta ụlọ ọrụ nke atọ nke na-enye akụkụ nke data naanị n'ụzọ dị otú a.

Apache ikuku

Iji wuo usoro ETL, anyị na-ejikarị Apache Airflow. Ka onye na-agụ akwụkwọ nke na-amaghị nkà na ụzụ a wee ghọta nke ọma otú o si ele ihe anya na n'ozuzu ya, m ga-akọwa di na nwunye nke mmeghe.

Apache Airflow bụ ikpo okwu efu nke ejiri wuo, mebie na nyochaa usoro ETL (Extract-Transform-Loading) na Python. Isi echiche dị na Airflow bụ eserese acyclic a na-eduzi, ebe akụkụ nke eserese ahụ bụ usoro akọwapụtara, na akụkụ nke eserese ahụ bụ njikwa njikwa ma ọ bụ ozi. Usoro nwere ike ịkpọ ọrụ Python ọ bụla, ma ọ bụ ọ nwere ike ịnwe mgbagha mgbagwoju anya site na ịkpọ ọtụtụ ọrụ n'usoro nke klas. Maka ọrụ ndị a na-emekarị, enweelarị ọtụtụ ihe emebere emebere nke enwere ike iji dị ka usoro. Mmepe ndị dị otú a gụnyere:

  • ndị na-arụ ọrụ - maka ịnyefe data site n'otu ebe gaa na nke ọzọ, dịka ọmụmaatụ, site na tebụl nchekwa data na ụlọ nkwakọba ihe data;
  • sensọ - maka ichere ihe omume nke ihe omume na-eduzi usoro nchịkwa na njedebe ndị na-esote nke eserese;
  • nko - maka ọrụ ndị dị ala, dịka ọmụmaatụ, iji nweta data site na tebụl nchekwa data (eji na nkwupụta);
  • na ihe ndị ọzọ.

Ọ gaghị abụ ihe na-ekwesịghị ekwesị ịkọwa Apache Airflow n'ụzọ zuru ezu n'isiokwu a. Enwere ike ilele mmalite mmalite ebe a ma ọ bụ ebe a.

Ngwa maka ịnweta data

Nke mbụ, iji dozie nsogbu ahụ, anyị kwesịrị ide nko nke anyị nwere ike:

  • jikọọ na email
  • chọta akwụkwọ ozi ziri ezi
  • nweta data site na leta.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Echiche bụ nke a: anyị jikọọ, chọta akwụkwọ ozi ikpeazụ kachasị mkpa, ọ bụrụ na e nwere ndị ọzọ, anyị na-eleghara ha anya. A na-eji ọrụ a, n'ihi na akwụkwọ ozi emechaa nwere data niile nke ndị mbụ. Ọ bụrụ na nke a abụghị ikpe, mgbe ahụ ị nwere ike iweghachi ọtụtụ mkpụrụedemede niile, ma ọ bụ hazie nke mbụ, yana ndị ọzọ na ngafe ọzọ. N'ozuzu, ihe niile, dị ka mgbe niile, dabere na ọrụ ahụ.

Anyị na-agbakwunye ọrụ inyeaka abụọ na nko: maka nbudata faịlụ na maka nbudata faịlụ site na iji njikọ sitere na email. Site n'ụzọ, enwere ike ịkwaga ha na onye na-arụ ọrụ, ọ dabere na ugboro ole iji ọrụ a. Kedu ihe ọzọ ị ga-agbakwunye na nko ahụ, ọzọ, dabere na ọrụ ahụ: ọ bụrụ na a na-enweta faịlụ ozugbo na akwụkwọ ozi ahụ, ị ​​​​nwere ike ibudata mgbakwunye na leta ahụ, ọ bụrụ na enwetara data na leta ahụ, ịkwesịrị ịmegharị akwụkwọ ozi ahụ, wdg. N'okwu m, akwụkwọ ozi ahụ na-abịa na otu njikọ na ebe nchekwa, nke m kwesịrị itinye n'otu ebe ma malite usoro nhazi ọzọ.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Koodu dị mfe, yabụ na ọ naghị achọ nkọwa ọzọ. Aga m agwa gị maka ahịrị anwansi imap_conn_id. Apache Airflow na-echekwa paramita njikọ (nbanye, paswọọdụ, adreesị, na parampat ndị ọzọ) nke nwere ike ịnweta site na njirimara eriri. N'anya, njikwa njikọ dị ka nke a

Usoro ETL maka ịnweta data sitere na email na Apache Airflow

Sensọ chere data

Ebe ọ bụ na anyị amaworị ka esi jikọọ na ịnweta data site na mail, anyị nwere ike dee ihe mmetụta ugbu a iji chere ha. N'ọnọdụ m, ọ naghị arụ ọrụ iji dee onye ọrụ ozugbo nke ga-edozi data ahụ, ma ọ bụrụ na ọ bụla, n'ihi na usoro ndị ọzọ na-arụ ọrụ dabere na data enwetara site na mail, gụnyere ndị na-ewere data metụtara site na isi mmalite ndị ọzọ (API, telephony). , metrics webụ, wdg) wdg). Aga m enye gị ihe atụ. Onye ọrụ ọhụrụ apụtala na sistemụ CRM, ma anyị amabeghị maka UUID ya. Mgbe ahụ, mgbe ị na-agbalị ịnweta data sitere na telephony SIP, anyị ga-enweta oku ejikọtara na UUID ya, mana anyị agaghị enwe ike ịchekwa ma jiri ya mee ihe nke ọma. N'okwu ndị dị otú ahụ, ọ dị mkpa iburu n'uche ndabere nke data ahụ, karịsịa ma ọ bụrụ na ha sitere na isi mmalite dị iche iche. Ndị a bụ, n'ezie, ezughị oke iji chekwaa iguzosi ike n'ezi ihe data, mana n'ọnọdụ ụfọdụ ha dị mkpa. Ee, na ijigide akụ na ụba bụkwa ihe ezi uche na-adịghị na ya.

Ya mere, ihe mmetụta anyị ga-amalite vertices nke eserese ahụ ma ọ bụrụ na enwere ozi ọhụrụ na mail, ma kaa ozi gara aga dị ka ihe adịghị mkpa.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Anyị na-enweta ma jiri data

Iji nweta na hazie data, ị nwere ike dee onye ọrụ dị iche, ịnwere ike iji ndị emebere. Ebe ọ bụ na mgbagha ahụ dị ntakịrị - iji nweta data sitere na leta ahụ, dịka ọmụmaatụ, ana m atụ aro PythonOperator ọkọlọtọ.

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Site n'ụzọ, ọ bụrụ na ozi ụlọ ọrụ gị dịkwa na mail.ru, mgbe ahụ ị gaghị enwe ike ịchọ akwụkwọ ozi site na isiokwu, onye na-ezipụ, wdg. Laa azụ na 2016, ha kwere nkwa iwebata ya, mana o doro anya na ha gbanwere obi ha. Adoziri m nsogbu a site na ịmepụta folda dị iche iche maka mkpụrụedemede ndị dị mkpa na ịmepụta ihe nzacha maka mkpụrụedemede ndị dị mkpa na interface weebụ mail. Ya mere, naanị mkpụrụedemede na ọnọdụ dị mkpa maka ọchụchọ, n'ọnọdụ m, naanị (Ahụghị) banye na nchekwa a.

N'ịchịkọta, anyị nwere usoro ndị a: anyị na-elele ma ọ bụrụ na e nwere mkpụrụedemede ọhụrụ na-emezu ọnọdụ ahụ, ọ bụrụ na e nwere, mgbe ahụ, anyị na-ebudata ebe nchekwa ahụ site na iji njikọ sitere na akwụkwọ ozi ikpeazụ.
N'okpuru ntụpọ ikpeazụ, a na-ahapụ ya na a ga-ewepụ ihe ndekọ a, a ga-ehichapụ data sitere na ebe nchekwa ahụ ma dozie ya, n'ihi ya, ihe niile ga-aga n'ihu na pipeline nke usoro ETL, mana nke a agabigalarị. oke nke isiokwu. Ọ bụrụ na ọ tụgharịrị bụrụ ihe na-adọrọ mmasị ma baa uru, m ga-eji obi ụtọ nọgide na-akọwa ngwọta ETL na akụkụ ha maka Apache Airflow.

isi: www.habr.com

Tinye a comment