Apache Airflow میں ای میل سے ڈیٹا حاصل کرنے کے لیے ETL عمل

Apache Airflow میں ای میل سے ڈیٹا حاصل کرنے کے لیے ETL عمل

اس سے کوئی فرق نہیں پڑتا ہے کہ ٹکنالوجی کتنی ہی ترقی کرتی ہے، پرانے طریقوں کا ایک سلسلہ ہمیشہ ترقی کے پیچھے چلتا ہے۔ یہ ہموار منتقلی، انسانی عوامل، تکنیکی ضروریات، یا کسی اور چیز کی وجہ سے ہو سکتا ہے۔ ڈیٹا پروسیسنگ کے میدان میں، ڈیٹا کے ذرائع اس حصے میں سب سے زیادہ ظاہر ہوتے ہیں۔ اس بات سے کوئی فرق نہیں پڑتا ہے کہ ہم اس سے چھٹکارا پانے کے کتنے ہی خواب دیکھتے ہیں، لیکن اب تک ڈیٹا کا کچھ حصہ فوری میسنجرز اور ای میلز میں بھیجا جاتا ہے، مزید قدیم فارمیٹس کا ذکر نہیں کرنا۔ میں آپ کو Apache Airflow کے اختیارات میں سے ایک کو الگ کرنے کی دعوت دیتا ہوں، یہ بتاتا ہوں کہ آپ ای میلز سے ڈیٹا کیسے لے سکتے ہیں۔

پس منظر

باہمی رابطے سے لے کر کمپنیوں کے درمیان تعامل کے معیارات تک بہت سا ڈیٹا اب بھی ای میل کے ذریعے منتقل کیا جاتا ہے۔ یہ اچھا ہے اگر ڈیٹا حاصل کرنے کے لیے انٹرفیس لکھنا ممکن ہو یا ایسے لوگوں کو دفتر میں رکھا جائے جو اس معلومات کو زیادہ آسان ذرائع میں داخل کریں گے، لیکن اکثر ایسا ممکن نہیں ہوتا۔ مجھے جس مخصوص کام کا سامنا کرنا پڑا وہ بدنام زمانہ CRM سسٹم کو ڈیٹا گودام سے جوڑنا تھا، اور پھر OLAP سسٹم سے۔ تاریخی طور پر ایسا ہوا کہ ہماری کمپنی کے لیے اس نظام کا استعمال کاروبار کے کسی خاص علاقے میں آسان تھا۔ لہذا، ہر کوئی واقعی اس تھرڈ پارٹی سسٹم کے ڈیٹا کے ساتھ کام کرنے کے قابل ہونا چاہتا تھا۔ سب سے پہلے، یقینا، ایک کھلے API سے ڈیٹا حاصل کرنے کے امکان کا مطالعہ کیا گیا تھا۔ بدقسمتی سے، API نے تمام ضروری ڈیٹا حاصل کرنے کا احاطہ نہیں کیا، اور، سادہ الفاظ میں، یہ بہت سے طریقوں سے ٹیڑھا تھا، اور تکنیکی معاونت زیادہ جامع فعالیت فراہم کرنے کے لیے آدھے راستے سے نہیں چاہتی تھی یا پورا نہیں کر سکتی تھی۔ لیکن اس نظام نے وقتاً فوقتاً گمشدہ ڈیٹا کو ڈاک کے ذریعے محفوظ شدہ دستاویزات کو اتارنے کے لیے لنک کی صورت میں وصول کرنے کا موقع فراہم کیا۔

واضح رہے کہ یہ واحد معاملہ نہیں تھا جس میں کاروبار ای میلز یا انسٹنٹ میسنجر سے ڈیٹا اکٹھا کرنا چاہتا تھا۔ تاہم، اس معاملے میں، ہم کسی تیسرے فریق کی کمپنی پر اثر انداز نہیں ہو سکتے جو صرف اس طرح ڈیٹا کا حصہ فراہم کرتی ہے۔

اپاچی ایئر فلو

ای ٹی ایل کے عمل کو بنانے کے لیے، ہم اکثر اپاچی ایئر فلو کا استعمال کرتے ہیں۔ اس ٹیکنالوجی سے ناواقف قاری کو بہتر طور پر سمجھنے کے لیے کہ یہ سیاق و سباق اور عام طور پر کیسی نظر آتی ہے، میں چند تعارفی بیان کروں گا۔

