Apache Airflow ۾ اي ميل مان ڊيٽا حاصل ڪرڻ لاءِ ETL عمل

Apache Airflow ۾ اي ميل مان ڊيٽا حاصل ڪرڻ لاءِ ETL عمل

ڪابه ڳالهه نه آهي ته ٽيڪنالاجي ڪيتري به ترقي ڪري ٿي، پراڻن طريقن جو هڪ سلسلو هميشه ترقي جي پويان پيچرو آهي. اهو ٿي سگهي ٿو هڪ آسان منتقلي، انساني عنصر، ٽيڪنالاجي ضرورتن، يا ٻيو ڪجهه. ڊيٽا پروسيسنگ جي ميدان ۾، ڊيٽا جا ذريعا هن حصي ۾ سڀ کان وڌيڪ ظاهر ڪن ٿا. اسان ان مان نجات حاصل ڪرڻ جا ڪيترا به خواب ڏسندا آهيون، پر اڃا تائين ڊيٽا جو حصو انسٽنٽ ميسينجرز ۽ اي ميلن ۾ موڪليو ويندو آهي، نه ته وڌيڪ آرڪيڪ فارميٽ جو ذڪر ڪجي. مان توهان کي دعوت ڏيان ٿو اپاچي ايئر فلو لاءِ اختيارن مان هڪ کي ڌار ڪرڻ لاءِ، بيان ڪيو ته توهان اي ميلن مان ڊيٽا ڪيئن وٺي سگهو ٿا.

prehistory

ڪافي ڊيٽا اڃا تائين اي ميل ذريعي منتقل ڪئي وئي آهي، ذاتي رابطي کان وٺي ڪمپنين جي وچ ۾ رابطي جي معيار تائين. اهو سٺو آهي جيڪڏهن اهو ممڪن آهي ته ڊيٽا حاصل ڪرڻ لاء هڪ انٽرفيس لکڻ يا آفيس ۾ ماڻهن کي رکڻ لاء جيڪي هن معلومات کي وڌيڪ آسان ذريعن ۾ داخل ڪندا، پر اڪثر ڪري اهو ممڪن ناهي. مخصوص ڪم جيڪو مون کي منهن ڏيڻو پيو، بدنام CRM سسٽم کي ڊيٽا گودام سان ڳنڍڻ، ۽ پوء OLAP سسٽم سان. اهو تاريخي طور تي ٿيو آهي ته اسان جي ڪمپني لاء هن سسٽم جو استعمال ڪاروبار جي هڪ خاص علائقي ۾ آسان هو. تنهن ڪري، هرڪو واقعي چاهيو ته هن ٽئين پارٽي جي سسٽم مان ڊيٽا سان گڏ ڪم ڪرڻ جي قابل ٿي. سڀ کان پهريان، يقينا، هڪ کليل API مان ڊيٽا حاصل ڪرڻ جو امڪان اڀياس ڪيو ويو. بدقسمتي سان، API سڀني ضروري ڊيٽا حاصل ڪرڻ کي ڍڪي نه ڏني، ۽، سادي اصطلاحن ۾، اهو ڪيترن ئي طريقن سان ڀريل هو، ۽ ٽيڪنيڪل سپورٽ نه چاهيو يا اڌ رستي کي پورو نه ڪري سگهيو وڌيڪ جامع ڪارڪردگي مهيا ڪرڻ لاء. پر هن سسٽم کي وقتي طور تي غائب ڊيٽا وصول ڪرڻ جو موقعو فراهم ڪيو ويو ميل ذريعي آرڪائيو کي لوڊ ڪرڻ لاء لنڪ جي صورت ۾.

اهو ياد رکڻ گهرجي ته اهو واحد ڪيس نه هو جنهن ۾ ڪاروبار اي ميلن يا فوري پيغامن کان ڊيٽا گڏ ڪرڻ چاهيندو هو. بهرحال، هن معاملي ۾، اسان ٽئين پارٽي جي ڪمپني تي اثر انداز نه ڪري سگهون ٿا جيڪو صرف هن طريقي سان ڊيٽا جو حصو مهيا ڪري ٿو.

Apache ايئر فلو

ETL پروسيس کي تعمير ڪرڻ لاء، اسان اڪثر ڪري استعمال ڪندا آهيون Apache Airflow. هڪ پڙهندڙ لاءِ جيڪو هن ٽيڪنالاجي کان ناواقف آهي اهو بهتر سمجهڻ لاءِ ته اهو ڪيئن نظر اچي ٿو ان حوالي سان ۽ عام طور تي، مان ڪجهه تعارفي بيان ڪندس.

