عملية ETL للحصول على البيانات من البريد الإلكتروني في Apache Airflow

عملية ETL للحصول على البيانات من البريد الإلكتروني في Apache Airflow

بغض النظر عن مقدار التطور التكنولوجي ، فإن سلسلة من الأساليب التي عفا عليها الزمن تتخلف دائمًا وراء التطوير. قد يكون هذا بسبب الانتقال السلس أو العوامل البشرية أو الاحتياجات التكنولوجية أو أي شيء آخر. في مجال معالجة البيانات ، تعتبر مصادر البيانات هي الأكثر كشفًا في هذا الجزء. بغض النظر عن مدى حلمنا بالتخلص من هذا ، ولكن حتى الآن يتم إرسال جزء من البيانات في برامج المراسلة الفورية ورسائل البريد الإلكتروني ، ناهيك عن المزيد من التنسيقات القديمة. أدعوك لتفكيك أحد خيارات Apache Airflow ، موضحًا كيف يمكنك أخذ البيانات من رسائل البريد الإلكتروني.

قبل التاريخ

لا يزال يتم نقل الكثير من البيانات عبر البريد الإلكتروني ، من الاتصالات الشخصية إلى معايير التفاعل بين الشركات. إنه لأمر جيد إذا كان من الممكن كتابة واجهة للحصول على البيانات أو وضع الأشخاص في المكتب الذين سيدخلون هذه المعلومات في مصادر أكثر ملاءمة ، ولكن في كثير من الأحيان قد لا يكون هذا ممكنًا ببساطة. كانت المهمة المحددة التي واجهتها هي توصيل نظام CRM سيئ السمعة بمستودع البيانات ، ثم بنظام OLAP. لقد حدث تاريخيًا أن استخدام هذا النظام لشركتنا كان مناسبًا في مجال معين من الأعمال. لذلك ، أراد الجميع حقًا أن يكونوا قادرين على العمل مع البيانات من نظام الطرف الثالث هذا أيضًا. بادئ ذي بدء ، بالطبع ، تمت دراسة إمكانية الحصول على البيانات من واجهة برمجة التطبيقات المفتوحة. لسوء الحظ ، لم تغطي واجهة برمجة التطبيقات (API) الحصول على جميع البيانات الضرورية ، وبعبارات بسيطة ، كانت ملتوية من نواح كثيرة ، ولم يكن الدعم الفني يريد أو لا يمكن أن يفي في منتصف الطريق لتوفير وظائف أكثر شمولاً. لكن هذا النظام أتاح الفرصة لاستلام البيانات الناقصة بشكل دوري عن طريق البريد في شكل رابط لتفريغ الأرشيف.

وتجدر الإشارة إلى أن هذه لم تكن الحالة الوحيدة التي أرادت الشركة فيها جمع البيانات من رسائل البريد الإلكتروني أو برامج المراسلة الفورية. ومع ذلك ، في هذه الحالة ، لا يمكننا التأثير على شركة تابعة لجهة خارجية توفر جزءًا من البيانات بهذه الطريقة فقط.

أباتشي تدفق الهواء

لبناء عمليات ETL ، غالبًا ما نستخدم Apache Airflow. لكي يتمكن القارئ غير الملم بهذه التكنولوجيا من فهم كيفية ظهورها في السياق بشكل أفضل وبشكل عام ، سأصف اثنين من التمهيديين.

Apache Airflow هو نظام أساسي مجاني يستخدم لبناء عمليات ETL (Extract-Transform-Loading) وتنفيذها ومراقبتها في Python. المفهوم الرئيسي في Airflow هو الرسم البياني غير الدوري الموجه ، حيث تكون رؤوس الرسم البياني عمليات محددة ، وحواف الرسم البياني هي تدفق التحكم أو المعلومات. يمكن للعملية ببساطة استدعاء أي دالة Python ، أو يمكن أن يكون لها منطق أكثر تعقيدًا من استدعاء عدة وظائف بالتسلسل في سياق فئة. بالنسبة للعمليات الأكثر تكرارًا ، هناك بالفعل العديد من التطورات الجاهزة التي يمكن استخدامها كعمليات. وتشمل هذه التطورات:

  • عوامل التشغيل - لنقل البيانات من مكان إلى آخر ، على سبيل المثال ، من جدول قاعدة البيانات إلى مستودع البيانات ؛
  • أجهزة الاستشعار - لانتظار حدوث حدث معين وتوجيه تدفق التحكم إلى الرؤوس اللاحقة للرسم البياني ؛
  • الخطافات - للعمليات ذات المستوى الأدنى ، على سبيل المثال ، للحصول على البيانات من جدول قاعدة البيانات (المستخدمة في البيانات) ؛
  • إلخ

سيكون من غير المناسب وصف Apache Airflow بالتفصيل في هذه المقالة. يمكن الاطلاع على مقدمات موجزة هنا أو هنا.

ربط للحصول على البيانات

أولاً وقبل كل شيء ، لحل المشكلة ، نحتاج إلى كتابة خطاف يمكننا من خلاله:

  • الاتصال بالبريد الإلكتروني
  • ابحث عن الحرف الصحيح
  • تلقي البيانات من الرسالة.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

