د اپاچي ایر فلو کې د بریښنالیک څخه ډیټا ترلاسه کولو لپاره د ETL پروسه

د اپاچي ایر فلو کې د بریښنالیک څخه ډیټا ترلاسه کولو لپاره د ETL پروسه

مهمه نده چې څومره ټیکنالوژي وده کوي، د پخوانیو تګلارو لړۍ تل د پرمختګ شاته ځي. دا کیدای شي د اسانه لیږد، بشري عوامل، تخنیکي اړتیاوو، یا بل څه له امله وي. د معلوماتو پروسس کولو په برخه کې، د معلوماتو سرچینې په دې برخه کې خورا ښکاره دي. مهمه نده چې موږ څومره له دې څخه د خلاصون خوب وینو ، مګر تر دې دمه د معلوماتو یوه برخه په فوري میسینجرونو او بریښنالیکونو لیږل کیږي ، نه د نورو لرغونو فارمیټونو یادونه. زه تاسو ته بلنه درکوم چې د اپاچي ایر فلو لپاره یو له انتخابونو څخه جلا کړئ، دا په ډاګه کوي چې تاسو څنګه کولی شئ د بریښنالیکونو څخه ډاټا واخلئ.

له تاریخ څخه دمخه

ډیری معلومات لاهم د بریښنالیک له لارې لیږدول کیږي، د خپلمنځي اړیکو څخه د شرکتونو ترمنځ د تعامل معیارونو ته. دا ښه ده که چیرې دا ممکنه وي چې د معلوماتو ترلاسه کولو لپاره انٹرفیس ولیکئ یا په دفتر کې خلک ځای په ځای کړئ څوک چې دا معلومات ډیرو مناسبو سرچینو ته ننوځي ، مګر ډیری وختونه دا ممکن ممکن نه وي. هغه ځانګړی دنده چې زه ورسره مخ وم د بدنام CRM سیسټم د ډیټا ګودام سره وصل کول او بیا د OLAP سیسټم سره. دا په تاریخي توګه پیښ شوي چې زموږ د شرکت لپاره د دې سیسټم کارول د سوداګرۍ په یوه ځانګړې ساحه کې اسانه وه. له همدې امله ، هرڅوک واقعیا غوښتل د دې وړتیا ولري چې د دې دریمې ډلې سیسټم څخه ډیټا سره هم کار وکړي. لومړی، البته، د خلاص API څخه د معلوماتو ترلاسه کولو امکان مطالعه شوی. له بده مرغه، API د ټولو اړینو معلوماتو ترلاسه کولو پوښښ نه و کړی، او په ساده شرایطو کې، دا په ډیری لارو کې ګډوډ و، او تخنیکي مالتړ نه غوښتل یا نیمه لاره پوره کړي ترڅو نور پراخ فعالیت چمتو کړي. مګر دې سیسټم دا فرصت برابر کړ چې په دوره توګه ورک شوي ډاټا د بریښنالیک له لارې د آرشیف د پورته کولو لپاره د لینک په بڼه ترلاسه کړي.

دا باید په یاد ولرئ چې دا یوازینۍ قضیه نه وه چې سوداګرۍ یې غوښتل د بریښنالیکونو یا فوري میسنجرونو څخه ډاټا راټول کړي. په هرصورت، پدې حالت کې، موږ نشو کولی د دریمې ډلې شرکت اغیزه وکړو چې یوازې پدې ډول د معلوماتو برخه چمتو کوي.

د اپاچي هوا جریان

د ETL پروسې رامینځته کولو لپاره ، موږ ډیری وختونه د اپاچي ایر فلو کاروو. د یو لوستونکي لپاره چې د دې ټیکنالوژۍ سره نا اشنا وي ترڅو ښه پوه شي چې دا په شرایطو او په عموم کې څنګه ښکاري ، زه به یو څو تعارفي توضیحات بیان کړم.

