Як би сильно не розвивалися технології, за розвитком завжди тягнеться низка застарілих підходів. Це може бути обумовлено плавним переходом, людським фактором, технологічними потребами чи чимось іншим. В області обробки даних найбільш показовими у цій частині є джерела даних. Як би ми не мріяли цього позбутися, але поки частина даних пересилається в месенджерах та електронних листах, не кажучи і про більш архаїчні формати. Запрошую під кат розібрати один із варіантів для Apache Airflow, що ілюструє, як можна забирати дані з електронних листів.
Передісторія
Багато даних досі передаються електронною поштою, починаючи з міжособистісних комунікацій і закінчуючи стандартами взаємодії між компаніями. Добре, якщо вдається для отримання даних написати інтерфейс або посадити людей в офісі, які цю інформацію вноситимуть у зручніші джерела, але часто такої можливості може просто не бути. Конкретне завдання, з яким я зіткнувся, — це підключення відомої CRM системи до сховища даних, а далі — до системи OLAP. Так історично склалося, що для нашої компанії використання цієї системи було зручно в окремій області бізнесу. Тому всім дуже хотілося мати можливість оперувати даними і з цієї сторонньої системи, зокрема. Насамперед, звичайно, було вивчено можливість отримання даних з відкритого API. На жаль, API не покривало отримання всіх необхідних даних, та й, висловлюючись простою мовою, багато в чому кривувато, а технічна підтримка не захотіла або не змогла піти назустріч для надання більш вичерпного функціоналу. Зате дана система надавала можливість періодичного отримання даних на пошту у вигляді посилання для вивантаження архіву.
Слід зазначити, що це був єдиний кейс, яким бізнес хотів збирати дані з поштових листів чи месенджерів. Однак, у цьому випадку ми не могли вплинути на сторонню компанію, яка надає частину даних лише у такий спосіб.
Потік повітря Apache
Для побудови процесів ETL ми найчастіше використовуємо Apache Airflow. Для того, щоб читач, незнайомий з цією технологією, краще зрозумів, як це виглядає в контексті і в цілому, опишу пару вступних.
Apache Airflow – вільна платформа, яка використовується для побудови, виконання та моніторингу ETL (Extract-Transform-Loading) процесів мовою Python. Основним поняттям Airflow є орієнтований ациклічний граф, де вершини графа — конкретні процеси, а ребра графа — потік управління чи інформації. Процес може просто викликати будь-яку функцію Python, а може мати більш складну логіку з послідовного виклику декількох функцій в контексті класу. Для найчастіших операцій вже є безліч готових напрацювань, які можна використовувати як процеси. До таких напрацювань відносяться:
- оператори - для перегону даних з одного місця до іншого, наприклад з таблиці БД в сховище даних;
- сенсори - для очікування настання певної події та напрямки потоку управління у наступні вершини графа;
- хуки - для більш низькорівневих операцій, наприклад, для отримання даних із таблиці БД (використовуються в операторах);
- тощо.
Описувати Apache Airflow докладно у цій статті буде недоцільно. Короткі вступ можна подивитися
Хук для отримання даних
Насамперед, для вирішення завдання потрібно написати хук, за допомогою якого ми могли б:
- підключатися до електронної пошти;
- знаходити потрібний лист;
- отримувати дані з листа.
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging
class IMAPHook(BaseHook):
def __init__(self, imap_conn_id):
"""
IMAP hook для получения данных с электронной почты
:param imap_conn_id: Идентификатор подключения к почте
:type imap_conn_id: string
"""
self.connection = self.get_connection(imap_conn_id)
self.mail = None
def authenticate(self):
"""
Подключаемся к почте
"""
mail = imaplib.IMAP4_SSL(self.connection.host)
response, detail = mail.login(user=self.connection.login, password=self.connection.password)
if response != "OK":
raise AirflowException("Sign in failed")
else:
self.mail = mail
def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
"""
Метод для получения идентификатора последнего письма,
удовлетвораяющего условиям поиска
:param check_seen: Отмечать последнее письмо как прочитанное
:type check_seen: bool
:param box: Наименования ящика
:type box: string
:param condition: Условия поиска писем
:type condition: string
"""
self.authenticate()
self.mail.select(mailbox=box)
response, data = self.mail.search(None, condition)
mail_ids = data[0].split()
logging.info("В ящике найдены следующие письма: " + str(mail_ids))
if not mail_ids:
logging.info("Не найдено новых писем")
return None
mail_id = mail_ids[0]
# если таких писем несколько
if len(mail_ids) > 1:
# отмечаем остальные прочитанными
for id in mail_ids:
self.mail.store(id, "+FLAGS", "\Seen")
# возвращаем последнее
mail_id = mail_ids[-1]
# нужно ли отметить последнее прочитанным
if not check_seen:
self.mail.store(mail_id, "-FLAGS", "\Seen")
return mail_id
Логіка така: підключаємося, знаходимо останній найактуальніший лист, якщо є інші — ігноруємо їх. Використовується саме така функція, тому що пізніші листи містять усі дані ранніх. Якщо це не так, то можна повертати масив усіх листів або обробляти перший, а решта — при наступному проході. Загалом все як завжди залежить від завдання.
Додаємо до хука дві допоміжні функції: для завантаження файлу та для завантаження файлу за посиланням з листа. До речі, їх можна винести в оператор, це залежить від частоти використання цього функціоналу. Що ще дописувати в хук, знову ж таки, залежить від завдання: якщо в листі приходять одразу файли, то можна завантажувати додатки до листа, якщо дані приходять у листі, то треба ширяти лист і т.д. У моєму випадку лист надходить з одним посиланням на архів, який мені потрібно покласти в певне місце і запустити подальший процес обробки.
def download_from_url(self, url, path, chunk_size=128):
"""
Метод для скачивания файла
:param url: Адрес загрузки
:type url: string
:param path: Куда положить файл
:type path: string
:param chunk_size: По сколько байтов писать
:type chunk_size: int
"""
r = requests.get(url, stream=True)
with open(path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def download_mail_href_attachment(self, mail_id, path):
"""
Метод для скачивания файла по ссылке из письма
:param mail_id: Идентификатор письма
:type mail_id: string
:param path: Куда положить файл
:type path: string
"""
response, data = self.mail.fetch(mail_id, "(RFC822)")
raw_email = data[0][1]
raw_soup = raw_email.decode().replace("r", "").replace("n", "")
parse_soup = BeautifulSoup(raw_soup, "html.parser")
link_text = ""
for a in parse_soup.find_all("a", href=True, text=True):
link_text = a["href"]
self.download_from_url(link_text, path)
Код простий, тому навряд чи потребує додаткових пояснень. Розповім лише про магічний рядок imap_conn_id. Apache Airflow зберігає параметри підключень (логін, пароль, адресу та інші параметри), до яких можна звертатись за рядковим ідентифікатором. Візуально керування підключеннями виглядає ось так
Сенсор для очікування даних
Оскільки ми вже вміємо підключатися та отримувати дані з пошти, тепер можемо написати сенсор для їхнього очікування. Написати відразу оператор, який оброблятиме дані, якщо вони є, в моєму випадку не вийшло, оскільки на підставі отриманих даних з пошти працюють й інші процеси, в тому числі, пов'язані дані, що беруть, з інших джерел (API, телефонія, веб метрики і і т.д.). Наведу приклад. У системі CRM з'явився новий користувач, і ми ще не знаємо про його UUID. Тоді при спробі отримати дані з SIP-телефонії ми отримаємо дзвінки, прив'язані до UUID, але коректно зберегти і використовувати їх не зможемо. У таких питаннях важливо мати на увазі залежність даних, особливо якщо вони з різних джерел. Це, звісно, недостатні заходи збереження цілісності даних, але у випадках необхідні. Та й у холосту позичати ресурси теж нераціонально.
Таким чином, наш сенсор буде запускати наступні вершини графа, якщо є свіжа інформація поштою, а також позначати неактуальною попередню інформацію.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook
class MailSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.check_seen = check_seen
self.box = box
self.condition = condition
def poke(self, context):
conn = IMAPHook(self.conn_id)
mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
if mail_id is None:
return False
else:
return True
Отримуємо та використовуємо дані
Для отримання та обробки даних можна написати окремий оператор, можна використовувати готові. Оскільки поки що логіка тривіальна — забрати дані з листа, то для прикладу пропоную стандартний PythonOperator
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook
start_date = datetime(2020, 4, 4)
# Стандартное конфигурирование графа
args = {
"owner": "example",
"start_date": start_date,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retry_delay": timedelta(minutes=15),
"provide_context": False,
}
dag = DAG(
dag_id="test_etl",
default_args=args,
schedule_interval="@hourly",
)
# Определяем сенсор
mail_check_sensor = MailSensor(
task_id="check_new_emails",
poke_interval=10,
conn_id="mail_conn_id",
timeout=10,
soft_fail=True,
box="my_box",
dag=dag,
mode="poke",
)
# Функция для получения данных из письма
def prepare_mail():
imap_hook = IMAPHook("mail_conn_id")
mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
if mail_id is None:
raise AirflowException("Empty mailbox")
conn.download_mail_href_attachment(mail_id, "./path.zip")
prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
# Описание остальных вершин графа
...
# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления
До речі, якщо ваша корпоративна пошта теж на mail.ru, вам буде недоступний пошук листів по темі, відправнику і т.д. Вони ще в далекому 2016 році обіцяли ввести, але, мабуть, передумали. Я вирішив цю проблему, створивши під потрібні листи окрему папку та налаштувавши у веб-інтерфейсі пошти фільтр на потрібні листи. Таким чином, у цю папку потрапляють лише потрібні листи та умови для пошуку у моєму випадку просто (UNSEEN).
Резюмуючи, ми маємо таку послідовність: перевіряємо, чи є нові листи, які відповідають умовам, якщо є, то завантажуємо архів за посиланням з останнього листа.
Під останніми трьома крапками опущено, що цей архів буде розпакований, дані з архіву очищені і оброблені, і в результаті ця справа піде далі на конвеєр ETL процесу, але це вже виходить за рамки теми статті. Якщо вийшло цікаво та корисно, то з радістю продовжу описувати ETL рішення та їх частини для Apache Airflow.
Джерело: habr.com