Apache Airflow ایک مفت پلیٹ فارم ہے جو Python میں ETL (Extract-Transform-Loading) کے عمل کو بنانے، ان پر عملدرآمد اور نگرانی کے لیے استعمال ہوتا ہے۔ ایئر فلو میں بنیادی تصور ایک ڈائریکٹڈ ایسکلک گراف ہے، جہاں گراف کے عمودی حصے مخصوص عمل ہیں، اور گراف کے کنارے کنٹرول یا معلومات کا بہاؤ ہیں۔ ایک عمل آسانی سے کسی بھی Python فنکشن کو کال کرسکتا ہے، یا اس میں کلاس کے تناظر میں کئی فنکشنز کو ترتیب وار کال کرنے سے زیادہ پیچیدہ منطق ہوسکتی ہے۔ سب سے زیادہ کثرت سے آپریشنز کے لئے، پہلے سے ہی بہت سے تیار شدہ ترقیات ہیں جنہیں عمل کے طور پر استعمال کیا جا سکتا ہے. اس طرح کی پیشرفت میں شامل ہیں:

  • آپریٹرز - ڈیٹا کو ایک جگہ سے دوسری جگہ منتقل کرنے کے لیے، مثال کے طور پر، ڈیٹا بیس ٹیبل سے ڈیٹا گودام میں؛
  • سینسرز - کسی خاص واقعے کے وقوع پذیر ہونے کا انتظار کرنے اور کنٹرول کے بہاؤ کو گراف کے بعد کے عمودی حصوں تک پہنچانے کے لیے؛
  • ہکس - نچلی سطح کے آپریشنز کے لیے، مثال کے طور پر، ڈیٹا بیس ٹیبل سے ڈیٹا حاصل کرنے کے لیے (بیانات میں استعمال کیا جاتا ہے)؛
  • وغیرہ

اس مضمون میں اپاچی ایئر فلو کو تفصیل سے بیان کرنا نامناسب ہوگا۔ مختصر تعارف ملاحظہ کیا جا سکتا ہے۔ یہاں یا یہاں.

ڈیٹا حاصل کرنے کے لیے ہک

سب سے پہلے، مسئلہ کو حل کرنے کے لیے، ہمیں ایک ہک لکھنے کی ضرورت ہے جس کے ساتھ ہم کر سکتے ہیں:

  • ای میل سے جڑیں۔
  • صحیح خط تلاش کریں
  • خط سے ڈیٹا حاصل کریں۔

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

منطق یہ ہے: ہم جوڑتے ہیں، آخری سب سے زیادہ متعلقہ خط تلاش کرتے ہیں، اگر کوئی اور ہے تو ہم انہیں نظر انداز کر دیتے ہیں۔ یہ فنکشن استعمال کیا جاتا ہے، کیونکہ بعد کے حروف میں پہلے کے تمام اعداد و شمار ہوتے ہیں۔ اگر یہ معاملہ نہیں ہے، تو آپ تمام حروف کی ایک صف واپس کر سکتے ہیں، یا پہلے والے پر کارروائی کر سکتے ہیں، اور باقی اگلے پاس پر۔ عام طور پر، سب کچھ، ہمیشہ کی طرح، کام پر منحصر ہے.

ہم ہک میں دو معاون افعال شامل کرتے ہیں: فائل ڈاؤن لوڈ کرنے کے لیے اور ای میل سے لنک کا استعمال کرتے ہوئے فائل ڈاؤن لوڈ کرنے کے لیے۔ ویسے، وہ آپریٹر میں منتقل کیا جا سکتا ہے، یہ اس فعالیت کو استعمال کرنے کی فریکوئنسی پر منحصر ہے. ہک میں اور کیا شامل کرنا ہے، ایک بار پھر، کام پر منحصر ہے: اگر خط میں فائلیں فوری طور پر موصول ہوتی ہیں، تو آپ خط میں منسلکات ڈاؤن لوڈ کرسکتے ہیں، اگر خط میں ڈیٹا موصول ہوتا ہے، تو آپ کو خط کو پارس کرنے کی ضرورت ہے، وغیرہ میرے معاملے میں، خط آرکائیو کے ایک لنک کے ساتھ آتا ہے، جسے مجھے ایک مخصوص جگہ پر رکھ کر مزید کارروائی کا عمل شروع کرنے کی ضرورت ہے۔

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

کوڈ سادہ ہے، اس لیے اسے مزید وضاحت کی ضرورت نہیں ہے۔ میں صرف آپ کو جادوئی لائن imap_conn_id کے بارے میں بتاؤں گا۔ Apache Airflow کنکشن کے پیرامیٹرز (لاگ ان، پاس ورڈ، ایڈریس، اور دیگر پیرامیٹرز) کو اسٹور کرتا ہے جن تک سٹرنگ شناخت کنندہ کے ذریعے رسائی حاصل کی جا سکتی ہے۔ بصری طور پر، کنکشن کا انتظام اس طرح لگتا ہے۔

Apache Airflow میں ای میل سے ڈیٹا حاصل کرنے کے لیے ETL عمل

