Quy trình ETL để nhận dữ liệu từ email trong Apache Airflow

Quy trình ETL để nhận dữ liệu từ email trong Apache Airflow

Dù công nghệ có phát triển đến đâu thì một chuỗi các phương pháp lỗi thời vẫn luôn đi sau sự phát triển. Điều này có thể là do quá trình chuyển đổi suôn sẻ, yếu tố con người, nhu cầu công nghệ hoặc điều gì khác. Trong lĩnh vực xử lý dữ liệu, nguồn dữ liệu được tiết lộ nhiều nhất trong phần này. Cho dù chúng ta có mơ ước thoát khỏi điều này đến mức nào, nhưng cho đến nay một phần dữ liệu được gửi qua tin nhắn tức thời và email, chưa kể đến các định dạng cổ xưa hơn. Tôi mời bạn phân tách một trong các tùy chọn cho Luồng khí Apache, minh họa cách bạn có thể lấy dữ liệu từ email.

thời tiền sử

Rất nhiều dữ liệu vẫn được truyền qua e-mail, từ giao tiếp giữa các cá nhân đến tiêu chuẩn tương tác giữa các công ty. Thật tốt nếu có thể viết một giao diện để lấy dữ liệu hoặc đưa những người vào văn phòng, những người sẽ nhập thông tin này vào các nguồn thuận tiện hơn, nhưng thường thì điều này đơn giản là không thể thực hiện được. Nhiệm vụ cụ thể mà tôi phải đối mặt là kết nối hệ thống CRM khét tiếng với kho dữ liệu, sau đó với hệ thống OLAP. Nó đã xảy ra trong lịch sử rằng đối với công ty chúng tôi, việc sử dụng hệ thống này thuận tiện trong một lĩnh vực kinh doanh cụ thể. Do đó, mọi người thực sự muốn có thể hoạt động với dữ liệu từ hệ thống của bên thứ ba này. Tất nhiên, trước hết, khả năng lấy dữ liệu từ API mở đã được nghiên cứu. Thật không may, API không bao gồm việc lấy tất cả dữ liệu cần thiết và nói một cách đơn giản, nó bị sai lệch theo nhiều cách và bộ phận hỗ trợ kỹ thuật không muốn hoặc không thể đáp ứng nửa chừng để cung cấp chức năng toàn diện hơn. Nhưng hệ thống này đã cung cấp cơ hội để nhận định kỳ dữ liệu bị thiếu qua thư dưới dạng liên kết để dỡ kho lưu trữ.

Cần lưu ý rằng đây không phải là trường hợp duy nhất mà doanh nghiệp muốn thu thập dữ liệu từ email hoặc tin nhắn tức thời. Tuy nhiên, trong trường hợp này, chúng tôi không thể tác động đến một công ty bên thứ ba chỉ cung cấp một phần dữ liệu theo cách này.

Luồng khí Apache

Để xây dựng các quy trình ETL, chúng tôi thường sử dụng Apache Airflow. Để một độc giả không quen thuộc với công nghệ này hiểu rõ hơn về giao diện của nó trong ngữ cảnh và nói chung, tôi sẽ mô tả một vài điều giới thiệu.

Apache Airflow là một nền tảng miễn phí được sử dụng để xây dựng, thực thi và giám sát các quy trình ETL (Trích xuất-Chuyển đổi-Tải) trong Python. Khái niệm chính trong Luồng không khí là biểu đồ tuần hoàn có hướng, trong đó các đỉnh của biểu đồ là các quy trình cụ thể và các cạnh của biểu đồ là luồng điều khiển hoặc thông tin. Một quy trình có thể đơn giản gọi bất kỳ hàm Python nào hoặc nó có thể có logic phức tạp hơn từ việc gọi tuần tự một số hàm trong ngữ cảnh của một lớp. Đối với các hoạt động thường xuyên nhất, đã có nhiều phát triển làm sẵn có thể được sử dụng làm quy trình. Những phát triển như vậy bao gồm:

  • toán tử - để truyền dữ liệu từ nơi này sang nơi khác, ví dụ: từ bảng cơ sở dữ liệu sang kho dữ liệu;
  • cảm biến - để chờ sự xuất hiện của một sự kiện nhất định và hướng luồng điều khiển đến các đỉnh tiếp theo của biểu đồ;
  • móc - ví dụ như cho các hoạt động cấp thấp hơn để lấy dữ liệu từ bảng cơ sở dữ liệu (được sử dụng trong các câu lệnh);
  • vv

Sẽ không phù hợp nếu mô tả chi tiết Luồng không khí của Apache trong bài viết này. Giới thiệu ngắn gọn có thể được xem đây hoặc đây.

Móc để lấy dữ liệu

Trước hết, để giải quyết vấn đề, chúng ta cần viết một hook mà chúng ta có thể:

  • kết nối với email
  • tìm đúng chữ cái
  • nhận dữ liệu từ thư.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Logic là thế này: chúng tôi kết nối, tìm chữ cái cuối cùng phù hợp nhất, nếu có những chữ cái khác, chúng tôi bỏ qua chúng. Hàm này được sử dụng vì các chữ cái sau chứa tất cả dữ liệu của các chữ cái trước đó. Nếu đây không phải là trường hợp, thì bạn có thể trả về một mảng gồm tất cả các chữ cái hoặc xử lý chữ cái đầu tiên và phần còn lại ở lượt tiếp theo. Nói chung, mọi thứ, như mọi khi, phụ thuộc vào nhiệm vụ.