Apache Airflow هڪ مفت پليٽ فارم آهي جيڪو استعمال ڪيو ويندو آهي تعمير ڪرڻ، عمل ڪرڻ ۽ مانيٽر ڪرڻ لاءِ ETL (Extract-Transform-Loading) پروسيس پائٿون ۾. ايئر فلو ۾ بنيادي تصور هڪ هدايت ٿيل ايڪيڪلڪ گراف آهي، جتي گراف جي چوٽي مخصوص عمل آهن، ۽ گراف جي ڪنارن تي ڪنٽرول يا معلومات جي وهڪري آهي. هڪ عمل صرف ڪنهن به Python فنڪشن کي ڪال ڪري سگهي ٿو، يا ان ۾ وڌيڪ پيچيده منطق ٿي سگهي ٿو ترتيب سان ڪيترن ئي افعال کي هڪ طبقي جي حوالي سان سڏڻ کان. سڀ کان وڌيڪ عملن لاء، اڳ ۾ ئي ڪيترائي تيار ڪيل ترقيات آهن جيڪي پروسيس جي طور تي استعمال ڪري سگھجن ٿيون. اهڙيون ترقيون شامل آهن:

  • آپريٽرز - ڊيٽا جي منتقلي لاء هڪ جڳهه کان ٻئي ڏانهن، مثال طور، ڊيٽابيس جي ٽيبل کان ڊيٽا گودام تائين؛
  • sensors - هڪ خاص واقعي جي واقعن جي انتظار ۾ ۽ ڪنٽرول جي وهڪري کي گراف جي ايندڙ چوڪن ڏانهن هدايت ڪرڻ لاء؛
  • ٿلهو - هيٺين سطح جي عملن لاءِ، مثال طور، ڊيٽابيس ٽيبل مان ڊيٽا حاصل ڪرڻ (بيانن ۾ استعمال ٿيل)؛
  • ۽ وغيره تي.

هن آرٽيڪل ۾ تفصيل سان Apache Airflow کي بيان ڪرڻ غير مناسب ٿيندو. مختصر تعارف ڏسي سگھجي ٿو هتي يا هتي.

ڊيٽا حاصل ڪرڻ لاء ٿلهو

سڀ کان پهريان، مسئلو حل ڪرڻ لاء، اسان کي هڪ ٿلهو لکڻ جي ضرورت آهي جنهن سان اسين ڪري سگهون ٿا:

  • اي ميل سان ڳنڍيو
  • صحيح خط ڳوليو
  • خط مان ڊيٽا حاصل ڪريو.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

منطق هي آهي: اسان ڳنڍيون ٿا، آخري سڀ کان وڌيڪ لاڳاپيل خط ڳوليو، جيڪڏهن ٻيا آهن، اسان انهن کي نظرانداز ڪريون ٿا. هي فنڪشن استعمال ڪيو ويو آهي، ڇاڪاڻ ته بعد ۾ خط اڳين جي سڀني ڊيٽا تي مشتمل آهي. جيڪڏهن اهو معاملو نه آهي، ته پوء توهان سڀني اکرن جي هڪ صف کي واپس ڪري سگهو ٿا، يا پهرين کي پروسيس ڪري سگهو ٿا، ۽ باقي ايندڙ پاس تي. عام طور تي، هر شيء، هميشه وانگر، ڪم تي منحصر آهي.

اسان ٿلهو ۾ ٻه معاون افعال شامل ڪندا آهيون: هڪ فائل ڊائون لوڊ ڪرڻ ۽ اي ميل مان لنڪ استعمال ڪندي فائل ڊائون لوڊ ڪرڻ لاءِ. رستي ۾، اهي آپريٽر ڏانهن منتقل ڪري سگھجن ٿيون، اهو هن ڪارڪردگي کي استعمال ڪرڻ جي تعدد تي منحصر آهي. ٿلهو ۾ شامل ڪرڻ لاء ٻيو ڇا آهي، ٻيهر، ڪم تي منحصر آهي: جيڪڏهن فائلون خط ۾ فوري طور تي وصول ڪيون ويون آهن، پوء توهان خط ۾ منسلڪ ڊائون لوڊ ڪري سگهو ٿا، جيڪڏهن ڊيٽا خط ۾ ملي ٿي، پوء توهان کي خط کي پارس ڪرڻو پوندو، وغيره منهنجي صورت ۾، خط هڪ آرڪائيو جي لنڪ سان گڏ اچي ٿو، جنهن کي مون کي هڪ خاص جاء تي رکڻو پوندو ۽ وڌيڪ پروسيسنگ جي عمل کي شروع ڪرڻ جي ضرورت آهي.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

ڪوڊ سادو آهي، تنهنڪري ان کي وڌيڪ وضاحت جي ضرورت آهي. مان صرف توهان کي جادو لائن جي باري ۾ ٻڌايان ٿو imap_conn_id. Apache Airflow اسٽور ڪنيڪشن پيٽرولر (لاگ ان، پاسورڊ، ايڊريس، ۽ ٻيا پيٽرولر) جيڪي هڪ اسٽرنگ سڃاڻپ ڪندڙ طرفان رسائي سگهجن ٿيون. بصري طور، ڪنيڪشن جو انتظام هن طرح نظر اچي ٿو

