Apache Airflow හි විද්‍යුත් තැපෑලෙන් දත්ත ලබා ගැනීම සඳහා ETL ක්‍රියාවලිය

Apache Airflow හි විද්‍යුත් තැපෑලෙන් දත්ත ලබා ගැනීම සඳහා ETL ක්‍රියාවලිය

තාක්‍ෂණය කෙතරම් දියුණු වුවත්, යල් පැන ගිය ප්‍රවේශයන් වැලක් සෑම විටම සංවර්ධනය පසුපස හඹා යයි. මෙය සුමට සංක්‍රාන්තියක්, මානව සාධක, තාක්ෂණික අවශ්‍යතා හෝ වෙනත් දෙයක් නිසා විය හැක. දත්ත සැකසීමේ ක්ෂේත්රයේ, දත්ත මූලාශ්ර මෙම කොටසෙහි වඩාත්ම හෙළිදරව් වේ. මේකෙන් ගැලවෙන්න අපි කොච්චර හීන මැව්වත් මේ වෙනකන් දත්ත වලින් කොටසක් එවන්නේ ක්ෂණික messenger සහ Email වලට මිසක් වැඩි පුරාවිද්‍යා ආකෘති ගැන කතා නොකර. ඔබට විද්‍යුත් තැපෑලෙන් දත්ත ලබා ගත හැකි ආකාරය නිරූපණය කරමින්, Apache Airflow සඳහා වන එක් විකල්පයක් විසුරුවා හැරීමට මම ඔබට ආරාධනා කරමි.

මුදලටය

අන්තර් පුද්ගල සන්නිවේදනයේ සිට සමාගම් අතර අන්තර්ක්‍රියා ප්‍රමිතීන් දක්වා බොහෝ දත්ත තවමත් විද්‍යුත් තැපෑල හරහා මාරු කරනු ලැබේ. දත්ත ලබා ගැනීම සඳහා අතුරු මුහුණතක් ලිවීමට හෝ මෙම තොරතුරු වඩාත් පහසු මූලාශ්‍රවලට ඇතුළත් කරන පුද්ගලයින් කාර්යාලයේ තැබීමට හැකි නම් හොඳයි, නමුත් බොහෝ විට මෙය කළ නොහැකි විය හැකිය. මා මුහුණ දුන් විශේෂිත කාර්යය වූයේ කුප්‍රකට CRM පද්ධතිය දත්ත ගබඩාවට සම්බන්ධ කිරීම සහ පසුව OLAP පද්ධතියට සම්බන්ධ කිරීමයි. එය ඓතිහාසිකව සිදු වූයේ අපගේ සමාගමට මෙම පද්ධතිය භාවිතා කිරීම විශේෂිත ව්‍යාපාරික ක්ෂේත්‍රයක පහසු විය. එමනිසා, මෙම තෙවන පාර්ශ්ව පද්ධතියෙන් ද දත්ත සමඟ ක්‍රියා කිරීමට සෑම කෙනෙකුටම සැබවින්ම අවශ්‍ය විය. පළමුවෙන්ම, ඇත්ත වශයෙන්ම, විවෘත API වෙතින් දත්ත ලබා ගැනීමේ හැකියාව අධ්යයනය කරන ලදී. අවාසනාවන්ත ලෙස, API විසින් අවශ්‍ය සියලුම දත්ත ලබා ගැනීම ආවරණය නොකළ අතර, සරලව කිවහොත්, එය බොහෝ ආකාරවලින් වංක වූ අතර, වඩාත් පුළුල් ක්‍රියාකාරීත්වයක් සැපයීමට තාක්ෂණික සහායට අවශ්‍ය හෝ අතරමග හමුවීමට නොහැකි විය. නමුත් මෙම පද්ධතිය මඟින් සංරක්ෂිතය ගොඩබෑම සඳහා සබැඳියක් ආකාරයෙන් තැපැල් මඟින් අතුරුදහන් වූ දත්ත වරින් වර ලබා ගැනීමට අවස්ථාව ලබා දුන්නේය.

