ETL process for retrieving data from email in Apache Airflow

No matter how far technology advances, a trail of outdated approaches always follows behind it. This may be due to a smooth transition, human factors, technological needs, or something else. In data processing, data sources are the most revealing in this respect. However much we dream of getting rid of it, some data is still sent through instant messengers and email, not to mention more archaic formats. I invite you to take apart one option for Apache Airflow that shows how to pull data from email.

Background

A lot of data is still passed by email, from personal communication to standards of interaction between companies. It is good when you can write an interface to obtain the data, or seat people in the office who will enter this information into more convenient sources, but often that is simply not possible. The specific task I faced was connecting a well-known CRM system to a data warehouse, and then to an OLAP system. It so happened historically that using this system was convenient for our company in one particular area of the business. So everyone really wanted to be able to work with data from this third-party system as well. First of all, of course, we studied the possibility of getting the data through the open API. Unfortunately, the API did not cover all the necessary data and, to put it simply, was crooked in many ways, while technical support would not or could not meet us halfway and provide more complete functionality. But the system did offer the option of periodically receiving the missing data by mail, in the form of a link for downloading an archive.

It should be noted that this was not the only case in which the business wanted to collect data from email or instant messengers. In this case, however, we could not influence the third-party company, which provides part of the data only in this way.

Apache Airflow

To build ETL processes we most often use Apache Airflow. So that a reader unfamiliar with this technology can better understand how it looks in context and in general, I will describe a couple of introductory points.

Apache Airflow is a free platform used to build, execute and monitor ETL (Extract-Transform-Load) processes in Python. The central concept in Airflow is the directed acyclic graph, where the vertices of the graph are specific processes and the edges are the flow of control or information. A process can simply call any Python function, or it can have more complex logic made of sequential calls to several functions in the context of a class. For the most common operations there are already many ready-made components that can be used as processes. These include:

  • operators - for transferring data from one place to another, for example, from a database table to a data warehouse;
  • sensors - for waiting for a certain event to occur and passing the flow of control on to the subsequent vertices of the graph;
  • hooks - for lower-level operations, for example, getting data from a database table (used inside operators);
  • and so on.
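
To make the graph idea more concrete, here is a minimal sketch in plain Python. It is not how Airflow is implemented; it only shows tasks as vertices and dependencies as edges, executed in an order that respects those edges. The task names and the `run_pipeline` helper are hypothetical, and `graphlib` requires Python 3.9 or newer.

```python
from graphlib import TopologicalSorter


def run_pipeline(tasks, upstream):
    """Run callables in an order that respects the edges of the graph.

    tasks:    mapping of task name -> zero-argument callable
    upstream: mapping of task name -> set of task names it depends on
    """
    executed = []
    # static_order() raises graphlib.CycleError if the graph contains a cycle
    for name in TopologicalSorter(upstream).static_order():
        tasks[name]()
        executed.append(name)
    return executed


log = []
tasks = {
    "wait_for_mail": lambda: log.append("mail arrived"),
    "download_archive": lambda: log.append("archive downloaded"),
    "load_to_warehouse": lambda: log.append("data loaded"),
}
upstream = {
    "wait_for_mail": set(),
    "download_archive": {"wait_for_mail"},
    "load_to_warehouse": {"download_archive"},
}
order = run_pipeline(tasks, upstream)
```

In Airflow itself the same dependencies are declared with the `>>` operator between tasks, and the scheduler decides when each vertex runs.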

It would be out of place to describe Apache Airflow in detail in this article. Short introductions can be found here or here.

Hook for retrieving data

First of all, to solve the problem, we need to write a hook that lets us:

  • connect to the mailbox
  • find the right message
  • get the data from the message.

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from bs4 import BeautifulSoup
import imaplib
import logging
import requests

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

The logic is this: we connect, find the latest relevant message, and if there are others, we ignore them. This approach works because later messages contain all the data of the earlier ones. If that is not the case, you can return an array of all messages, or process the first one and leave the rest for the next pass. In general, everything, as always, depends on the task.
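
The selection step can be tried without a mail server. Below is a small sketch under the assumption that the search payload has the shape `imaplib` normally returns: a list whose first element is a bytes string of space-separated ids. The helper name `split_search_result` is hypothetical and is not part of the hook.

```python
def split_search_result(data):
    """Split the payload of IMAP4.search() into (latest_id, older_ids).

    data is expected to look like [b"3 7 12"]; an empty result is
    usually [b""], and a missing payload (None) is treated the same way.
    """
    mail_ids = (data[0] or b"").split()
    if not mail_ids:
        return None, []
    # Ids come back in ascending order, so the last one is the newest
    return mail_ids[-1], mail_ids[:-1]


latest, older = split_search_result([b"3 7 12"])
empty = split_search_result([b""])
```

If every message matters rather than only the newest, returning the whole `mail_ids` list and processing the ids in order would be the natural variation.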

We add two helper functions to the hook: one for downloading a file and one for downloading a file using a link from an email. By the way, they can be moved into the operator, depending on how often this functionality is used. What else to add to the hook again depends on the task: if files arrive directly in the message, you can download the attachments; if the data arrives in the message body, you need to parse the message, and so on. In my case, the message comes with a single link to an archive, which I need to put in a specific place before starting the further processing.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)
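
For the other case mentioned above, where the file arrives as an attachment rather than as a link, a sketch using only the standard `email` package might look like this. It assumes `raw_email` is the same kind of bytes object that `self.mail.fetch(mail_id, "(RFC822)")` yields in `data[0][1]`; the function name `extract_attachments` and the demo message are hypothetical.

```python
import email
from email import policy
from email.message import EmailMessage


def extract_attachments(raw_email):
    """Return {filename: bytes} for every attachment in a raw RFC 822 message."""
    message = email.message_from_bytes(raw_email, policy=policy.default)
    attachments = {}
    for part in message.iter_attachments():
        filename = part.get_filename()
        if filename:
            # decode=True undoes the transfer encoding (base64 here)
            attachments[filename] = part.get_payload(decode=True)
    return attachments


# Build a small message in memory to exercise the function
demo = EmailMessage()
demo["Subject"] = "Daily export"
demo["From"] = "crm@example.com"
demo["To"] = "etl@example.com"
demo.set_content("Export attached.")
demo.add_attachment(
    b"id,name\n1,Alice\n",
    maintype="application",
    subtype="octet-stream",
    filename="export.csv",
)
files = extract_attachments(demo.as_bytes())
```

Writing each value of `files` to disk would then replace the `download_from_url` step.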

The hook code is simple enough that it hardly needs further explanation. I will only say a word about the magic string imap_conn_id. Apache Airflow stores connection parameters (login, password, address and other settings), which can be accessed by a string identifier. Connections are managed visually in the Airflow web interface.
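
Airflow can also read a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` that holds a URI. The sketch below only builds and re-parses such a URI with the standard library so the individual fields are visible. The `imap` scheme, the host, the credentials and the helper `build_conn_uri` are all made-up placeholders, not values from the article.

```python
from urllib.parse import quote, unquote, urlsplit


def build_conn_uri(conn_type, login, password, host, port=None):
    """Assemble a connection URI, percent-encoding the credentials."""
    netloc = f"{quote(login, safe='')}:{quote(password, safe='')}@{host}"
    if port is not None:
        netloc += f":{port}"
    return f"{conn_type}://{netloc}"


conn_id = "mail_conn_id"
env_name = "AIRFLOW_CONN_" + conn_id.upper()
uri = build_conn_uri("imap", "etl@example.com", "p@ss/word", "imap.example.com", 993)

# Parse the URI back to check which part ends up in which field
parts = urlsplit(uri)
login = unquote(parts.username)
password = unquote(parts.password)
```

Exporting `uri` under the name in `env_name` before starting Airflow should make `get_connection("mail_conn_id")` resolve to it, with `host`, `login` and `password` filled from the corresponding URI parts.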

Sensor for awaiting data

Since we already know how to connect to the mailbox and get data from it, we can now write a sensor that waits for that data. In my case it did not work out to immediately write an operator that processes the data, if there is any, because other processes also rely on the data received from the mail, including those that take related data from other sources (API, telephony, web metrics, etc.). Let me give an example. A new user has appeared in the CRM system, and we do not yet know their UUID. Then, when we try to get data from SIP telephony, we will receive calls tied to that UUID, but we will not be able to save and use them correctly. In such matters it is important to keep data dependencies in mind, especially when the data comes from different sources. These measures are, of course, not sufficient to preserve data integrity, but in some cases they are necessary. And besides, occupying resources while idling is irrational too.

Thus, our sensor will launch the subsequent vertices of the graph if there is fresh information in the mailbox, and will also mark the previous information as irrelevant.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

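    def poke(self, context):
        # Airflow calls poke() repeatedly until it returns True or the timeout expires
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        # True means that a matching message was found
        return mail_id is not None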
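
To show what the `poke_interval` and `timeout` arguments mean for a sensor like this, here is a simplified stand-alone sketch of a polling loop. It is not Airflow's actual implementation: on timeout Airflow fails the task, or skips it when `soft_fail=True`, instead of returning False. The names `wait_until` and `fake_poke` are hypothetical.

```python
import time


def wait_until(poke, poke_interval, timeout, sleep=time.sleep, clock=time.monotonic):
    """Call poke() every poke_interval seconds until it returns True.

    Returns True if the condition was met, or False if timeout seconds
    elapsed first.
    """
    started = clock()
    while True:
        if poke():
            return True
        if clock() - started >= timeout:
            return False
        sleep(poke_interval)


# Demo with a fake mailbox that "receives" a message on the third check
checks = []


def fake_poke():
    checks.append(1)
    return len(checks) >= 3


found = wait_until(fake_poke, poke_interval=0, timeout=5)
```

In `mode="poke"` the sensor keeps its worker slot for the whole wait, which is one reason to keep the timeout short for an hourly schedule.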

Receiving and using the data

To receive and process the data you can write a separate operator or use a ready-made one. Since the logic is still trivial (take the data from the message), as an example I suggest the standard PythonOperator

from datetime import datetime, timedelta

from airflow.exceptions import AirflowException
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Standard DAG configuration
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Define the sensor
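mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    # Leave the latest message unread so the next task can still find it with (UNSEEN)
    check_seen=False,
    dag=dag,
    mode="poke",
)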

# Function that fetches the data referenced in the email
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    # Download the archive linked from the latest message
    imap_hook.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable=prepare_mail)

# Definitions of the remaining graph vertices
...

# Define the edges of the graph
mail_check_sensor >> prepare_mail_data
prepare_mail_data >> ...
# Definitions of the remaining control flows

By the way, if your corporate mail is hosted on mail.ru, you will not be able to search for messages by subject, sender, and so on. Back in 2016 they promised to introduce this, but apparently changed their minds. I solved the problem by creating a separate folder for the needed messages and setting up a filter for them in the mail web interface. Thus, only the needed messages get into this folder, and the search condition, in my case, is simply (UNSEEN).
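
On servers that do support richer searches, the `condition` string passed to the hook can combine several standard IMAP search keys. The helper below is a hypothetical sketch of assembling such a string. It assumes ASCII values without double quotes; non-ASCII subjects would need a charset argument to `search`, which is not handled here.

```python
def build_search_condition(unseen=True, sender=None, subject=None):
    """Combine IMAP search keys into one parenthesised condition string."""
    keys = []
    if unseen:
        keys.append("UNSEEN")
    if sender:
        keys.append(f'FROM "{sender}"')
    if subject:
        keys.append(f'SUBJECT "{subject}"')
    if not keys:
        # ALL matches every message in the selected mailbox
        keys.append("ALL")
    return "(" + " ".join(keys) + ")"


default_condition = build_search_condition()
filtered_condition = build_search_condition(sender="crm@example.com", subject="Export")
```

The result can be passed as `condition=` to `MailSensor` or `get_last_mail` in place of the default `"(UNSEEN)"`.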

To summarize, we have the following sequence: we check whether there are new messages that meet the conditions and, if there are, we download the archive using the link from the latest message.
Hidden behind the final ellipsis are the steps in which this archive is unpacked and the data from it is cleaned and processed, after which the whole thing goes further down the ETL pipeline, but that is beyond the scope of this article. If it turned out to be interesting and useful, I will gladly continue describing ETL solutions and their parts for Apache Airflow.

Source: www.habr.com