Chúng tôi thêm hai chức năng phụ trợ vào hook: để tải xuống tệp và tải xuống tệp bằng liên kết từ email. Nhân tiện, chúng có thể được chuyển đến người vận hành, điều này phụ thuộc vào tần suất sử dụng chức năng này. Một lần nữa, những gì khác để thêm vào móc phụ thuộc vào nhiệm vụ: nếu các tệp được nhận ngay trong thư, thì bạn có thể tải xuống các tệp đính kèm vào thư, nếu dữ liệu được nhận trong thư, thì bạn cần phân tích cú pháp của thư, vân vân. Trong trường hợp của tôi, bức thư đi kèm với một liên kết đến kho lưu trữ mà tôi cần đặt ở một nơi nhất định và bắt đầu quá trình xử lý tiếp theo.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Mã này đơn giản nên hầu như không cần giải thích thêm. Tôi sẽ chỉ cho bạn biết về dòng ma thuật imap_conn_id. Apache Airflow lưu trữ các tham số kết nối (đăng nhập, mật khẩu, địa chỉ và các tham số khác) có thể được truy cập bằng mã định danh chuỗi. Trực quan, quản lý kết nối trông như thế này

Quy trình ETL để nhận dữ liệu từ email trong Apache Airflow

Cảm biến chờ dữ liệu

Vì chúng ta đã biết cách kết nối và nhận dữ liệu từ mail nên bây giờ chúng ta có thể viết một cảm biến để chờ chúng. Trong trường hợp của tôi, việc viết ngay một toán tử sẽ xử lý dữ liệu, nếu có, sẽ không hiệu quả vì các quy trình khác hoạt động dựa trên dữ liệu nhận được từ thư, bao gồm cả những quy trình lấy dữ liệu liên quan từ các nguồn khác (API, điện thoại , số liệu web, v.v.). v.v.). Tôi sẽ cho bạn một ví dụ. Một người dùng mới đã xuất hiện trong hệ thống CRM và chúng tôi vẫn chưa biết về UUID của anh ấy. Sau đó, khi cố gắng nhận dữ liệu từ điện thoại SIP, chúng tôi sẽ nhận được các cuộc gọi được liên kết với UUID của nó, nhưng chúng tôi sẽ không thể lưu và sử dụng chúng một cách chính xác. Trong những vấn đề như vậy, điều quan trọng là phải ghi nhớ sự phụ thuộc của dữ liệu, đặc biệt nếu chúng đến từ các nguồn khác nhau. Tất nhiên, đây là những biện pháp không đủ để bảo vệ tính toàn vẹn của dữ liệu, nhưng trong một số trường hợp, chúng là cần thiết. Vâng, và chạy không tải để chiếm tài nguyên cũng là không hợp lý.

Do đó, cảm biến của chúng tôi sẽ khởi chạy các đỉnh tiếp theo của biểu đồ nếu có thông tin mới trong thư và cũng đánh dấu thông tin trước đó là không liên quan.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Chúng tôi nhận và sử dụng dữ liệu

Để nhận và xử lý dữ liệu, bạn có thể viết toán tử riêng, bạn có thể sử dụng toán tử làm sẵn. Vì logic vẫn còn tầm thường - chẳng hạn để lấy dữ liệu từ chữ cái, tôi đề xuất PythonOperator tiêu chuẩn

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Nhân tiện, nếu thư công ty của bạn cũng có trên mail.ru, thì bạn sẽ không thể tìm kiếm thư theo chủ đề, người gửi, v.v. Trở lại năm 2016, họ hứa sẽ giới thiệu nó, nhưng dường như họ đã đổi ý. Tôi đã giải quyết vấn đề này bằng cách tạo một thư mục riêng cho các chữ cái cần thiết và thiết lập bộ lọc cho các chữ cái cần thiết trong giao diện web thư. Do đó, chỉ những chữ cái và điều kiện cần thiết để tìm kiếm, trong trường hợp của tôi, chỉ cần (UNSEEN) vào thư mục này.

Tóm lại, chúng ta có trình tự sau: chúng ta kiểm tra xem có chữ cái mới nào đáp ứng các điều kiện hay không, nếu có thì chúng ta tải xuống kho lưu trữ bằng liên kết từ chữ cái cuối cùng.
Trong các dấu chấm cuối cùng, người ta bỏ qua rằng kho lưu trữ này sẽ được giải nén, dữ liệu từ kho lưu trữ sẽ bị xóa và xử lý, và kết quả là toàn bộ nội dung sẽ đi xa hơn đến đường dẫn của quy trình ETL, nhưng điều này đã vượt quá phạm vi bài viết. Nếu nó trở nên thú vị và hữu ích, thì tôi sẽ sẵn lòng tiếp tục mô tả các giải pháp ETL và các bộ phận của chúng cho Luồng khí Apache.

Nguồn: www.habr.com

Thêm một lời nhận xét