Bez obzira na to koliko se tehnologija razvija, niz zastarjelih pristupa uvijek zaostaje za razvojem. To može biti zbog glatke tranzicije, ljudskih faktora, tehnoloških potreba ili nečeg drugog. U oblasti obrade podataka izvori podataka su u ovom dijelu najizrazitiji. Koliko god sanjali da se toga riješimo, ali za sada se dio podataka šalje u instant messengerima i mejlovima, da ne govorimo o arhaičnijim formatima. Pozivam vas da rastavite jednu od opcija za Apache Airflow, ilustrujući kako možete preuzeti podatke iz e-pošte.
prapovijest
Mnogo podataka se i dalje prenosi putem e-pošte, od međuljudske komunikacije do standarda interakcije između kompanija. Dobro je ako je moguće napisati interfejs za dobijanje podataka ili staviti ljude u kancelariju koji će te informacije unositi u pogodnije izvore, ali često to jednostavno nije moguće. Konkretan zadatak sa kojim sam se suočio bio je povezivanje ozloglašenog CRM sistema sa skladištem podataka, a zatim i sa OLAP sistemom. Istorijski se dogodilo da je za našu kompaniju upotreba ovog sistema bila zgodna u određenoj oblasti poslovanja. Stoga su svi zaista željeli da mogu raditi i sa podacima iz ovog sistema treće strane. Prije svega, naravno, proučavana je mogućnost dobivanja podataka iz otvorenog API-ja. Nažalost, API nije pokrivao dobijanje svih potrebnih podataka, i, jednostavno rečeno, bio je na mnogo načina kriv, a tehnička podrška nije htela ili nije mogla da izađe u susret kako bi pružila sveobuhvatniju funkcionalnost. Ali ovaj sistem je pružio mogućnost da se podaci koji nedostaju povremeno primaju poštom u obliku veze za istovar arhive.
Treba napomenuti da ovo nije bio jedini slučaj u kojem je kompanija željela prikupiti podatke iz e-mailova ili instant messengera. Međutim, u ovom slučaju nismo mogli utjecati na treću kompaniju koja samo na ovaj način daje dio podataka.
apache airflow
Za izgradnju ETL procesa najčešće koristimo Apache Airflow. Kako bi čitatelj koji nije upoznat s ovom tehnologijom bolje razumio kako ona izgleda u kontekstu i općenito, opisati ću nekoliko uvodnih.
Apache Airflow je besplatna platforma koja se koristi za izgradnju, izvršavanje i praćenje ETL (Extract-Transform-Loading) procesa u Pythonu. Glavni koncept u Airflow-u je usmjereni aciklični graf, gdje su vrhovi grafa specifični procesi, a ivice grafa su tok kontrole ili informacija. Proces može jednostavno pozvati bilo koju Python funkciju ili može imati složeniju logiku od sekvencijalnog pozivanja nekoliko funkcija u kontekstu klase. Za najčešće operacije već postoji mnogo gotovih razvoja koji se mogu koristiti kao procesi. Takvi razvoji uključuju:
- operatori - za prenos podataka sa jednog mesta na drugo, na primer, iz tabele baze podataka u skladište podataka;
- senzori - za čekanje na pojavu određenog događaja i usmjeravanje toka kontrole na naredne vrhove grafa;
- kukice - za operacije nižeg nivoa, na primjer, za dobivanje podataka iz tablice baze podataka (koristi se u izjavama);
- i tako dalje.
Bilo bi neprikladno detaljno opisati Apache Airflow u ovom članku. Kratke uvode možete pogledati
Kuka za dobijanje podataka
Prije svega, da bismo riješili problem, moramo napisati udicu s kojom bismo mogli:
- povežite se na email
- pronađite pravo slovo
- primati podatke iz pisma.
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging
class IMAPHook(BaseHook):
def __init__(self, imap_conn_id):
"""
IMAP hook для получения данных с электронной почты
:param imap_conn_id: Идентификатор подключения к почте
:type imap_conn_id: string
"""
self.connection = self.get_connection(imap_conn_id)
self.mail = None
def authenticate(self):
"""
Подключаемся к почте
"""
mail = imaplib.IMAP4_SSL(self.connection.host)
response, detail = mail.login(user=self.connection.login, password=self.connection.password)
if response != "OK":
raise AirflowException("Sign in failed")
else:
self.mail = mail
def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
"""
Метод для получения идентификатора последнего письма,
удовлетвораяющего условиям поиска
:param check_seen: Отмечать последнее письмо как прочитанное
:type check_seen: bool
:param box: Наименования ящика
:type box: string
:param condition: Условия поиска писем
:type condition: string
"""
self.authenticate()
self.mail.select(mailbox=box)
response, data = self.mail.search(None, condition)
mail_ids = data[0].split()
logging.info("В ящике найдены следующие письма: " + str(mail_ids))
if not mail_ids:
logging.info("Не найдено новых писем")
return None
mail_id = mail_ids[0]
# если таких писем несколько
if len(mail_ids) > 1:
# отмечаем остальные прочитанными
for id in mail_ids:
self.mail.store(id, "+FLAGS", "\Seen")
# возвращаем последнее
mail_id = mail_ids[-1]
# нужно ли отметить последнее прочитанным
if not check_seen:
self.mail.store(mail_id, "-FLAGS", "\Seen")
return mail_id
Logika je sledeća: povezujemo se, nalazimo poslednje najrelevantnije slovo, ako ima drugih, ignorišemo ih. Ova funkcija se koristi jer kasnija slova sadrže sve podatke ranijih. Ako to nije slučaj, onda možete vratiti niz svih slova ili obraditi prvo, a ostatak na sljedećem prolazu. Općenito, sve, kao i uvijek, ovisi o zadatku.
Dodamo dvije pomoćne funkcije na kuku: za preuzimanje datoteke i za preuzimanje datoteke pomoću veze iz e-pošte. Usput, mogu se premjestiti kod operatera, ovisi o učestalosti korištenja ove funkcionalnosti. Šta još dodati na kuku, opet, ovisi o zadatku: ako se datoteke odmah primaju u pismu, tada možete preuzeti priloge pismu, ako su podaci primljeni u pismu, onda morate raščlaniti pismo, itd. U mom slučaju pismo dolazi sa jednom vezom do arhive koju trebam staviti na određeno mjesto i krenuti u dalju obradu.
def download_from_url(self, url, path, chunk_size=128):
"""
Метод для скачивания файла
:param url: Адрес загрузки
:type url: string
:param path: Куда положить файл
:type path: string
:param chunk_size: По сколько байтов писать
:type chunk_size: int
"""
r = requests.get(url, stream=True)
with open(path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def download_mail_href_attachment(self, mail_id, path):
"""
Метод для скачивания файла по ссылке из письма
:param mail_id: Идентификатор письма
:type mail_id: string
:param path: Куда положить файл
:type path: string
"""
response, data = self.mail.fetch(mail_id, "(RFC822)")
raw_email = data[0][1]
raw_soup = raw_email.decode().replace("r", "").replace("n", "")
parse_soup = BeautifulSoup(raw_soup, "html.parser")
link_text = ""
for a in parse_soup.find_all("a", href=True, text=True):
link_text = a["href"]
self.download_from_url(link_text, path)
Kod je jednostavan, tako da mu nije potrebno dodatno objašnjenje. Reći ću vam samo o magičnoj liniji imap_conn_id. Apache Airflow pohranjuje parametre veze (prijava, lozinka, adresa i drugi parametri) kojima se može pristupiti pomoću identifikatora niza. Vizuelno, upravljanje vezama izgleda ovako
Senzor čeka podatke
Pošto već znamo kako se povezati i primati podatke iz pošte, sada možemo napisati senzor da ih čeka. U mom slučaju nije išlo odmah napisati operatera koji će obraditi podatke, ako ih ima, jer drugi procesi rade na osnovu podataka primljenih iz pošte, uključujući i one koji preuzimaju povezane podatke iz drugih izvora (API, telefonija , web metrike, itd.) itd.). Dat ću vam primjer. U CRM sistemu se pojavio novi korisnik, a još uvijek ne znamo za njegov UUID. Tada ćemo, prilikom pokušaja primanja podataka od SIP telefonije, primati pozive vezane za njen UUID, ali ih nećemo moći ispravno pohraniti i koristiti. U takvim stvarima važno je imati na umu ovisnost podataka, posebno ako su iz različitih izvora. Ovo su, naravno, nedovoljne mjere za očuvanje integriteta podataka, ali su u nekim slučajevima neophodne. Da, a neracionalno je i neracionalno zauzimanje resursa.
Tako će naš senzor pokrenuti sljedeće vrhove grafa ako u mailu ima svježih informacija, a također će prethodnu informaciju označiti kao nebitnu.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook
class MailSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.check_seen = check_seen
self.box = box
self.condition = condition
def poke(self, context):
conn = IMAPHook(self.conn_id)
mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
if mail_id is None:
return False
else:
return True
Primamo i koristimo podatke
Za primanje i obradu podataka možete napisati poseban operator, možete koristiti gotove. Pošto je do sada logika trivijalna - da uzmete podatke iz pisma, na primjer, predlažem standardni PythonOperator
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook
start_date = datetime(2020, 4, 4)
# Стандартное конфигурирование графа
args = {
"owner": "example",
"start_date": start_date,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retry_delay": timedelta(minutes=15),
"provide_context": False,
}
dag = DAG(
dag_id="test_etl",
default_args=args,
schedule_interval="@hourly",
)
# Определяем сенсор
mail_check_sensor = MailSensor(
task_id="check_new_emails",
poke_interval=10,
conn_id="mail_conn_id",
timeout=10,
soft_fail=True,
box="my_box",
dag=dag,
mode="poke",
)
# Функция для получения данных из письма
def prepare_mail():
imap_hook = IMAPHook("mail_conn_id")
mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
if mail_id is None:
raise AirflowException("Empty mailbox")
conn.download_mail_href_attachment(mail_id, "./path.zip")
prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
# Описание остальных вершин графа
...
# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления
Usput, ako je vaša korporativna pošta također na mail.ru, tada nećete moći tražiti pisma po temi, pošiljaocu itd. Još 2016. obećali su da će ga uvesti, ali su se očigledno predomislili. Ovaj problem sam riješio kreiranjem posebnog foldera za potrebna slova i postavljanjem filtera za potrebna slova u web interfejsu pošte. Dakle, samo potrebna slova i uslovi za pretragu, u mom slučaju, jednostavno (NEVIĐENO) ulaze u ovaj folder.
Sumirajući, imamo sljedeći redoslijed: provjeravamo da li postoje nova pisma koja ispunjavaju uslove, ako ih ima, onda preuzimamo arhivu koristeći vezu iz posljednjeg pisma.
Ispod zadnjih tačaka je izostavljeno da će se ova arhiva raspakirati, podaci iz arhive obrisati i obraditi, a kao rezultat toga, cijela stvar će ići dalje do cevovoda ETL procesa, ali to je već izvan granica obim članka. Ako je ispalo zanimljivo i korisno, onda ću rado nastaviti opisivati ETL rješenja i njihove dijelove za Apache Airflow.
izvor: www.habr.com