
Ongeacht hoeveel technologie zich ontwikkelt, een reeks verouderde benaderingen loopt altijd achter de ontwikkeling aan. Dit kan te wijten zijn aan een soepele overgang, menselijke factoren, technologische behoeften of iets anders. Op het gebied van dataverwerking zijn databronnen in dit onderdeel het meest onthullend. Het maakt niet uit hoeveel we dromen om hiervan af te komen, maar tot nu toe wordt een deel van de gegevens verzonden in instant messengers en e-mails, om nog maar te zwijgen van meer archaïsche formaten. Ik nodig je uit om een van de opties voor Apache Airflow te demonteren, om te illustreren hoe je gegevens uit e-mails kunt halen.
prehistorie
Veel gegevens worden nog steeds via e-mail overgedragen, van interpersoonlijke communicatie tot standaarden voor interactie tussen bedrijven. Het is goed als het mogelijk is om een interface te schrijven om gegevens te verkrijgen of om mensen op kantoor te plaatsen die deze informatie in handigere bronnen zullen invoeren, maar vaak is dit gewoon niet mogelijk. De specifieke taak waar ik voor stond, was het verbinden van het beruchte CRM-systeem met het datawarehouse en vervolgens met het OLAP-systeem. Het is historisch zo gebeurd dat het gebruik van dit systeem voor ons bedrijf handig was in een bepaald bedrijfsgebied. Daarom wilde iedereen heel graag ook met data uit dit systeem van derden kunnen werken. Allereerst is natuurlijk gekeken naar de mogelijkheid om data uit een open API te halen. Helaas dekte de API niet het verkrijgen van alle benodigde gegevens, en in eenvoudige bewoordingen was het in veel opzichten krom, en de technische ondersteuning wilde of kon niet halverwege voldoen om uitgebreidere functionaliteit te bieden. Maar dit systeem bood de mogelijkheid om periodiek de ontbrekende gegevens per mail te ontvangen in de vorm van een link voor het uitladen van het archief.
Opgemerkt moet worden dat dit niet het enige geval was waarin het bedrijf gegevens uit e-mails of instant messengers wilde verzamelen. In dit geval kunnen we echter geen invloed uitoefenen op een derde partij die een deel van de gegevens alleen op deze manier verstrekt.
Apache-luchtstroom
Om ETL-processen te bouwen, gebruiken we meestal Apache Airflow. Om een lezer die niet bekend is met deze technologie beter te laten begrijpen hoe deze er in de context en in het algemeen uitziet, zal ik er een paar inleidende beschrijven.
Apache Airflow is een gratis platform dat wordt gebruikt om ETL-processen (Extract-Transform-Loading) in Python te bouwen, uit te voeren en te bewaken. Het belangrijkste concept in Airflow is een gerichte acyclische grafiek, waarbij de hoekpunten van de grafiek specifieke processen zijn en de randen van de grafiek de stroom van controle of informatie. Een proces kan eenvoudig elke Python-functie aanroepen, of het kan complexere logica hebben door achtereenvolgens verschillende functies aan te roepen in de context van een klasse. Voor de meest voorkomende bewerkingen zijn er al veel kant-en-klare ontwikkelingen die als processen kunnen worden gebruikt. Dergelijke ontwikkelingen zijn onder meer:
- operators - voor het overbrengen van gegevens van de ene plaats naar de andere, bijvoorbeeld van een databasetabel naar een datawarehouse;
- sensoren - om te wachten op het optreden van een bepaalde gebeurtenis en om de controlestroom naar de volgende hoekpunten van de grafiek te leiden;
- hooks - voor bewerkingen op een lager niveau, bijvoorbeeld om gegevens uit een databasetabel te halen (gebruikt in instructies);
- etc.
Het zou ongepast zijn om Apache Airflow in dit artikel in detail te beschrijven. Korte introducties kunnen worden bekeken of .
Haak voor het verkrijgen van gegevens
Allereerst moeten we, om het probleem op te lossen, een haak schrijven waarmee we het volgende kunnen doen:
- verbinding maken met e-mail
- zoek de juiste letter
- gegevens uit de brief ontvangen.
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging
class IMAPHook(BaseHook):
def __init__(self, imap_conn_id):
"""
IMAP hook для получения данных с электронной почты
:param imap_conn_id: Идентификатор подключения к почте
:type imap_conn_id: string
"""
self.connection = self.get_connection(imap_conn_id)
self.mail = None
def authenticate(self):
"""
Подключаемся к почте
"""
mail = imaplib.IMAP4_SSL(self.connection.host)
response, detail = mail.login(user=self.connection.login, password=self.connection.password)
if response != "OK":
raise AirflowException("Sign in failed")
else:
self.mail = mail
def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
"""
Метод для получения идентификатора последнего письма,
удовлетвораяющего условиям поиска
:param check_seen: Отмечать последнее письмо как прочитанное
:type check_seen: bool
:param box: Наименования ящика
:type box: string
:param condition: Условия поиска писем
:type condition: string
"""
self.authenticate()
self.mail.select(mailbox=box)
response, data = self.mail.search(None, condition)
mail_ids = data[0].split()
logging.info("В ящике найдены следующие письма: " + str(mail_ids))
if not mail_ids:
logging.info("Не найдено новых писем")
return None
mail_id = mail_ids[0]
# если таких писем несколько
if len(mail_ids) > 1:
# отмечаем остальные прочитанными
for id in mail_ids:
self.mail.store(id, "+FLAGS", "\Seen")
# возвращаем последнее
mail_id = mail_ids[-1]
# нужно ли отметить последнее прочитанным
if not check_seen:
self.mail.store(mail_id, "-FLAGS", "\Seen")
return mail_idDe logica is deze: we verbinden, vinden de laatste meest relevante letter, als er andere zijn, negeren we ze. Deze functie wordt gebruikt, omdat latere brieven alle gegevens van eerdere bevatten. Als dit niet het geval is, kunt u een reeks van alle letters retourneren, of de eerste verwerken en de rest bij de volgende doorgang. Over het algemeen hangt alles, zoals altijd, af van de taak.
We voegen twee hulpfuncties toe aan de hook: voor het downloaden van een bestand en voor het downloaden van een bestand via een link uit een e-mail. Overigens kunnen ze worden verplaatst naar de operator, het hangt af van de frequentie van het gebruik van deze functionaliteit. Wat u nog meer aan de haak kunt toevoegen, hangt weer af van de taak: als bestanden onmiddellijk in de brief worden ontvangen, kunt u bijlagen bij de brief downloaden, als de gegevens in de brief worden ontvangen, moet u de brief ontleden, enz. In mijn geval bevat de brief één link naar het archief, die ik op een bepaalde plek moet plaatsen en het verdere verwerkingsproces moet starten.
def download_from_url(self, url, path, chunk_size=128):
"""
Метод для скачивания файла
:param url: Адрес загрузки
:type url: string
:param path: Куда положить файл
:type path: string
:param chunk_size: По сколько байтов писать
:type chunk_size: int
"""
r = requests.get(url, stream=True)
with open(path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def download_mail_href_attachment(self, mail_id, path):
"""
Метод для скачивания файла по ссылке из письма
:param mail_id: Идентификатор письма
:type mail_id: string
:param path: Куда положить файл
:type path: string
"""
response, data = self.mail.fetch(mail_id, "(RFC822)")
raw_email = data[0][1]
raw_soup = raw_email.decode().replace("r", "").replace("n", "")
parse_soup = BeautifulSoup(raw_soup, "html.parser")
link_text = ""
for a in parse_soup.find_all("a", href=True, text=True):
link_text = a["href"]
self.download_from_url(link_text, path)De code is eenvoudig en behoeft dus nauwelijks verdere uitleg. Ik zal je alleen vertellen over de magische lijn imap_conn_id. Apache Airflow slaat verbindingsparameters op (login, wachtwoord, adres en andere parameters) die toegankelijk zijn via een tekenreeks-ID. Visueel ziet verbindingsbeheer er zo uit

Sensor wacht op gegevens
Omdat we al weten hoe we verbinding moeten maken en gegevens van e-mail moeten ontvangen, kunnen we nu een sensor schrijven om op hen te wachten. In mijn geval werkte het niet om meteen een operator te schrijven die de gegevens, indien aanwezig, zal verwerken, omdat andere processen werken op basis van de gegevens die via de mail worden ontvangen, inclusief processen die gerelateerde gegevens uit andere bronnen halen (API, telefonie , webstatistieken, enz.). enz.). Ik zal je een voorbeeld geven. Er is een nieuwe gebruiker in het CRM-systeem verschenen en we weten nog steeds niets over zijn UUID. Wanneer we vervolgens proberen gegevens van SIP-telefonie te ontvangen, ontvangen we oproepen die zijn gekoppeld aan de UUID, maar we kunnen ze niet opslaan en correct gebruiken. In dergelijke zaken is het belangrijk om de afhankelijkheid van de gegevens in gedachten te houden, vooral als ze uit verschillende bronnen komen. Dit zijn natuurlijk onvoldoende maatregelen om de data-integriteit te behouden, maar in sommige gevallen zijn ze noodzakelijk. Ja, en niets doen om middelen te bezetten is ook irrationeel.
Onze sensor zal dus volgende hoekpunten van de grafiek lanceren als er nieuwe informatie in de mail is, en ook de eerdere informatie als irrelevant markeren.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook
class MailSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.check_seen = check_seen
self.box = box
self.condition = condition
def poke(self, context):
conn = IMAPHook(self.conn_id)
mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
if mail_id is None:
return False
else:
return TrueWe ontvangen en gebruiken gegevens
Om gegevens te ontvangen en te verwerken, kunt u een aparte operator schrijven, u kunt kant-en-klare gebruiken. Omdat de logica nog steeds triviaal is - om bijvoorbeeld gegevens uit de brief te halen, stel ik de standaard PythonOperator voor
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook
start_date = datetime(2020, 4, 4)
# Стандартное конфигурирование графа
args = {
"owner": "example",
"start_date": start_date,
"email": ["home@home.ru"],
"email_on_failure": False,
"email_on_retry": False,
"retry_delay": timedelta(minutes=15),
"provide_context": False,
}
dag = DAG(
dag_id="test_etl",
default_args=args,
schedule_interval="@hourly",
)
# Определяем сенсор
mail_check_sensor = MailSensor(
task_id="check_new_emails",
poke_interval=10,
conn_id="mail_conn_id",
timeout=10,
soft_fail=True,
box="my_box",
dag=dag,
mode="poke",
)
# Функция для получения данных из письма
def prepare_mail():
imap_hook = IMAPHook("mail_conn_id")
mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
if mail_id is None:
raise AirflowException("Empty mailbox")
conn.download_mail_href_attachment(mail_id, "./path.zip")
prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
# Описание остальных вершин графа
...
# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управленияTrouwens, als uw zakelijke e-mail ook op mail.ru staat, kunt u niet naar brieven zoeken op onderwerp, afzender, enz. In 2016 beloofden ze het te introduceren, maar blijkbaar zijn ze van gedachten veranderd. Dit probleem heb ik opgelost door een aparte map aan te maken voor de benodigde brieven en een filter in te stellen voor de benodigde brieven in de mail webinterface. Dus alleen de noodzakelijke letters en voorwaarden voor het zoeken, in mijn geval, komen gewoon (UNSEEN) in deze map.
Samengevat hebben we de volgende volgorde: we kijken of er nieuwe brieven zijn die aan de voorwaarden voldoen, zo ja, dan downloaden we het archief via de link van de laatste brief.
Onder de laatste puntjes is weggelaten dat dit archief wordt uitgepakt, de gegevens uit het archief worden gewist en verwerkt, en als gevolg daarvan gaat het geheel verder naar de pijplijn van het ETL-proces, maar dit is al voorbij de reikwijdte van het artikel. Als het interessant en nuttig is gebleken, zal ik graag doorgaan met het beschrijven van ETL-oplossingen en hun onderdelen voor Apache Airflow.
Bron: www.habr.com
