
Bez względu na to, jak bardzo rozwinie się technologia, za rozwojem zawsze podąża szereg przestarzałych podejść. Może to być spowodowane płynnym przejściem, czynnikami ludzkimi, potrzebami technologicznymi lub czymś innym. W zakresie przetwarzania danych najbardziej odkrywcze w tej części są źródła danych. Nieważne, jak bardzo marzymy o pozbyciu się tego, ale póki co część danych przesyłana jest w komunikatorach internetowych i mailach, nie mówiąc już o bardziej archaicznych formatach. Zapraszam do zdemontowania jednej z opcji dla Apache Airflow, ilustrującej w jaki sposób można pobierać dane z maili.
prehistoria
Wiele danych, począwszy od komunikacji interpersonalnej, a skończywszy na standardach interakcji między firmami, w dalszym ciągu przesyłanych jest pocztą elektroniczną. Dobrze, jeśli potrafisz napisać interfejs do odbioru danych lub umieścić w biurze osoby, które będą wprowadzać te informacje do dogodniejszych źródeł, jednak często takiej możliwości może po prostu nie być. Konkretnym zadaniem przed jakim stanąłem było podłączenie znanego systemu CRM do hurtowni danych, a następnie do systemu OLAP. Tak się złożyło historycznie, że dla naszej firmy korzystanie z tego systemu było wygodne w jednym obszarze działalności. Dlatego wszyscy bardzo chcieli móc operować danymi również z tego zewnętrznego systemu. Przede wszystkim zbadano oczywiście możliwość pozyskania danych z otwartego API. Niestety API nie obejmowało pobrania wszystkich niezbędnych danych i najprościej mówiąc było pod wieloma względami oszukane, a pomoc techniczna nie chciała lub nie mogła wyjść naprzeciw, aby zapewnić bardziej kompleksową funkcjonalność. Ale ten system zapewniał możliwość okresowego otrzymywania brakujących danych pocztą w postaci linku do rozładunku archiwum.
Warto zaznaczyć, że nie był to jedyny przypadek, w którym firma chciała zebrać dane z e-maili czy komunikatorów internetowych. Jednak w tym przypadku nie mogliśmy wpłynąć na firmę zewnętrzną, która udostępnia część danych tylko w ten sposób.
Przepływ powietrza Apache
Do budowy procesów ETL najczęściej wykorzystujemy Apache Airflow. Aby czytelnik nieobeznany z tą technologią mógł lepiej zrozumieć jak ona wygląda w kontekście i ogólnie opiszę kilka wprowadzających.
Apache Airflow to bezpłatna platforma służąca do budowania, uruchamiania i monitorowania procesów ETL (Extract-Transform-Loading) w języku Python. Główną koncepcją w Airflow jest skierowany graf acykliczny, którego wierzchołki wykresu przedstawiają określone procesy, a krawędzie grafu przedstawiają przepływ kontroli lub informacji. Proces może po prostu wywołać dowolną funkcję Pythona lub może mieć bardziej złożoną logikę sekwencyjnego wywoływania kilku funkcji w kontekście klasy. W przypadku najpopularniejszych operacji istnieje już wiele gotowych rozwiązań, które można wykorzystać jako procesy. Takie zmiany obejmują:
- operatory - do przenoszenia danych z jednego miejsca do drugiego, np. z tabeli bazy danych do hurtowni danych;
- czujniki - oczekują na wystąpienie określonego zdarzenia i kierują przepływ sterowania na kolejne wierzchołki wykresu;
- hooki - do operacji niższego poziomu, np. w celu pobrania danych z tabeli bazy danych (używane w instrukcjach);
- itd.
Niewłaściwym byłoby szczegółowe opisywanie Apache Airflow w tym artykule. Można obejrzeć krótkie wprowadzenia lub .
Hak do pobierania danych
Przede wszystkim, aby rozwiązać problem, musimy napisać hak, za pomocą którego moglibyśmy:
- połącz się z e-mailem;
- znajdź właściwą literę
- otrzymać dane z pisma.
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging
class IMAPHook(BaseHook):
def __init__(self, imap_conn_id):
"""
IMAP hook для получения данных с электронной почты
:param imap_conn_id: Идентификатор подключения к почте
:type imap_conn_id: string
"""
self.connection = self.get_connection(imap_conn_id)
self.mail = None
def authenticate(self):
"""
Подключаемся к почте
"""
mail = imaplib.IMAP4_SSL(self.connection.host)
response, detail = mail.login(user=self.connection.login, password=self.connection.password)
if response != "OK":
raise AirflowException("Sign in failed")
else:
self.mail = mail
def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
"""
Метод для получения идентификатора последнего письма,
удовлетвораяющего условиям поиска
:param check_seen: Отмечать последнее письмо как прочитанное
:type check_seen: bool
:param box: Наименования ящика
:type box: string
:param condition: Условия поиска писем
:type condition: string
"""
self.authenticate()
self.mail.select(mailbox=box)
response, data = self.mail.search(None, condition)
mail_ids = data[0].split()
logging.info("В ящике найдены следующие письма: " + str(mail_ids))
if not mail_ids:
logging.info("Не найдено новых писем")
return None
mail_id = mail_ids[0]
# если таких писем несколько
if len(mail_ids) > 1:
# отмечаем остальные прочитанными
for id in mail_ids:
self.mail.store(id, "+FLAGS", "\Seen")
# возвращаем последнее
mail_id = mail_ids[-1]
# нужно ли отметить последнее прочитанным
if not check_seen:
self.mail.store(mail_id, "-FLAGS", "\Seen")
return mail_idLogika jest następująca: łączymy, znajdujemy ostatnią najbardziej odpowiednią literę, jeśli są inne, ignorujemy je. Ta funkcja jest używana, ponieważ późniejsze litery zawierają wszystkie dane wcześniejszych. Jeśli tak nie jest, możesz zwrócić tablicę wszystkich liter lub przetworzyć pierwszą, a resztę w następnym przebiegu. Generalnie jak zawsze wszystko zależy od zadania.
Do haka dodajemy dwie funkcje pomocnicze: do pobrania pliku i do pobrania pliku za pomocą linku z listu. Nawiasem mówiąc, można je uwzględnić w operatorze, zależy to od częstotliwości korzystania z tej funkcjonalności. Co jeszcze dodać do haka, znowu zależy od zadania: jeśli pliki zostaną otrzymane natychmiast w liście, możesz pobrać załączniki do listu, jeśli w liście pojawią się dane, musisz przeanalizować list itp. W moim przypadku list przychodzi z jednym linkiem do archiwum, który muszę umieścić w określonym miejscu i rozpocząć dalszą obróbkę.
def download_from_url(self, url, path, chunk_size=128):
"""
Метод для скачивания файла
:param url: Адрес загрузки
:type url: string
:param path: Куда положить файл
:type path: string
:param chunk_size: По сколько байтов писать
:type chunk_size: int
"""
r = requests.get(url, stream=True)
with open(path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def download_mail_href_attachment(self, mail_id, path):
"""
Метод для скачивания файла по ссылке из письма
:param mail_id: Идентификатор письма
:type mail_id: string
:param path: Куда положить файл
:type path: string
"""
response, data = self.mail.fetch(mail_id, "(RFC822)")
raw_email = data[0][1]
raw_soup = raw_email.decode().replace("r", "").replace("n", "")
parse_soup = BeautifulSoup(raw_soup, "html.parser")
link_text = ""
for a in parse_soup.find_all("a", href=True, text=True):
link_text = a["href"]
self.download_from_url(link_text, path)Kod jest prosty, więc nie wymaga dodatkowych wyjaśnień. Opowiem Ci tylko o magicznej linii imap_conn_id. Apache Airflow przechowuje parametry połączenia (login, hasło, adres i inne parametry), do których można uzyskać dostęp za pomocą identyfikatora tekstowego. Wizualnie zarządzanie połączeniami wygląda tak

Czujnik oczekiwania na dane
Skoro już wiemy jak się łączyć i odbierać dane z poczty, możemy teraz napisać czujnik, który będzie na to czekał. W moim przypadku od razu napisanie operatora, który będzie przetwarzał dane, jeśli takie będą, nie zadziałało, bo na danych otrzymanych z poczty działają inne procesy, w tym te, które pobierają powiązane dane z innych źródeł (API, telefonia, metryki sieciowe, itp.) itp.). Dam ci przykład. W systemie CRM pojawił się nowy użytkownik i nie znamy jeszcze jego UUID. Wtedy przy próbie odbioru danych z telefonii SIP otrzymamy połączenia powiązane z jej UUID, ale nie będziemy mogli ich poprawnie zapisać i wykorzystać. W takich pytaniach należy pamiętać o zależności danych, zwłaszcza jeśli pochodzą one z różnych źródeł. Są to oczywiście środki niewystarczające do zachowania integralności danych, ale w niektórych przypadkach są konieczne. Pożyczanie zasobów w czasie bezczynności jest również irracjonalne.
Dzięki temu nasz sensor będzie uruchamiał kolejne wierzchołki wykresu, jeśli w mailu pojawią się świeże informacje, a także oznaczy poprzednie informacje jako nieistotne.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook
class MailSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.check_seen = check_seen
self.box = box
self.condition = condition
def poke(self, context):
conn = IMAPHook(self.conn_id)
mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
if mail_id is None:
return False
else:
return TrueOdbieranie i wykorzystywanie danych
Do odbierania i przetwarzania danych można napisać osobny operator, można skorzystać z gotowych. Ponieważ na razie logika jest banalna - aby pobrać dane np. z litery, sugeruję standardowy PythonOperator
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook
start_date = datetime(2020, 4, 4)
# Стандартное конфигурирование графа
args = {
"owner": "example",
"start_date": start_date,
"email": ["home@home.ru"],
"email_on_failure": False,
"email_on_retry": False,
"retry_delay": timedelta(minutes=15),
"provide_context": False,
}
dag = DAG(
dag_id="test_etl",
default_args=args,
schedule_interval="@hourly",
)
# Определяем сенсор
mail_check_sensor = MailSensor(
task_id="check_new_emails",
poke_interval=10,
conn_id="mail_conn_id",
timeout=10,
soft_fail=True,
box="my_box",
dag=dag,
mode="poke",
)
# Функция для получения данных из письма
def prepare_mail():
imap_hook = IMAPHook("mail_conn_id")
mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
if mail_id is None:
raise AirflowException("Empty mailbox")
conn.download_mail_href_attachment(mail_id, "./path.zip")
prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
# Описание остальных вершин графа
...
# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управленияNawiasem mówiąc, jeśli Twój firmowy adres e-mail znajduje się również na mail.ru, nie będziesz mógł wyszukiwać listów według tematu, nadawcy itp. Obiecali wprowadzenie go w 2016 roku, ale najwyraźniej zmienili zdanie. Rozwiązałem ten problem, tworząc oddzielny folder na niezbędne listy i konfigurując filtr w interfejsie WWW poczty dla niezbędnych listów. Zatem w moim przypadku tylko niezbędne litery i warunki wyszukiwania po prostu (NIEWIDZONE) trafiają do tego folderu.
Reasumując, mamy następującą sekwencję: sprawdzamy, czy pojawiły się nowe litery spełniające warunki, jeśli tak, to pobieramy archiwum korzystając z linku z ostatniej litery.
Pod ostatnimi elipsami pominięto, że to archiwum zostanie rozpakowane, dane z archiwum zostaną wyczyszczone i przetworzone, a na koniec cała sprawa pójdzie dalej do potoku procesu ETL, ale to już wykracza poza zakres artykuł. Jeśli okazało się to ciekawe i przydatne, chętnie będę dalej opisywał rozwiązania ETL i ich części dla Apache Airflow.
Źródło: www.habr.com