Apache Airflow ۾ اي ميل مان ڊيٽا حاصل ڪرڻ لاءِ ETL عمل

ڊيٽا لاء انتظار ڪرڻ لاء سينسر

جيئن ته اسان اڳ ۾ ئي ڄاڻون ٿا ته ميل مان ڊيٽا ڪيئن ڳنڍڻ ۽ وصول ڪرڻ، اسان هاڻي انهن لاء انتظار ڪرڻ لاء هڪ سينسر لکي سگهون ٿا. منهنجي صورت ۾، اهو ڪم نه ڪيو ويو ته هڪ آپريٽر کي لکڻ لاء صحيح طور تي جيڪو ڊيٽا کي پروسيس ڪندو، جيڪڏهن ڪو هجي، ڇاڪاڻ ته ٻيا عمل ميل مان حاصل ڪيل ڊيٽا جي بنياد تي ڪم ڪن ٿا، بشمول اهي جيڪي ٻين ذريعن کان لاڳاپيل ڊيٽا وٺندا آهن (API، telephony ، ويب ميٽرڪس، وغيره) وغيره). مان توهان کي هڪ مثال ڏيندس. CRM سسٽم ۾ هڪ نئون صارف ظاهر ٿيو آهي، ۽ اسان اڃا تائين هن جي UUID بابت نٿا ڄاڻون. پوء، جڏهن SIP ٽيليفون مان ڊيٽا حاصل ڪرڻ جي ڪوشش ڪندا، اسان ان جي UUID سان ڳنڍيل ڪالون وصول ڪنداسين، پر اسان انهن کي صحيح طريقي سان محفوظ ۽ استعمال ڪرڻ جي قابل نه هوندا. اهڙين معاملن ۾، ڊيٽا جي انحصار کي ذهن ۾ رکڻ ضروري آهي، خاص طور تي جيڪڏهن اهي مختلف ذريعن کان آهن. اهي، يقينا، ڊيٽا جي سالميت کي محفوظ ڪرڻ لاء ڪافي قدم آهن، پر ڪجهه حالتن ۾ اهي ضروري آهن. ها، ۽ وسيلن تي قبضو ڪرڻ به غير منطقي آهي.

اهڙيءَ طرح، اسان جو سينسر گراف جي ايندڙ چوڪن کي لانچ ڪندو جيڪڏهن ميل ۾ تازي معلومات هجي، ۽ اڳئين معلومات کي غير لاڳاپيل طور نشان لڳايو.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

اسان ڊيٽا وصول ۽ استعمال ڪندا آهيون

ڊيٽا حاصل ڪرڻ ۽ پروسيس ڪرڻ لاء، توهان هڪ الڳ آپريٽر لکي سگهو ٿا، توهان استعمال ڪري سگهو ٿا تيار ڪيل. هن وقت تائين منطق معمولي آهي - خط مان ڊيٽا وٺڻ لاءِ، مثال طور، مان تجويز ڪريان ٿو معياري PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

رستي ۾، جيڪڏهن توهان جو ڪارپوريٽ ميل پڻ mail.ru تي آهي، ته پوء توهان موضوع، موڪليندڙ، وغيره ذريعي خط ڳولڻ جي قابل نه هوندا. 2016 ۾ واپس، انهن ان کي متعارف ڪرائڻ جو واعدو ڪيو، پر ظاهري طور تي انهن جي ذهنن کي تبديل ڪيو. مون ضروري خطن لاءِ الڳ فولڊر ٺاهي ۽ ميل ويب انٽرفيس ۾ ضروري اکرن لاءِ فلٽر قائم ڪري اهو مسئلو حل ڪيو. اهڙيءَ طرح، صرف ضروري اکر ۽ ڳولها لاءِ شرطون، منهنجي صورت ۾، رڳو (UNSEEN) هن فولڊر ۾ حاصل ڪريو.

اختصار ڪندي، اسان وٽ هيٺيون ترتيب آهي: اسان چيڪ ڪريون ٿا ته ڪي نوان اکر آهن جيڪي شرطن کي پورا ڪن ٿا، جيڪڏهن آهن، ته پوءِ اسان آخري خط جي لنڪ استعمال ڪندي آرڪائيو ڊائون لوڊ ڪريون ٿا.
آخري نقطن جي تحت، اهو ختم ڪيو ويو آهي ته هي آرڪائيو انپيڪ ڪيو ويندو، آرڪائيو مان ڊيٽا صاف ۽ پروسيس ڪيو ويندو، ۽ نتيجي ۾، سڄي شيء ETL پروسيس جي پائپ لائن ڏانهن اڳتي وڌندي، پر اهو اڳ ۾ ئي آهي. مضمون جو دائرو. جيڪڏهن اهو دلچسپ ۽ ڪارائتو ٿي ويو، ته پوء آئون خوشيء سان ETL حل ۽ اپاچي ايئر فلو لاء انهن جا حصا بيان ڪرڻ جاري رکندس.

جو ذريعو: www.habr.com

تبصرو شامل ڪريو