在 Apache Airflow 中從電子郵件中獲取數據的 ETL 過程

在 Apache Airflow 中從電子郵件中獲取數據的 ETL 過程

無論技術如何發展,一連串過時的方法總是落後於發展。 這可能是由於平穩過渡、人為因素、技術需求或其他原因。 在數據處理領域,數據源是這部分最具啟發性的部分。 不管我們多麼夢想擺脫它,但到目前為止,部分數據是通過即時通訊和電子郵件發送的,更不用說更古老的格式了。 我邀請您拆解 Apache Airflow 的其中一個選項,說明如何從電子郵件中獲取數據。

許多數據仍然通過電子郵件傳輸,從人際通信到公司之間的交互標準。 如果可以編寫一個接口來獲取數據,或者讓辦公室裡的人將這些信息輸入到更方便的來源,這很好,但通常這可能根本不可能。 我面臨的具體任務是將臭名昭著的 CRM 系統連接到數據倉庫,然後再連接到 OLAP 系統。 歷史上確實如此,對於我們公司來說,在特定業務領域使用該系統很方便。 所以大家也非常希望能夠操作這個第三方系統的數據。 首先,當然是研究了從開放 API 獲取數據的可能性。 不幸的是,API並沒有涵蓋獲取所有必要的數據,簡單來說,它在很多方面都是歪的,技術支持不想或不能半途而廢來提供更全面的功能。 但是該系統提供了以卸載檔案鏈接的形式通過郵件定期接收丟失數據的機會。

應該指出的是,這並不是企業希望從電子郵件或即時通訊工具中收集數據的唯一案例。 但是,在這種情況下,我們無法影響僅以這種方式提供部分數據的第三方公司。

阿帕奇氣流

要構建 ETL 過程,我們最常使用 Apache Airflow。 為了讓不熟悉這項技術的讀者更好地理解它在上下文中和一般情況下的樣子,我將描述幾個介紹性的。

Apache Airflow 是一個免費平台,用於在 Python 中構建、執行和監控 ETL(提取-轉換-加載)過程。 Airflow 中的主要概念是有向無環圖,其中圖的頂點是特定的進程,圖的邊是控製或信息流。 進程可以簡單地調用任何 Python 函數,也可以通過在類的上下文中順序調用多個函數而具有更複雜的邏輯。 對於最頻繁的操作,已經有很多現成的開發可以作為流程。 這些發展包括:

  • 運算符——用於將數據從一個地方傳輸到另一個地方,例如,從數據庫表到數據倉庫;
  • 傳感器 - 用於等待特定事件的發生並將控制流引導至圖形的後續頂點;
  • 鉤子 - 用於較低級別的操作,例如,從數據庫表中獲取數據(在語句中使用);
  • 等等

在本文中詳細描述 Apache Airflow 是不合適的。 可以查看簡介 這裡這裡.

獲取數據的鉤子

首先,要解決這個問題,我們需要寫一個鉤子,我們可以用它:

  • 連接到電子郵件
  • 找到合適的字母
  • 從信中接收數據。

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

邏輯是這樣的:我們連接,找到最後一個最相關的字母,如果還有其他字母,我們將忽略它們。 使用此功能,因為後面的字母包含前面字母的所有數據。 如果不是這種情況,那麼您可以返回一個包含所有字母的數組,或者處理第一個字母,然後在下一次處理其餘字母。 總的來說,一切都一如既往地取決於任務。

我們向掛鉤添加兩個輔助功能:用於下載文件和使用電子郵件中的鏈接下載文件。 順便說一句,它們可以移交給運營商,這取決於使用此功能的頻率。 還有什麼要添加到鉤子,同樣,取決於任務:如果文件在信件中立即收到,那麼你可以下載附件到信件,如果數據在信件中收到,那麼你需要解析信件, ETC。 就我而言,這封信帶有一個指向檔案的鏈接,我需要將其放在某個地方並開始進一步的處理過程。

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

代碼很簡單,所以幾乎不需要進一步解釋。 我將只告訴您神奇的行 imap_conn_id。 Apache Airflow 存儲可以通過字符串標識符訪問的連接參數(登錄名、密碼、地址和其他參數)。 在視覺上,連接管理看起來像這樣

在 Apache Airflow 中從電子郵件中獲取數據的 ETL 過程

等待數據的傳感器

由於我們已經知道如何連接和接收來自郵件的數據,我們現在可以編寫一個傳感器來等待它們。 在我的例子中,立即編寫一個操作員來處理數據(如果有的話)是行不通的,因為其他進程基於從郵件接收的數據工作,包括那些從其他來源(API、電話)獲取相關數據的進程、網絡指標等)。等等)。 我給你舉個例子。 CRM系統中出現了一個新用戶,我們還不知道他的UUID。 然後,當嘗試從 SIP 電話接收數據時,我們將收到與其 UUID 綁定的調用,但我們將無法正確保存和使用它們。 在這種情況下,重要的是要記住數據的依賴性,尤其是當它們來自不同來源時。 當然,這些不足以保護數據完整性,但在某些情況下它們是必要的。 是的,閒置佔用資源也是不合理的。

因此,如果郵件中有新信息,我們的傳感器將啟動圖形的後續頂點,並將先前的信息標記為不相關。

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

我們接收和使用數據

接收和處理數據,可以單獨寫一個operator,也可以用現成的。 由於到目前為止邏輯是微不足道的——例如從信中獲取數據,我建議使用標準的 PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

順便說一句,如果您的公司郵件也在 mail.ru 上,那麼您將無法按主題、發件人等搜索信件。 早在 2016 年,他們就承諾推出它,但顯然改變了主意。 我通過為必要的信件創建一個單獨的文件夾並在郵件 Web 界面中為必要的信件設置過濾器來解決這個問題。 因此,只有搜索所需的字母和條件,在我的例子中,只是 (UNSEEN) 進入這個文件夾。

總而言之,我們有以下順序:我們檢查是否有滿足條件的新信件,如果有,則我們使用最後一封信件中的鏈接下載檔案。
最後一點,省略了這個archive會被解包,archive中的數據會被清空處理,結果整個事情會更進一步到ETL流程的pipeline,但這已經超出了文章的範圍。 如果結果有趣且有用,那麼我將很樂意繼續描述 Apache Airflow 的 ETL 解決方案及其組成部分。

來源: www.habr.com

添加評論