د اپاچي ایر فلو یو وړیا پلیټ فارم دی چې په پایتون کې د ETL (استخراج-ترانسفارم-لوډینګ) پروسې جوړولو ، اجرا کولو او څارلو لپاره کارول کیږي. د هوا جریان کې اصلي مفهوم یو لارښود ایسکلیک ګراف دی ، چیرې چې د ګراف عمودی ځانګړي پروسې دي ، او د ګراف څنډې د کنټرول یا معلوماتو جریان دي. یوه پروسه کولی شي په ساده ډول د Python فنکشن ته زنګ ووهي، یا دا د ټولګي په شرایطو کې په ترتیب سره د څو فنکشنونو غږولو څخه ډیر پیچلي منطق لري. د ډیری پرله پسې عملیاتو لپاره ، دمخه ډیری چمتو شوي پرمختګونه شتون لري چې د پروسو په توګه کارول کیدی شي. دا ډول پرمختګونه عبارت دي له:

  • چلونکي - د یو ځای څخه بل ځای ته د معلوماتو لیږدولو لپاره، د بیلګې په توګه، د ډیټابیس میز څخه د معلوماتو ګودام ته؛
  • سینسرونه - د یوې ټاکلې پیښې پیښې ته انتظار کولو لپاره او د ګراف راتلونکو څنډو ته د کنټرول جریان لارښود کول؛
  • hooks - د ټیټې کچې عملیاتو لپاره، د بیلګې په توګه، د ډیټابیس میز څخه ډاټا ترلاسه کول (په بیانونو کې کارول کیږي)؛
  • او داسې نور.

دا به نامناسب وي چې په دې مقاله کې د اپاچي هوایی جریان په تفصیل سره تشریح کړئ. لنډ پیژندنه لیدل کیدی شي دلته او یا دلته.

د معلوماتو ترلاسه کولو لپاره هک

تر ټولو لومړی، د ستونزې د حل لپاره، موږ باید یو هک ولیکئ چې موږ یې کولی شو:

  • بریښنالیک سره وصل کړئ
  • سم لیک پیدا کړئ
  • د لیک څخه معلومات ترلاسه کړئ.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

منطق دا دی: موږ وصل کوو، وروستی خورا اړونده لیک ومومئ، که نور شتون ولري، موږ یې له پامه غورځوو. دا فنکشن کارول کیږي، ځکه چې وروسته لیکونه د مخکینیو لیکو ټول معلومات لري. که دا قضیه نده، نو تاسو کولی شئ د ټولو لیکونو لړۍ بیرته راوباسئ، یا لومړی یې پروسس کړئ، او پاتې نور په راتلونکي پاس کې. په عموم کې، هر څه، د تل په څیر، په دنده پورې اړه لري.

موږ په هک کې دوه مرستندویه دندې اضافه کوو: د فایل ډاونلوډ کولو لپاره او د بریښنالیک څخه د لینک په کارولو سره د فایل ډاونلوډ کولو لپاره. د لارې په توګه، دوی آپریټر ته لیږدول کیدی شي، دا د دې فعالیت کارولو فریکونسۍ پورې اړه لري. په هک کې نور څه اضافه کول ، بیا په دنده پورې اړه لري: که چیرې فایلونه سمدلاسه په لیک کې ترلاسه شي ، نو تاسو کولی شئ لیک ته ضمیمه ډاونلوډ کړئ ، که چیرې معلومات په لیک کې ترلاسه شي ، نو تاسو اړتیا لرئ لیک پارس کړئ ، etc زما په قضیه کې، لیک آرشیف ته د یوې لینک سره راځي، کوم چې زه اړتیا لرم چې یو مشخص ځای کې واچوم او د پروسس نور بهیر پیل کړم.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

کوډ ساده دی، نو دا په سختۍ سره نور وضاحت ته اړتیا لري. زه به تاسو ته یوازې د جادو لاین imap_conn_id په اړه ووایم. د اپاچي ایر فلو د پیوستون پیرامیټونه ذخیره کوي (ننوتل ، پټنوم ، پته او نور پیرامیټونه) چې د تار پیژندونکي لخوا لاسرسی کیدی شي. په لید کې، د ارتباط مدیریت داسې ښکاري