ව්‍යාපාරයට ඊමේල් හෝ ක්ෂණික පණිවිඩකරුවන්ගෙන් දත්ත රැස් කිරීමට අවශ්‍ය වූ එකම අවස්ථාව මෙය නොවන බව සඳහන් කළ යුතුය. කෙසේ වෙතත්, මෙම අවස්ථාවේදී, අපට මේ ආකාරයෙන් පමණක් දත්ත කොටසක් සපයන තෙවන පාර්ශවීය සමාගමකට බලපෑම් කළ නොහැකි විය.

apache වායු ප්රවාහය

ETL ක්‍රියාවලි ගොඩනැගීම සඳහා, අපි බොහෝ විට Apache Airflow භාවිතා කරමු. මෙම තාක්ෂණය ගැන නුහුරු නුපුරුදු පාඨකයෙකුට සන්දර්භය තුළ සහ පොදුවේ පෙනෙන ආකාරය වඩා හොඳින් අවබෝධ කර ගැනීම සඳහා, මම හඳුන්වාදීමේ කිහිපයක් විස්තර කරමි.

Apache Airflow යනු Python හි ETL (Extract-Transform-Loading) ක්‍රියාවලි තැනීමට, ක්‍රියාත්මක කිරීමට සහ අධීක්ෂණය කිරීමට භාවිතා කරන නිදහස් වේදිකාවකි. වායු ප්‍රවාහයේ ප්‍රධාන සංකල්පය අධ්‍යක්ෂණය කරන ලද ඇක්‍ලික් ප්‍රස්ථාරයකි, එහිදී ප්‍රස්ථාරයේ සිරස් නිශ්චිත ක්‍රියාවලීන් වන අතර ප්‍රස්ථාරයේ දාර යනු පාලනයේ හෝ තොරතුරු ප්‍රවාහයයි. ක්‍රියාවලියකට ඕනෑම පයිතන් ශ්‍රිතයක් සරලව හැඳින්විය හැක, නැතහොත් පන්තියක සන්දර්භය තුළ ශ්‍රිත කිහිපයක් අනුක්‍රමිකව ඇමතීමෙන් ඊට වඩා සංකීර්ණ තර්කනයක් තිබිය හැක. වඩාත් නිරන්තර මෙහෙයුම් සඳහා, ක්‍රියාවලි ලෙස භාවිතා කළ හැකි බොහෝ සූදානම් කළ වර්ධනයන් දැනටමත් තිබේ. එවැනි වර්ධනයන් ඇතුළත් වේ:

  • ක්රියාකරුවන් - එක් ස්ථානයක සිට තවත් ස්ථානයකට දත්ත මාරු කිරීම සඳහා, උදාහරණයක් ලෙස, දත්ත සමුදා වගුවක සිට දත්ත ගබඩාවකට;
  • සංවේදක - යම් සිදුවීමක් සිදු වන තෙක් බලා සිටීම සහ ප්‍රස්ථාරයේ ඊළඟ සිරස් වෙත පාලන ප්‍රවාහය යොමු කිරීම;
  • කොකු - පහළ මට්ටමේ මෙහෙයුම් සඳහා, උදාහරණයක් ලෙස, දත්ත සමුදා වගුවකින් දත්ත ලබා ගැනීම සඳහා (ප්රකාශවල භාවිතා වේ);
  • සහ එසේ ය.

මෙම ලිපියේ Apache Airflow විස්තරාත්මකව විස්තර කිරීම නුසුදුසු වනු ඇත. කෙටි හැඳින්වීම් නැරඹිය හැකිය මෙහි හෝ මෙහි.

දත්ත ලබා ගැනීම සඳහා කොක්ක

පළමුවෙන්ම, ගැටළුව විසඳීම සඳහා, අපට හැකි කොක්කක් ලිවිය යුතුය:

  • ඊමේල් වෙත සම්බන්ධ වන්න
  • නිවැරදි ලිපිය සොයා ගන්න
  • ලිපියෙන් දත්ත ලබා ගන්න.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