ڈیٹا کا انتظار کرنے کے لیے سینسر

چونکہ ہم پہلے ہی جانتے ہیں کہ میل سے ڈیٹا کیسے جوڑنا اور وصول کرنا ہے، اس لیے اب ہم ان کا انتظار کرنے کے لیے ایک سینسر لکھ سکتے ہیں۔ میرے معاملے میں، اس نے فوری طور پر ایک آپریٹر لکھنا کام نہیں کیا جو ڈیٹا پر کارروائی کرے گا، اگر کوئی ہو، کیونکہ دیگر عمل میل سے موصول ہونے والے ڈیٹا کی بنیاد پر کام کرتے ہیں، بشمول وہ جو دوسرے ذرائع سے متعلقہ ڈیٹا لیتے ہیں (API، ٹیلی فونی ، ویب میٹرکس، وغیرہ) وغیرہ)۔ میں آپ کو ایک مثال دوں گا۔ CRM سسٹم میں ایک نیا صارف ظاہر ہوا ہے، اور ہم ابھی تک اس کے UUID کے بارے میں نہیں جانتے ہیں۔ پھر، جب SIP ٹیلی فونی سے ڈیٹا وصول کرنے کی کوشش کی جائے گی، تو ہمیں اس کے UUID سے منسلک کالیں موصول ہوں گی، لیکن ہم انہیں صحیح طریقے سے محفوظ اور استعمال نہیں کر پائیں گے۔ ایسے معاملات میں ڈیٹا کے انحصار کو ذہن میں رکھنا ضروری ہے، خاص طور پر اگر وہ مختلف ذرائع سے ہوں۔ یہ یقیناً ڈیٹا کی سالمیت کو محفوظ رکھنے کے لیے ناکافی اقدامات ہیں، لیکن بعض صورتوں میں یہ ضروری ہیں۔ ہاں، اور وسائل پر قبضہ کرنے کے لیے سستی کرنا بھی غیر معقول ہے۔

اس طرح، اگر میل میں تازہ معلومات ہوں تو ہمارا سینسر گراف کے بعد کے چوٹیوں کو شروع کرے گا، اور پچھلی معلومات کو بھی غیر متعلقہ کے طور پر نشان زد کرے گا۔

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

ہم ڈیٹا وصول کرتے اور استعمال کرتے ہیں۔

ڈیٹا حاصل کرنے اور اس پر کارروائی کرنے کے لیے، آپ الگ آپریٹر لکھ سکتے ہیں، آپ ریڈی میڈ استعمال کر سکتے ہیں۔ چونکہ اب تک منطق معمولی ہے - خط سے ڈیٹا لینے کے لیے، مثال کے طور پر، میں تجویز کرتا ہوں معیاری PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

ویسے، اگر آپ کا کارپوریٹ میل mail.ru پر بھی ہے، تو آپ موضوع، بھیجنے والے وغیرہ کے لحاظ سے خطوط تلاش نہیں کر سکیں گے۔ واپس 2016 میں، انہوں نے اسے متعارف کرانے کا وعدہ کیا تھا، لیکن بظاہر ان کا ذہن بدل گیا۔ میں نے ضروری حروف کے لیے الگ فولڈر بنا کر اور میل ویب انٹرفیس میں ضروری حروف کے لیے فلٹر ترتیب دے کر اس مسئلے کو حل کیا۔ اس طرح، تلاش کے لیے صرف ضروری حروف اور شرائط، میرے معاملے میں، اس فولڈر میں صرف (نظر نہ آنے والے) آتے ہیں۔

خلاصہ کرتے ہوئے، ہمارے پاس مندرجہ ذیل ترتیب ہے: ہم چیک کرتے ہیں کہ آیا ایسے نئے حروف ہیں جو شرائط کو پورا کرتے ہیں، اگر ہیں، تو ہم آخری خط کے لنک کا استعمال کرتے ہوئے آرکائیو ڈاؤن لوڈ کرتے ہیں۔
آخری نقطوں کے تحت، یہ چھوڑ دیا گیا ہے کہ اس آرکائیو کو پیک کیا جائے گا، آرکائیو سے ڈیٹا کو صاف اور پروسیس کیا جائے گا، اور اس کے نتیجے میں، پوری چیز ای ٹی ایل کے عمل کی پائپ لائن تک جائے گی، لیکن یہ پہلے سے ہی آگے ہے۔ مضمون کا دائرہ کار اگر یہ دلچسپ اور مفید نکلا، تو میں خوشی سے اپاچی ایئر فلو کے لیے ای ٹی ایل حل اور ان کے پرزوں کو بیان کرتا رہوں گا۔

ماخذ: www.habr.com

نیا تبصرہ شامل کریں