المنطق هو هذا: نحن نتواصل ، نعثر على الحرف الأخير الأكثر صلة ، إذا كان هناك حرف آخر ، فإننا نتجاهلهم. يتم استخدام هذه الوظيفة ، لأن الأحرف اللاحقة تحتوي على جميع بيانات الرسائل السابقة. إذا لم يكن الأمر كذلك ، فيمكنك إرجاع مصفوفة من جميع الأحرف ، أو معالجة الحرف الأول والباقي في المرور التالي. بشكل عام ، كل شيء ، كما هو الحال دائمًا ، يعتمد على المهمة.

نضيف وظيفتين مساعدتين إلى الخطاف: لتنزيل ملف وتنزيل ملف باستخدام ارتباط من بريد إلكتروني. بالمناسبة ، يمكن نقلهم إلى المشغل ، ويعتمد ذلك على تكرار استخدام هذه الوظيفة. ما يجب إضافته إلى الخطاف ، مرة أخرى ، يعتمد على المهمة: إذا تم استلام الملفات على الفور في الرسالة ، فيمكنك تنزيل المرفقات بالحرف ، وإذا تم استلام البيانات في الرسالة ، فأنت بحاجة إلى تحليل الرسالة ، إلخ. في حالتي ، تأتي الرسالة مع رابط واحد للأرشيف ، والذي أحتاج إلى وضعه في مكان معين والبدء في عملية المعالجة الإضافية.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

الشفرة بسيطة ، لذا فهي بالكاد تحتاج إلى مزيد من الشرح. سأخبرك فقط عن الخط السحري imap_conn_id. يخزن Apache Airflow معلمات الاتصال (تسجيل الدخول وكلمة المرور والعنوان والمعلمات الأخرى) التي يمكن الوصول إليها عن طريق معرف السلسلة. بصريا ، تبدو إدارة الاتصال هكذا

عملية ETL للحصول على البيانات من البريد الإلكتروني في Apache Airflow

جهاز استشعار لانتظار البيانات

نظرًا لأننا نعرف بالفعل كيفية الاتصال واستقبال البيانات من البريد ، يمكننا الآن كتابة أداة استشعار لانتظارها. في حالتي ، لم ينجح الأمر في كتابة عامل على الفور يقوم بمعالجة البيانات ، إن وجدت ، لأن العمليات الأخرى تعمل بناءً على البيانات المستلمة من البريد ، بما في ذلك تلك التي تأخذ البيانات ذات الصلة من مصادر أخرى (API ، والمهاتفة ، ومقاييس الويب ، وما إلى ذلك). وما إلى ذلك). سأعطيك مثالا. ظهر مستخدم جديد في نظام CRM وما زلنا لا نعرف عن UUID الخاص به. بعد ذلك ، عند محاولة تلقي البيانات من SIP telephony ، سوف نتلقى مكالمات مرتبطة بـ UUID الخاص به ، لكننا لن نتمكن من حفظها واستخدامها بشكل صحيح. في مثل هذه الأمور ، من المهم أن تضع في اعتبارك اعتماد البيانات ، خاصة إذا كانت من مصادر مختلفة. هذه ، بالطبع ، تدابير غير كافية للحفاظ على سلامة البيانات ، لكنها ضرورية في بعض الحالات. نعم ، والتباطؤ في احتلال الموارد هو أيضًا غير منطقي.

وبالتالي ، سيطلق المستشعر الخاص بنا رؤوسًا لاحقة للرسم البياني إذا كانت هناك معلومات جديدة في البريد ، كما سيحدد المعلومات السابقة على أنها غير ذات صلة.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

نتلقى ونستخدم البيانات

لتلقي البيانات ومعالجتها ، يمكنك كتابة عامل منفصل ، يمكنك استخدام مشغلات جاهزة. نظرًا لأن المنطق لا يزال تافهًا - لأخذ البيانات من الحرف ، على سبيل المثال ، أقترح PythonOperator القياسي

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

بالمناسبة ، إذا كان بريد شركتك موجودًا أيضًا على mail.ru ، فلن تتمكن من البحث عن الرسائل حسب الموضوع أو المرسل وما إلى ذلك. مرة أخرى في عام 2016 ، وعدوا بتقديمه ، لكن على ما يبدو غيروا رأيهم. لقد قمت بحل هذه المشكلة عن طريق إنشاء مجلد منفصل للأحرف الضرورية وإعداد مرشح للأحرف الضرورية في واجهة الويب الخاصة بالبريد. وبالتالي ، فقط الحروف والشروط اللازمة للبحث ، في حالتي ، ببساطة (UNSEEN) تدخل في هذا المجلد.

للتلخيص ، لدينا التسلسل التالي: نتحقق مما إذا كانت هناك أحرف جديدة تستوفي الشروط ، إذا كانت موجودة ، ثم نقوم بتنزيل الأرشيف باستخدام الرابط من الحرف الأخير.
تحت النقاط الأخيرة ، تم حذف أن هذا الأرشيف سيتم تفكيكه ، وسيتم مسح البيانات من الأرشيف ومعالجتها ، ونتيجة لذلك ، سيذهب الأمر برمته إلى خط أنابيب عملية ETL ، ولكن هذا بالفعل يتجاوز نطاق المقال. إذا كان الأمر ممتعًا ومفيدًا ، فسأواصل بكل سرور وصف حلول ETL وأجزائها لـ Apache Airflow.

المصدر: www.habr.com

إضافة تعليق