තර්කය මෙයයි: අපි සම්බන්ධ කරමු, අවසාන වඩාත්ම අදාළ ලිපිය සොයා ගන්න, වෙනත් අය තිබේ නම්, අපි ඒවා නොසලකා හරිමු. මෙම ශ්‍රිතය භාවිතා කරනුයේ, පසු අකුරු වල පෙර ලිපි වල සියලුම දත්ත අඩංගු වන බැවිනි. මෙය එසේ නොවේ නම්, ඔබට සියලු අකුරු අරාවක් ආපසු ලබා දිය හැකිය, නැතහොත් පළමු එක සැකසීමට සහ ඉතිරිය ඊළඟ පාස් එකේ. පොදුවේ, සෑම දෙයක්ම, සෑම විටම, කාර්යය මත රඳා පවතී.

අපි කොක්කයට සහායක කාර්යයන් දෙකක් එකතු කරමු: ගොනුවක් බාගත කිරීම සඳහා සහ ඊමේල් එකකින් සබැඳියක් භාවිතයෙන් ගොනුවක් බාගත කිරීම සඳහා. මාර්ගය වන විට, ඒවා ක්රියාකරු වෙත ගෙන යා හැකිය, එය මෙම ක්රියාකාරිත්වය භාවිතා කිරීමේ වාර ගණන මත රඳා පවතී. කොක්කයට එකතු කළ යුතු තවත් දේ, නැවතත්, කාර්යය මත රඳා පවතී: ලිපිගොනු වහාම ලිපියට ලැබුනේ නම්, ඔබට ලිපියට ඇමුණුම් බාගත කළ හැකිය, ලිපියට දත්ත ලැබුනේ නම්, ඔබ ලිපිය විග්‍රහ කළ යුතුය, ආදිය මගේ නඩුවේදී, ලිපිය ලේඛනාගාරයට එක් සබැඳියක් සමඟ පැමිණේ, එය මට නිශ්චිත ස්ථානයක තබා වැඩිදුර සැකසුම් ක්‍රියාවලිය ආරම්භ කළ යුතුය.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

කේතය සරලයි, එබැවින් එයට වැඩිදුර පැහැදිලි කිරීමක් අවශ්‍ය නොවේ. imap_conn_id මැජික් රේඛාව ගැන මම ඔබට කියන්නම්. Apache Airflow තන්තු හඳුනාගැනීමක් මඟින් ප්‍රවේශ විය හැකි සම්බන්ධතා පරාමිතීන් (පිවිසුම්, මුරපදය, ලිපිනය සහ අනෙකුත් පරාමිති) ගබඩා කරයි. දෘශ්යමය වශයෙන්, සම්බන්ධතා කළමනාකරණය මේ ආකාරයෙන් පෙනේ

Apache Airflow හි විද්‍යුත් තැපෑලෙන් දත්ත ලබා ගැනීම සඳහා ETL ක්‍රියාවලිය

දත්ත සඳහා රැඳී සිටීමට සංවේදකය

අපි දැනටමත් තැපෑලෙන් දත්ත සම්බන්ධ කර ගැනීමට සහ ලබා ගන්නා ආකාරය දන්නා බැවින්, දැන් අපට ඒවා බලා සිටීමට සංවේදකයක් ලිවිය හැකිය. මගේ නඩුවේදී, වෙනත් ක්‍රියාවලීන් වෙනත් මූලාශ්‍රවලින් (API, දුරකථන) ලබා ගන්නා ඒවා ඇතුළුව තැපෑලෙන් ලැබෙන දත්ත මත පදනම්ව ක්‍රියා කරන බැවින්, දත්ත තිබේ නම්, එය ක්‍රියාකරවන ක්‍රියාකරුවෙකු ලිවීමට ක්‍රියා කළේ නැත. , වෙබ් මිතික, ආදිය.) ආදිය). මම ඔබට උදාහරණයක් දෙන්නම්. CRM පද්ධතිය තුළ නව පරිශීලකයෙකු පෙනී සිට ඇති අතර, ඔහුගේ UUID ගැන අපි තවමත් නොදනිමු. එවිට, SIP දුරකථනයෙන් දත්ත ලබා ගැනීමට උත්සාහ කරන විට, එහි UUID සමඟ සම්බන්ධ කර ඇති ඇමතුම් අපට ලැබෙනු ඇත, නමුත් අපට ඒවා සුරැකීමට සහ නිවැරදිව භාවිතා කිරීමට නොහැකි වනු ඇත. එවැනි කාරණාවලදී, දත්තවල යැපීම මතක තබා ගැනීම වැදගත්ය, විශේෂයෙන් ඒවා විවිධ මූලාශ්රවලින් නම්. ඇත්ත වශයෙන්ම, මේවා දත්ත අඛණ්ඩතාව ආරක්ෂා කිරීම සඳහා ප්රමාණවත් පියවරයන් නොවේ, නමුත් සමහර අවස්ථාවලදී ඒවා අවශ්ය වේ. ඔව්, සම්පත් අත්පත් කර ගැනීමට නිෂ්ක්‍රීය වීම ද අතාර්කික ය.