د اپاچي ایر فلو کې د بریښنالیک څخه ډیټا ترلاسه کولو لپاره د ETL پروسه

د معلوماتو انتظار کولو لپاره سینسر

څنګه چې موږ دمخه پوهیږو چې څنګه د بریښنالیک څخه ډیټا وصل او ترلاسه کړو ، موږ اوس کولی شو د دوی انتظار کولو لپاره سینسر ولیکو. زما په قضیه کې، دا کار نه کوي چې سمدلاسه یو آپریټر ولیکئ چې ډاټا پروسس کړي، که کوم وي، ځکه چې نورې پروسې د میل څخه ترلاسه شوي معلوماتو پراساس کار کوي، پشمول هغه چې د نورو سرچینو څخه اړوند معلومات اخلي (API، ټیلفوني , web metrics, etc.). etc.). زه به تاسو ته یو مثال درکړم. یو نوی کارن د CRM سیسټم کې څرګند شوی، او موږ لاهم د هغه د UUID په اړه نه پوهیږو. بیا، کله چې د SIP ټیلیفوني څخه ډاټا ترلاسه کولو هڅه وکړو، موږ به د دې UUID سره تړلي تلیفونونه ترلاسه کړو، مګر موږ به ونه توانیږو چې خوندي او په سمه توګه یې وکاروو. په داسې مسلو کې، دا مهمه ده چې د معلوماتو انحصار په پام کې ونیسئ، په ځانګړې توګه که دوی د مختلفو سرچینو څخه وي. دا، البته، د معلوماتو بشپړتیا ساتلو لپاره ناکافي اقدامات دي، مګر په ځینو مواردو کې دا اړین دي. هو، او د منابعو په نیولو کې سستي کول هم غیر منطقي دي.

په دې توګه، زموږ سینسر به د ګراف راتلونکی عمودی پیل کړي که چیرې په بریښنالیک کې تازه معلومات شتون ولري، او همدارنګه پخواني معلومات د غیر معقول په توګه په نښه کړي.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

موږ ډاټا ترلاسه کوو او کاروو

د معلوماتو ترلاسه کولو او پروسس کولو لپاره، تاسو کولی شئ یو جلا آپریټر ولیکئ، تاسو کولی شئ چمتو شوي وکاروئ. له هغه ځایه چې تر دې دمه منطق کوچنی دی - د لیک څخه ډاټا اخیستلو لپاره ، د مثال په توګه ، زه وړاندیز کوم معیاري PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

په هرصورت، که ستاسو کارپوریټ بریښنالیک هم په mail.ru کې وي، نو تاسو به د موضوع، لیږونکي، او داسې نور په واسطه د لیکونو لټون ونه کړئ. بیرته په 2016 کې، دوی ژمنه وکړه چې دا معرفي کړي، مګر ظاهرا خپل فکر بدل کړ. ما دا ستونزه د اړینو لیکونو لپاره جلا فولډر رامینځته کولو او د بریښنالیک ویب انٹرفیس کې د اړینو لیکونو لپاره د فلټر په جوړولو سره حل کړه. په دې توګه، د لټون لپاره یوازې اړین لیکونه او شرایط، زما په قضیه کې، په ساده ډول (نه لیدل شوي) دې فولډر ته راځي.

لنډیز، موږ لاندې ترتیب لرو: موږ ګورو چې ایا نوي لیکونه شتون لري چې شرایط پوره کوي، که شتون ولري، نو موږ د وروستي لیک څخه د لینک په کارولو سره آرشیف ډاونلوډ کوو.
د وروستي نقطو لاندې ، دا پریښودل شوي چې دا آرشیف به خلاص شي ، د آرشیف څخه ډاټا به پاکه او پروسس شي ، او په پایله کې به ټول شی د ETL پروسې پایپ لاین ته لاړ شي ، مګر دا لا دمخه دی. د مقالې ساحه که دا په زړه پوري او ګټور وګرځي ، نو زه به په خوښۍ سره د اپاچي ایر فلو لپاره د ETL حلونو او د دوی برخو تشریح کولو ته دوام ورکړم.

سرچینه: www.habr.com

Add a comment