Ma jimpurtax kemm tiżviluppa t-teknoloġija, sensiela ta 'approċċi antikwati dejjem tmur wara l-iżvilupp. Dan jista 'jkun minħabba transizzjoni bla xkiel, fatturi umani, ħtiġijiet teknoloġiċi, jew xi ħaġa oħra. Fil-qasam tal-ipproċessar tad-dejta, is-sorsi tad-dejta huma l-aktar li jiżvelaw f'din il-parti. Ma jimpurtax kemm noħolmu li neħilsu minn dan, iżda s'issa parti mid-dejta tintbagħat f'messaġġi istantanji u emails, biex ma nsemmux formati aktar arkajċi. Nistednek biex tiżżarma waħda mill-għażliet għall-Apache Airflow, li turi kif tista 'tieħu data mill-emails.
preistorja
Ħafna dejta għadha tiġi trasferita permezz tal-posta elettronika, minn komunikazzjonijiet interpersonali għal standards ta’ interazzjoni bejn il-kumpaniji. Huwa tajjeb jekk ikun possibbli li tikteb interface biex tinkiseb data jew tpoġġi nies fl-uffiċċju li se jdaħħlu din l-informazzjoni f'sorsi aktar konvenjenti, iżda ħafna drabi dan jista 'sempliċement ma jkunx possibbli. Il-kompitu speċifiku li ffaċċjajt kien li jgħaqqad is-sistema CRM notorja mal-maħżen tad-dejta, u mbagħad mas-sistema OLAP. Storikament ġara li għall-kumpanija tagħna l-użu ta 'din is-sistema kien konvenjenti f'qasam partikolari tan-negozju. Għalhekk, kulħadd verament ried li jkun jista 'jopera b'dejta minn din is-sistema ta' parti terza wkoll. L-ewwelnett, ovvjament, ġiet studjata l-possibbiltà li tinkiseb data minn API miftuħa. Sfortunatament, l-API ma koprietx il-kisba tad-dejta kollha meħtieġa, u, f'termini sempliċi, kienet f'ħafna modi mgħawweġ, u l-appoġġ tekniku ma riedx jew ma setax jiltaqa 'nofs triq biex jipprovdi funzjonalità aktar komprensiva. Iżda din is-sistema pprovdiet l-opportunità li perjodikament tirċievi d-dejta nieqsa bil-posta fil-forma ta 'link għall-ħatt tal-arkivju.
Ta’ min jinnota li dan ma kienx l-uniku każ li fih in-negozju ried jiġbor data minn emails jew instant messengers. Madankollu, f'dan il-każ, ma nistgħux ninfluwenzaw kumpanija ta' parti terza li tipprovdi parti mid-dejta b'dan il-mod biss.
fluss ta 'arja apache
Biex nibnu proċessi ETL, ħafna drabi nużaw Apache Airflow. Sabiex qarrej li ma jkunx familjari ma 'din it-teknoloġija jifhem aħjar kif tidher fil-kuntest u b'mod ġenerali, se niddeskrivi ftit ta' introduzzjoni.
Apache Airflow hija pjattaforma b'xejn li tintuża biex tibni, tesegwixxi u timmonitorja proċessi ETL (Extract-Transform-Loading) f'Python. Il-kunċett ewlieni fl-Airflow huwa graff aċikliku dirett, fejn il-vertiċi tal-graff huma proċessi speċifiċi, u t-truf tal-graff huma l-fluss ta 'kontroll jew informazzjoni. Proċess jista 'sempliċement isejjaħ kwalunkwe funzjoni Python, jew jista' jkollu loġika aktar kumplessa milli jsejjaħ sekwenzjali diversi funzjonijiet fil-kuntest ta 'klassi. Għall-operazzjonijiet l-aktar frekwenti, diġà hemm ħafna żviluppi lesti li jistgħu jintużaw bħala proċessi. Żviluppi bħal dawn jinkludu:
- operaturi - għat-trasferiment tad-dejta minn post għal ieħor, pereżempju, minn tabella tad-dejtabejż għal maħżen tad-dejta;
- sensuri - biex tistenna l-okkorrenza ta 'ċertu avveniment u tidderieġi l-fluss tal-kontroll lejn il-vertiċi sussegwenti tal-graff;
- ganċijiet - għal operazzjonijiet ta 'livell aktar baxx, pereżempju, biex tikseb dejta minn tabella ta' database (użata f'dikjarazzjonijiet);
- eċċ
Ma jkunx xieraq li tiddeskrivi Apache Airflow fid-dettall f'dan l-artikolu. Introduzzjonijiet qosra jistgħu jitqiesu
Hook biex tikseb id-data
L-ewwelnett, biex issolvi l-problema, għandna bżonn niktbu ganċ li bih nistgħu:
- qabbad mal-email
- issib l-ittra t-tajba
- tirċievi data mill-ittra.
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging
class IMAPHook(BaseHook):
def __init__(self, imap_conn_id):
"""
IMAP hook для получения данных с электронной почты
:param imap_conn_id: Идентификатор подключения к почте
:type imap_conn_id: string
"""
self.connection = self.get_connection(imap_conn_id)
self.mail = None
def authenticate(self):
"""
Подключаемся к почте
"""
mail = imaplib.IMAP4_SSL(self.connection.host)
response, detail = mail.login(user=self.connection.login, password=self.connection.password)
if response != "OK":
raise AirflowException("Sign in failed")
else:
self.mail = mail
def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
"""
Метод для получения идентификатора последнего письма,
удовлетвораяющего условиям поиска
:param check_seen: Отмечать последнее письмо как прочитанное
:type check_seen: bool
:param box: Наименования ящика
:type box: string
:param condition: Условия поиска писем
:type condition: string
"""
self.authenticate()
self.mail.select(mailbox=box)
response, data = self.mail.search(None, condition)
mail_ids = data[0].split()
logging.info("В ящике найдены следующие письма: " + str(mail_ids))
if not mail_ids:
logging.info("Не найдено новых писем")
return None
mail_id = mail_ids[0]
# если таких писем несколько
if len(mail_ids) > 1:
# отмечаем остальные прочитанными
for id in mail_ids:
self.mail.store(id, "+FLAGS", "\Seen")
# возвращаем последнее
mail_id = mail_ids[-1]
# нужно ли отметить последнее прочитанным
if not check_seen:
self.mail.store(mail_id, "-FLAGS", "\Seen")
return mail_id
Il-loġika hija din: aħna nikkollegaw, insibu l-aħħar ittra l-aktar rilevanti, jekk hemm oħrajn, ninjorawhom. Din il-funzjoni tintuża, għaliex ittri ta 'wara fihom id-dejta kollha ta' dawk preċedenti. Jekk dan ma jkunx il-każ, allura tista 'tirritorna firxa ta' l-ittri kollha, jew tipproċessa l-ewwel waħda, u l-bqija fil-pass li jmiss. B'mod ġenerali, kollox, bħal dejjem, jiddependi fuq il-kompitu.
Aħna nżidu żewġ funzjonijiet awżiljarji mal-ganċ: biex iniżżlu fajl u biex iniżżlu fajl billi tuża link minn email. Mill-mod, jistgħu jiġu mċaqalqa lill-operatur, jiddependi fuq il-frekwenza tal-użu ta 'din il-funzjonalità. X'iktar għandek iżżid mal-ganċ, għal darb'oħra, jiddependi fuq il-kompitu: jekk il-fajls jiġu riċevuti immedjatament fl-ittra, allura tista 'tniżżel l-annessi mal-ittra, jekk id-data tasal fl-ittra, allura għandek bżonn teżamina l-ittra, eċċ. Fil-każ tiegħi, l-ittra tiġi b'rabta waħda għall-arkivju, li għandi bżonn inpoġġi f'ċertu post u nibda l-proċess ta 'proċessar ulterjuri.
def download_from_url(self, url, path, chunk_size=128):
"""
Метод для скачивания файла
:param url: Адрес загрузки
:type url: string
:param path: Куда положить файл
:type path: string
:param chunk_size: По сколько байтов писать
:type chunk_size: int
"""
r = requests.get(url, stream=True)
with open(path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def download_mail_href_attachment(self, mail_id, path):
"""
Метод для скачивания файла по ссылке из письма
:param mail_id: Идентификатор письма
:type mail_id: string
:param path: Куда положить файл
:type path: string
"""
response, data = self.mail.fetch(mail_id, "(RFC822)")
raw_email = data[0][1]
raw_soup = raw_email.decode().replace("r", "").replace("n", "")
parse_soup = BeautifulSoup(raw_soup, "html.parser")
link_text = ""
for a in parse_soup.find_all("a", href=True, text=True):
link_text = a["href"]
self.download_from_url(link_text, path)
Il-kodiċi huwa sempliċi, għalhekk bilkemm jeħtieġ aktar spjegazzjoni. Jien ngħidlek biss dwar il-linja magic imap_conn_id. Apache Airflow jaħżen il-parametri tal-konnessjoni (login, password, indirizz, u parametri oħra) li jistgħu jiġu aċċessati minn identifikatur ta 'string. Viżwalment, il-ġestjoni tal-konnessjoni tidher bħal din
Sensor biex jistenna d-data
Peress li diġà nafu kif nikkonnettjaw u nirċievu data mill-posta, issa nistgħu niktbu sensor biex nistennewhom. Fil-każ tiegħi, ma ħadimx li tikteb operatur minnufih li jipproċessa d-dejta, jekk ikun hemm, minħabba li proċessi oħra jaħdmu bbażati fuq id-dejta riċevuta mill-posta, inklużi dawk li jieħdu dejta relatata minn sorsi oħra (API, telefonija , metriċi tal-web, eċċ.). eċċ.). Nagħtik eżempju. Utent ġdid deher fis-sistema CRM, u għadna ma nafux dwar l-UUID tiegħu. Imbagħad, meta nippruvaw nirċievu data mit-telefonija SIP, aħna nirċievu sejħiet marbuta mal-UUID tagħha, iżda mhux se nkunu nistgħu nissejvjawhom u nużawhom b'mod korrett. Fi kwistjonijiet bħal dawn, huwa importanti li wieħed iżomm f'moħħu d-dipendenza tad-dejta, speċjalment jekk tkun minn sorsi differenti. Dawn huma, ovvjament, miżuri insuffiċjenti biex tiġi ppreservata l-integrità tad-dejta, iżda f'xi każijiet huma meħtieġa. Iva, u idling biex jokkupa r-riżorsi huwa wkoll irrazzjonali.
Għalhekk, is-senser tagħna se jniedi vertiċi sussegwenti tal-graff jekk ikun hemm informazzjoni ġdida fil-posta, u jimmarka wkoll l-informazzjoni preċedenti bħala irrilevanti.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook
class MailSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.check_seen = check_seen
self.box = box
self.condition = condition
def poke(self, context):
conn = IMAPHook(self.conn_id)
mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
if mail_id is None:
return False
else:
return True
Nirċievu u nużaw data
Biex tirċievi u tipproċessa d-dejta, tista 'tikteb operatur separat, tista' tuża dawk lesti. Peress li l-loġika għadha trivjali - biex tieħu data mill-ittra, pereżempju, nissuġġerixxi l-istandard PythonOperator
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook
start_date = datetime(2020, 4, 4)
# Стандартное конфигурирование графа
args = {
"owner": "example",
"start_date": start_date,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retry_delay": timedelta(minutes=15),
"provide_context": False,
}
dag = DAG(
dag_id="test_etl",
default_args=args,
schedule_interval="@hourly",
)
# Определяем сенсор
mail_check_sensor = MailSensor(
task_id="check_new_emails",
poke_interval=10,
conn_id="mail_conn_id",
timeout=10,
soft_fail=True,
box="my_box",
dag=dag,
mode="poke",
)
# Функция для получения данных из письма
def prepare_mail():
imap_hook = IMAPHook("mail_conn_id")
mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
if mail_id is None:
raise AirflowException("Empty mailbox")
conn.download_mail_href_attachment(mail_id, "./path.zip")
prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
# Описание остальных вершин графа
...
# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления
Mill-mod, jekk il-posta korporattiva tiegħek tinsab ukoll fuq mail.ru, allura ma tkunx tista 'tfittex ittri skont is-suġġett, il-mittent, eċċ. Lura fl-2016, wegħdu li se jintroduċuha, iżda jidher li bidlu fehmhom. I solvut din il-problema billi ħoloq folder separat għall-ittri meħtieġa u waqqaf filtru għall-ittri meħtieġa fl-interface web tal-posta. Għalhekk, l-ittri u l-kundizzjonijiet meħtieġa biss għat-tfittxija, fil-każ tiegħi, sempliċement (UNSEEN) jidħlu f'dan il-folder.
Fil-qosor, għandna s-sekwenza li ġejja: niċċekkjaw jekk hemmx ittri ġodda li jissodisfaw il-kundizzjonijiet, jekk hemmx, imbagħad iniżżlu l-arkivju billi tuża l-link mill-aħħar ittra.
Taħt l-aħħar tikek, jitħalla barra li dan l-arkivju se jiġi żppakkjat, id-dejta mill-arkivju se tiġi kklerjata u pproċessata, u bħala riżultat, il-ħaġa sħiħa se tmur lil hinn mill-pipeline tal-proċess ETL, iżda dan huwa diġà lil hinn l-ambitu tal-artikolu. Jekk irriżulta interessanti u utli, allura bi pjaċir inkompli niddeskrivi s-soluzzjonijiet ETL u l-partijiet tagħhom għal Apache Airflow.
Sors: www.habr.com