මේ අනුව, තැපෑලෙහි නැවුම් තොරතුරු තිබේ නම් අපගේ සංවේදකය ප්‍රස්ථාරයේ පසු ශීර්ෂයන් දියත් කරනු ඇති අතර පෙර තොරතුරු අදාළ නොවන ලෙස සලකුණු කරයි.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

අපි දත්ත ලබාගෙන භාවිතා කරමු

දත්ත ලබා ගැනීමට සහ සැකසීමට, ඔබට වෙනම ක්රියාකරුවෙකු ලිවිය හැකිය, ඔබට සූදානම් කළ ඒවා භාවිතා කළ හැකිය. තර්කය තවමත් සුළුපටු බැවින් - ලිපියෙන් දත්ත ගැනීමට, උදාහරණයක් ලෙස, මම සම්මත PythonOperator යෝජනා කරමි

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

මාර්ගය වන විට, ඔබේ ආයතනික තැපෑල mail.ru හි තිබේ නම්, ඔබට විෂය, යවන්නා යනාදිය අනුව ලිපි සෙවීමට නොහැකි වනු ඇත. 2016 දී ඔවුන් එය හඳුන්වා දීමට පොරොන්දු වූ නමුත් පෙනෙන විදිහට ඔවුන්ගේ මනස වෙනස් විය. අවශ්‍ය ලිපි සඳහා වෙනම ෆෝල්ඩරයක් සාදා තැපැල් වෙබ් අතුරු මුහුණතේ අවශ්‍ය අකුරු සඳහා පෙරනයක් සැකසීමෙන් මම මෙම ගැටළුව විසඳුවෙමි. මේ අනුව, සෙවීම සඳහා අවශ්‍ය ලිපි සහ කොන්දේසි පමණක්, මගේ නඩුවේදී, සරලව (UNSEEN) මෙම ෆෝල්ඩරයට ඇතුළු වන්න.

සාරාංශගත කිරීම, අපට පහත අනුපිළිවෙල තිබේ: කොන්දේසි සපුරාලන නව අකුරු තිබේදැයි අපි පරීක්ෂා කරමු, තිබේ නම්, අපි අවසාන ලිපියෙන් සබැඳිය භාවිතා කර සංරක්ෂිතය බාගත කරමු.
අවසාන තිත් යටතේ, මෙම සංරක්ෂිතය අසුරනු ඇති බව මග හැර ඇත, ලේඛනාගාරයේ දත්ත ඉවත් කර සකසනු ලැබේ, එහි ප්‍රතිඵලයක් ලෙස, සමස්ත දෙයම ETL ක්‍රියාවලියේ නල මාර්ගයට ඉදිරියට යනු ඇත, නමුත් මෙය දැනටමත් ඉක්මවා ඇත ලිපියේ විෂය පථය. එය සිත්ගන්නාසුළු හා ප්‍රයෝජනවත් වූවා නම්, Apache Airflow සඳහා ETL විසඳුම් සහ ඒවායේ කොටස් විස්තර කිරීමට මම සතුටින් දිගටම කටයුතු කරමි.

මූලාශ්රය: www.habr.com

අදහස් එක් කරන්න