ETL process for getting data from email in Apache Airflow


However far technology advances, a trail of outdated approaches always follows behind it. The cause may be a gradual transition, human factors, technological necessity, or something else. In data processing, data sources are the most telling example. Much as we dream of getting rid of this, for now part of the data is still sent through instant messengers and email, not to mention more archaic formats. I invite you to look at one option for Apache Airflow that shows how to pull data out of emails.

Background

A great deal of data is still transferred by email, from personal communication to the standards for interaction between companies. It is good if you can write an interface to obtain the data, or seat people in the office who will enter this information into more convenient sources, but often that is simply not possible. The specific task I faced was connecting a well-known CRM system to the data warehouse, and then to the OLAP system. For historical reasons, our company found this system convenient for one particular area of the business, so everyone very much wanted to be able to work with data from this third-party system as well. First, of course, we studied the possibility of getting the data from its open API. Unfortunately, the API did not cover all the data we needed and, to put it plainly, was crooked in many ways, and technical support would not or could not meet us halfway and provide more complete functionality. The system did, however, offer the option of periodically receiving the missing data by email, as a link for downloading an archive.

It should be noted that this was not the only case in which the business wanted to collect data from emails or instant messengers. In this case, however, we could not influence a third-party company that provides part of the data only in this way.

Apache Airflow

To build ETL processes we most often use Apache Airflow. So that a reader unfamiliar with this technology can better understand how it looks in context and in general, I will describe a few introductory points.

Apache Airflow is a free platform used to build, run, and monitor ETL (Extract, Transform, Load) processes in Python. The central concept in Airflow is a directed acyclic graph, whose vertices are specific processes and whose edges are the flow of control or information. A process can simply call any Python function, or it can have more complex logic that calls several functions in sequence within a class. For the most frequent operations there are already many ready-made components that can be used as processes. These include:

  • operators - for moving data from one place to another, for example, from a database table to a data warehouse;
  • sensors - for waiting for a certain event and directing the flow of control to the subsequent vertices of the graph;
  • hooks - for lower-level operations, for example, getting data from a database table (used inside operators);
  • and so on.
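
The directed acyclic graph idea can be tried out without Airflow at all. The sketch below is only an illustration using the Python standard library (`graphlib`, Python 3.9+), not Airflow's own scheduler; the task name `load_to_warehouse` is hypothetical, and the other two names are borrowed from the DAG shown later in this article.

```python
from graphlib import CycleError, TopologicalSorter

# Each key is a task (a vertex); its value is the set of tasks it depends on (incoming edges).
dependencies = {
    "check_new_emails": set(),
    "prepare_mail_data": {"check_new_emails"},
    "load_to_warehouse": {"prepare_mail_data"},  # hypothetical downstream task
}


def run_order(graph):
    """Return one valid execution order; raises CycleError if the graph contains a cycle."""
    return list(TopologicalSorter(graph).static_order())


order = run_order(dependencies)
print(order)
```

A graph with a cycle, such as `{"a": {"b"}, "b": {"a"}}`, makes `run_order` raise `CycleError`, which is why the graph has to be acyclic for a run order to exist.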

Describing Apache Airflow in detail would be out of place in this article. Short introductions are easy to find elsewhere, for example in the official Airflow documentation.

Hook for getting the data

First of all, to solve the problem we need to write a hook that lets us:

  • connect to the mailbox;
  • find the right message;
  • get the data from the message.

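from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from bs4 import BeautifulSoup  # used by download_mail_href_attachment
import imaplib
import logging
import requests  # used by download_from_url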

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook for getting data from email

           :param imap_conn_id:       Identifier of the mail connection
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """
            Connect to the mailbox
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Get the identifier of the last message
            that satisfies the search conditions

            :param check_seen:      Mark the last message as read
            :type check_seen:       bool
            :param box:             Mailbox name
            :type box:              string
            :param condition:       Search conditions for messages
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("Messages found in the mailbox: " + str(mail_ids))

        if not mail_ids:
            logging.info("No new messages found")
            return None

        mail_id = mail_ids[0]

        # if there are several such messages
        if len(mail_ids) > 1:
            # mark them all as read (the last one is handled below)
            for message_id in mail_ids:
                self.mail.store(message_id, "+FLAGS", "\\Seen")

            # return the last one
            mail_id = mail_ids[-1]

        # whether the last message should stay marked as read
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\\Seen")

        return mail_id

The logic is this: we connect and find the latest, most relevant message; if there are others, we ignore them. This function is used because later messages contain all the data of earlier ones. If that is not the case, you can return an array of all messages, or process the first one and leave the rest for the next pass. In general, as always, everything depends on the task.
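
The alternative mentioned here, returning every matching message instead of only the last one, could be sketched roughly as follows. `split_search_result` is a hypothetical helper, not part of the hook; it only assumes the response shape that `imaplib.IMAP4.search()` returns, a list whose first item is a bytes string of space-separated ids.

```python
def split_search_result(data):
    """Split an IMAP SEARCH response into (latest_id, older_ids).

    Returns (None, []) when the search matched nothing.
    """
    mail_ids = data[0].split() if data and data[0] else []
    if not mail_ids:
        return None, []
    # Ids come back in ascending order, so the last one is the newest match.
    return mail_ids[-1], mail_ids[:-1]


latest, older = split_search_result([b"3 7 12"])
print(latest, older)
```

A caller that needs every message can process `older + [latest]` in order, while one that only needs the newest can keep using `latest`.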

We add two helper functions to the hook: one for downloading a file and one for downloading a file via a link from an email. Incidentally, they can be moved into an operator; it depends on how often this functionality is used. What else to add to the hook again depends on the task: if files arrive directly in the message, you can download the attachments; if the data arrives in the message body, you need to parse the message; and so on. In my case the message arrives with a single link to an archive, which I need to put in a specific place before starting the further processing.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Download a file

            :param url:              Download address
            :type url:               string
            :param path:             Where to put the file
            :type path:              string
            :param chunk_size:       How many bytes to write at a time
            :type chunk_size:        int
        """
        # Requires `import requests` at the top of the module
        r = requests.get(url, stream=True)
        r.raise_for_status()  # fail early instead of saving an error page
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Download a file via a link from an email

            :param mail_id:         Message identifier
            :type mail_id:          string
            :param path:            Where to put the file
            :type path:             string
        """
        # Requires `from bs4 import BeautifulSoup` at the top of the module
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        # Strip line breaks so that a link split across lines is joined back
        raw_soup = raw_email.decode().replace("\r", "").replace("\n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        # Keep the href of the last link that has text
        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        if not link_text:
            raise ValueError("No link found in the message")

        self.download_from_url(link_text, path)
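
For the other cases mentioned earlier (files attached directly, or data inside the message body), the standard library `email` package can do the parsing, including decoding of transfer encodings. The following is a minimal sketch, not the hook's actual code; `parse_message` and `LinkCollector` are hypothetical names, and the demo message is built in memory with made-up values.

```python
import email
from email import policy
from email.message import EmailMessage
from html.parser import HTMLParser


class LinkCollector(HTMLParser):
    """Collect the href of every <a> tag fed to the parser."""

    def __init__(self):
        super().__init__()
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            href = dict(attrs).get("href")
            if href:
                self.links.append(href)


def parse_message(raw_email):
    """Return (links, attachments) found in a raw RFC822 message given as bytes."""
    message = email.message_from_bytes(raw_email, policy=policy.default)

    links = []
    html_body = message.get_body(preferencelist=("html",))
    if html_body is not None:
        collector = LinkCollector()
        collector.feed(html_body.get_content())
        links = collector.links

    # Each attachment becomes a (filename, decoded bytes) pair.
    attachments = [
        (part.get_filename(), part.get_payload(decode=True))
        for part in message.iter_attachments()
    ]
    return links, attachments


# Demo message built in memory; every value here is made up.
demo = EmailMessage()
demo["Subject"] = "Export"
demo.set_content("Your export is ready.")
demo.add_alternative(
    '<p><a href="https://example.com/export.zip">Download</a></p>', subtype="html"
)
demo.add_attachment(
    b"id,name\n1,Alice\n",
    maintype="application",
    subtype="octet-stream",
    filename="export.csv",
)

links, attachments = parse_message(demo.as_bytes())
print(links, attachments)
```

With a real mailbox, the bytes passed to `parse_message` would be `data[0][1]` from the `fetch` call above. Unlike the hook, which keeps only the last link, this sketch returns all links and leaves the choice to the caller.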

The code is simple, so it hardly needs further explanation. I will only say a word about the magic string imap_conn_id. Apache Airflow stores connection parameters (login, password, address, and other parameters) that can be accessed by a string identifier. Connections are managed in the Airflow web interface, under Admin > Connections.
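
Airflow can also read a connection from an environment variable named `AIRFLOW_CONN_` followed by the upper-cased connection id, with the value given as a URI. The sketch below only shows how such a URI might be assembled with the standard library; `build_conn_uri` is a hypothetical helper, and the `imap` scheme, host, port, and credentials are all placeholders rather than values from the original setup.

```python
from urllib.parse import quote, unquote, urlsplit


def build_conn_uri(conn_type, login, password, host, port=None):
    """Assemble a connection URI, percent-encoding the credentials."""
    netloc = f"{quote(login, safe='')}:{quote(password, safe='')}@{host}"
    if port is not None:
        netloc += f":{port}"
    return f"{conn_type}://{netloc}"


uri = build_conn_uri("imap", "reports@example.com", "p@ss/word", "imap.example.com", 993)
env_name = "AIRFLOW_CONN_" + "mail_conn_id".upper()

print(f"{env_name}={uri}")
```

Percent-encoding matters because characters such as `@` and `/` in a login or password would otherwise be read as URI delimiters.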


Sensor for waiting for the data

Since we already know how to connect and get data from the mail, we can now write a sensor to wait for it. In my case it was not possible to write an operator straight away that would process the data if there is any, because other processes also work on the basis of the data received from the mail, including ones that take related data from other sources (API, telephony, web metrics, etc.). Here is an example. A new user has appeared in the CRM system, and we do not yet know their UUID. Then, when we try to get data from SIP telephony, we receive calls tied to that UUID but cannot store and use them correctly. In such matters it is important to keep data dependencies in mind, especially when the data comes from different sources. These are, of course, insufficient measures for preserving data integrity, but in some cases they are necessary. Besides, occupying resources for nothing is irrational.
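
The dependency in this example can be made concrete with a small sketch. Everything here is hypothetical (the record layout, the field name `user_uuid`, the helper name); it only illustrates why call records cannot be stored before the CRM export that introduces their user has been loaded.

```python
def split_calls_by_known_users(calls, known_uuids):
    """Separate call records into those that can be stored now and those that must wait."""
    ready, deferred = [], []
    for call in calls:
        if call["user_uuid"] in known_uuids:
            ready.append(call)
        else:
            deferred.append(call)
    return ready, deferred


known_uuids = {"u-1", "u-2"}  # UUIDs already loaded from the CRM export
calls = [
    {"call_id": 10, "user_uuid": "u-1"},
    {"call_id": 11, "user_uuid": "u-3"},  # user not yet known to the warehouse
]

ready, deferred = split_calls_by_known_users(calls, known_uuids)
print(len(ready), len(deferred))
```

Running the mail-based load first, as the sensor below enforces, keeps the `deferred` list as small as possible.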

Thus, our sensor will launch the subsequent vertices of the graph if there is fresh information in the mail, and will also mark the previous information as outdated.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

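    def poke(self, context):
        # Airflow calls poke() repeatedly until it returns True or the timeout expires
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        # True means a matching message exists and downstream tasks may start
        return mail_id is not None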
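
What a sensor does in "poke" mode can be approximated by a plain polling loop. This is a simplified sketch, not Airflow's implementation; `wait_for` is a hypothetical function whose parameters mirror the `poke_interval` and `timeout` arguments used in the DAG below.

```python
import time


def wait_for(poke, poke_interval, timeout):
    """Call poke() until it returns True or `timeout` seconds have passed.

    Returns True on success and False on timeout.
    """
    started = time.monotonic()
    while True:
        if poke():
            return True
        if time.monotonic() - started >= timeout:
            return False
        time.sleep(poke_interval)


# Simulated mailbox checks: nothing on the first two pokes, a message on the third.
answers = iter([False, False, True])
found = wait_for(lambda: next(answers), poke_interval=0.01, timeout=1)
print(found)
```

Where this sketch returns False, Airflow fails the sensor task on timeout, or marks it as skipped when `soft_fail=True` is set.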

Getting and using the data

To receive and process the data you can write a separate operator or use ready-made ones. Since the logic is still trivial (take the data from the message), for the example I suggest the standard PythonOperator:

from datetime import datetime, timedelta

from airflow.exceptions import AirflowException
from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Standard graph configuration
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

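# Define the sensor
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    # Leave the latest message unread so that prepare_mail() below
    # can still find it with the (UNSEEN) condition
    check_seen=False,
    dag=dag,
    mode="poke",
)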

# Function for getting the data from the message
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    # Download the archive referenced by the link in the message
    imap_hook.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(
    task_id="prepare_mail_data",
    default_args=args,
    dag=dag,
    python_callable=prepare_mail,
)

# Description of the remaining graph vertices
...

# Define the links in the graph
mail_check_sensor >> prepare_mail_data
prepare_mail_data >> ...
# Description of the remaining control flows

By the way, if your corporate mail is also on mail.ru, you will not be able to search messages by subject, sender, and so on. Back in 2016 they promised to introduce this, but apparently changed their mind. I solved the problem by creating a separate folder for the needed messages and setting up a filter for them in the mail web interface. Thus only the needed messages land in this folder, and the search condition in my case is simply (UNSEEN).
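
On servers that do support searching by sender or subject, the `condition` string passed to the hook can combine several standard IMAP search keys. Here is a rough sketch; `build_search_criteria` is a hypothetical helper, the example values are made up, and it does not escape quotes inside the values.

```python
def build_search_criteria(unseen=True, sender=None, subject=None):
    """Compose an IMAP SEARCH criteria string from optional filters."""
    parts = []
    if unseen:
        parts.append("UNSEEN")
    if sender:
        parts.append(f'FROM "{sender}"')
    if subject:
        parts.append(f'SUBJECT "{subject}"')
    if not parts:
        parts.append("ALL")  # no filters: match every message
    return "(" + " ".join(parts) + ")"


default_criteria = build_search_criteria()
filtered_criteria = build_search_criteria(sender="crm@example.com", subject="Export")
print(default_criteria)
print(filtered_criteria)
```

The resulting string would be passed as `condition` to `get_last_mail`; with no extra filters it reduces to the `(UNSEEN)` condition used here.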

To summarize, we have the following sequence: we check whether there are new messages matching the conditions and, if there are, download the archive via the link from the latest message.
The final ellipses omit that this archive will be unpacked, the data from it cleaned and processed, and in the end the whole thing will go further down the ETL pipeline, but that is beyond the scope of this article. If this turned out interesting and useful, I will gladly continue describing ETL solutions and their parts for Apache Airflow.

Source: www.habr.com
