Maak nie saak hoeveel tegnologie ontwikkel nie, 'n string verouderde benaderings volg altyd agter ontwikkeling. Dit kan wees as gevolg van 'n gladde oorgang, menslike faktore, tegnologiese behoeftes, of iets anders. Op die gebied van dataverwerking is databronne die mees onthullende in hierdie deel. Maak nie saak hoeveel ons daarvan droom om daarvan ontslae te raak nie, maar tot dusver word 'n deel van die data in kitsboodskappe en e-posse gestuur, om nie eers te praat van meer argaΓ―ese formate nie. Ek nooi jou uit om een ββvan die opsies vir Apache Airflow uitmekaar te haal, wat illustreer hoe jy data uit e-posse kan neem.
voorgeskiedenis
Baie data word steeds per e-pos oorgedra, van interpersoonlike kommunikasie tot standaarde van interaksie tussen maatskappye. Dit is goed as dit moontlik is om 'n koppelvlak te skryf om data te bekom of mense in die kantoor te plaas wat hierdie inligting in geriefliker bronne sal invoer, maar dikwels is dit dalk eenvoudig nie moontlik nie. Die spesifieke taak wat ek in die gesig gestaar het, was om die berugte CRM-stelsel aan die datapakhuis te koppel, en dan aan die OLAP-stelsel. Dit het so histories gebeur dat die gebruik van hierdie stelsel vir ons maatskappy gerieflik was in 'n spesifieke besigheidsarea. Daarom wou almal regtig ook met data van hierdie derdeparty-stelsel kan werk. Eerstens is natuurlik die moontlikheid bestudeer om data van 'n oop API te verkry. Ongelukkig het die API nie gedek om al die nodige data te kry nie, en in eenvoudige terme was dit in baie opsigte skeef, en tegniese ondersteuning wou of kon nie halfpad ontmoet om meer omvattende funksionaliteit te verskaf nie. Maar hierdie stelsel het die geleentheid gebied om die ontbrekende data periodiek per pos te ontvang in die vorm van 'n skakel vir die aflaai van die argief.
Daar moet kennis geneem word dat dit nie die enigste geval was waarin die onderneming data van e-posse of kitsboodskappers wou insamel nie. In hierdie geval kon ons egter nie 'n derdepartymaatskappy beΓ―nvloed wat 'n deel van die data slegs op hierdie manier verskaf nie.
Apache lugvloei
Om ETL-prosesse te bou, gebruik ons ββmeestal Apache Airflow. Sodat 'n leser wat nie met hierdie tegnologie vertroud is nie beter kan verstaan ββhoe dit in die konteks en in die algemeen lyk, sal ek 'n paar inleidende beskryf.
Apache Airflow is 'n gratis platform wat gebruik word om ETL (Extract-Transform-Loading)-prosesse in Python te bou, uit te voer en te monitor. Die hoofkonsep in Lugvloei is 'n gerigte asikliese grafiek, waar die hoekpunte van die grafiek spesifieke prosesse is, en die rande van die grafiek die vloei van beheer of inligting is. 'n Proses kan eenvoudig enige Python-funksie oproep, of dit kan meer komplekse logika hΓͺ om verskeie funksies opeenvolgend in die konteks van 'n klas te roep. Vir die mees gereelde operasies is daar reeds baie klaargemaakte ontwikkelings wat as prosesse gebruik kan word. Sulke ontwikkelings sluit in:
- operateurs - vir die oordrag van data van een plek na 'n ander, byvoorbeeld van 'n databasistabel na 'n datapakhuis;
- sensors - om te wag vir die voorkoms van 'n sekere gebeurtenis en die vloei van beheer na die daaropvolgende grafiekhoekpunte te rig;
- hake - vir laervlak-bewerkings, byvoorbeeld om data van 'n databasistabel te kry (gebruik in stellings);
- ens.
Dit sal onvanpas wees om Apache Airflow in detail in hierdie artikel te beskryf. Kort inleidings kan besigtig word
Haak om data te kry
Eerstens, om die probleem op te los, moet ons 'n haak skryf waarmee ons kan:
- koppel aan e-pos
- vind die regte letter
- data uit die brief ontvang.
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging
class IMAPHook(BaseHook):
def __init__(self, imap_conn_id):
"""
IMAP hook Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
Ρ ΡΠ»Π΅ΠΊΡΡΠΎΠ½Π½ΠΎΠΉ ΠΏΠΎΡΡΡ
:param imap_conn_id: ΠΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠ΅Π½ΠΈΡ ΠΊ ΠΏΠΎΡΡΠ΅
:type imap_conn_id: string
"""
self.connection = self.get_connection(imap_conn_id)
self.mail = None
def authenticate(self):
"""
ΠΠΎΠ΄ΠΊΠ»ΡΡΠ°Π΅ΠΌΡΡ ΠΊ ΠΏΠΎΡΡΠ΅
"""
mail = imaplib.IMAP4_SSL(self.connection.host)
response, detail = mail.login(user=self.connection.login, password=self.connection.password)
if response != "OK":
raise AirflowException("Sign in failed")
else:
self.mail = mail
def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ ΠΈΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡΠ° ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π³ΠΎ ΠΏΠΈΡΡΠΌΠ°,
ΡΠ΄ΠΎΠ²Π»Π΅ΡΠ²ΠΎΡΠ°ΡΡΡΠ΅Π³ΠΎ ΡΡΠ»ΠΎΠ²ΠΈΡΠΌ ΠΏΠΎΠΈΡΠΊΠ°
:param check_seen: ΠΡΠΌΠ΅ΡΠ°ΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅ ΠΏΠΈΡΡΠΌΠΎ ΠΊΠ°ΠΊ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΠΎΠ΅
:type check_seen: bool
:param box: ΠΠ°ΠΈΠΌΠ΅Π½ΠΎΠ²Π°Π½ΠΈΡ ΡΡΠΈΠΊΠ°
:type box: string
:param condition: Π£ΡΠ»ΠΎΠ²ΠΈΡ ΠΏΠΎΠΈΡΠΊΠ° ΠΏΠΈΡΠ΅ΠΌ
:type condition: string
"""
self.authenticate()
self.mail.select(mailbox=box)
response, data = self.mail.search(None, condition)
mail_ids = data[0].split()
logging.info("Π ΡΡΠΈΠΊΠ΅ Π½Π°ΠΉΠ΄Π΅Π½Ρ ΡΠ»Π΅Π΄ΡΡΡΠΈΠ΅ ΠΏΠΈΡΡΠΌΠ°: " + str(mail_ids))
if not mail_ids:
logging.info("ΠΠ΅ Π½Π°ΠΉΠ΄Π΅Π½ΠΎ Π½ΠΎΠ²ΡΡ
ΠΏΠΈΡΠ΅ΠΌ")
return None
mail_id = mail_ids[0]
# Π΅ΡΠ»ΠΈ ΡΠ°ΠΊΠΈΡ
ΠΏΠΈΡΠ΅ΠΌ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ
if len(mail_ids) > 1:
# ΠΎΡΠΌΠ΅ΡΠ°Π΅ΠΌ ΠΎΡΡΠ°Π»ΡΠ½ΡΠ΅ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΡΠΌΠΈ
for id in mail_ids:
self.mail.store(id, "+FLAGS", "\Seen")
# Π²ΠΎΠ·Π²ΡΠ°ΡΠ°Π΅ΠΌ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅
mail_id = mail_ids[-1]
# Π½ΡΠΆΠ½ΠΎ Π»ΠΈ ΠΎΡΠΌΠ΅ΡΠΈΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΡΠΌ
if not check_seen:
self.mail.store(mail_id, "-FLAGS", "\Seen")
return mail_id
Die logika is dit: ons verbind, vind die laaste mees relevante letter, as daar ander is, ignoreer ons hulle. Hierdie funksie word gebruik, want latere briewe bevat al die data van vroeΓ«re. As dit nie die geval is nie, kan jy 'n reeks van alle letters terugstuur, of die eerste een verwerk, en die res met die volgende pas. Oor die algemeen hang alles, soos altyd, van die taak af.
Ons voeg twee hulpfunksies by die haak: om 'n lΓͺer af te laai en om 'n lΓͺer af te laai met 'n skakel van 'n e-pos. Terloops, hulle kan na die operateur geskuif word, dit hang af van die frekwensie van die gebruik van hierdie funksionaliteit. Wat anders om by die haak te voeg, hang weer van die taak af: as lΓͺers onmiddellik in die brief ontvang word, kan u aanhangsels by die brief aflaai, as die data in die brief ontvang word, moet u die brief ontleed, ens. In my geval kom die brief met een skakel na die argief, wat ek op 'n sekere plek moet plaas en die verdere verwerkingsproses moet begin.
def download_from_url(self, url, path, chunk_size=128):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π°
:param url: ΠΠ΄ΡΠ΅Ρ Π·Π°Π³ΡΡΠ·ΠΊΠΈ
:type url: string
:param path: ΠΡΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ ΡΠ°ΠΉΠ»
:type path: string
:param chunk_size: ΠΠΎ ΡΠΊΠΎΠ»ΡΠΊΠΎ Π±Π°ΠΉΡΠΎΠ² ΠΏΠΈΡΠ°ΡΡ
:type chunk_size: int
"""
r = requests.get(url, stream=True)
with open(path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def download_mail_href_attachment(self, mail_id, path):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π° ΠΏΠΎ ΡΡΡΠ»ΠΊΠ΅ ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°
:param mail_id: ΠΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ ΠΏΠΈΡΡΠΌΠ°
:type mail_id: string
:param path: ΠΡΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ ΡΠ°ΠΉΠ»
:type path: string
"""
response, data = self.mail.fetch(mail_id, "(RFC822)")
raw_email = data[0][1]
raw_soup = raw_email.decode().replace("r", "").replace("n", "")
parse_soup = BeautifulSoup(raw_soup, "html.parser")
link_text = ""
for a in parse_soup.find_all("a", href=True, text=True):
link_text = a["href"]
self.download_from_url(link_text, path)
Die kode is eenvoudig, so dit het skaars verdere verduideliking nodig. Ek sal jou net vertel van die towerlyn imap_conn_id. Apache Airflow stoor verbindingsparameters (aanmelding, wagwoord, adres en ander parameters) wat deur 'n string-identifiseerder verkry kan word. Visueel lyk verbindingsbestuur so
Sensor om te wag vir data
Aangesien ons reeds weet hoe om data van pos te koppel en te ontvang, kan ons nou 'n sensor skryf om vir hulle te wag. In my geval het die skryf van 'n operateur wat die data sal verwerk, indien enige, nie dadelik uitgewerk nie, aangesien ander prosesse ook werk op grond van die data wat van die pos ontvang is, insluitend diΓ© wat verwante data van ander bronne af neem (API, telefonie, webmetriek, ens.). Ek sal jou 'n voorbeeld gee. 'n Nuwe gebruiker het in die CRM-stelsel verskyn, en ons weet steeds nie van sy UUID nie. Dan, wanneer ons probeer om data vanaf SIP-telefonie te ontvang, sal ons oproepe ontvang wat aan sy UUID gekoppel is, maar ons sal dit nie reg kan stoor en gebruik nie. In sulke sake is dit belangrik om die afhanklikheid van die data in gedagte te hou, veral as dit uit verskillende bronne kom. Dit is natuurlik onvoldoende maatreΓ«ls om data-integriteit te bewaar, maar in sommige gevalle is dit nodig. Ja, en luier om hulpbronne te beset is ook irrasioneel.
Ons sensor sal dus die volgende hoekpunte van die grafiek begin as daar vars inligting in die pos is, en ook die vorige inligting as irrelevant merk.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook
class MailSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.check_seen = check_seen
self.box = box
self.condition = condition
def poke(self, context):
conn = IMAPHook(self.conn_id)
mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
if mail_id is None:
return False
else:
return True
Ons ontvang en gebruik data
Om data te ontvang en te verwerk, kan jy 'n aparte operateur skryf, jy kan klaargemaakte gebruik. Aangesien die logika tot dusver triviaal is - om byvoorbeeld data uit die brief te neem, stel ek die standaard PythonOperator voor
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook
start_date = datetime(2020, 4, 4)
# Π‘ΡΠ°Π½Π΄Π°ΡΡΠ½ΠΎΠ΅ ΠΊΠΎΠ½ΡΠΈΠ³ΡΡΠΈΡΠΎΠ²Π°Π½ΠΈΠ΅ Π³ΡΠ°ΡΠ°
args = {
"owner": "example",
"start_date": start_date,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retry_delay": timedelta(minutes=15),
"provide_context": False,
}
dag = DAG(
dag_id="test_etl",
default_args=args,
schedule_interval="@hourly",
)
# ΠΠΏΡΠ΅Π΄Π΅Π»ΡΠ΅ΠΌ ΡΠ΅Π½ΡΠΎΡ
mail_check_sensor = MailSensor(
task_id="check_new_emails",
poke_interval=10,
conn_id="mail_conn_id",
timeout=10,
soft_fail=True,
box="my_box",
dag=dag,
mode="poke",
)
# Π€ΡΠ½ΠΊΡΠΈΡ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°
def prepare_mail():
imap_hook = IMAPHook("mail_conn_id")
mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
if mail_id is None:
raise AirflowException("Empty mailbox")
conn.download_mail_href_attachment(mail_id, "./path.zip")
prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
# ΠΠΏΠΈΡΠ°Π½ΠΈΠ΅ ΠΎΡΡΠ°Π»ΡΠ½ΡΡ
Π²Π΅ΡΡΠΈΠ½ Π³ΡΠ°ΡΠ°
...
# ΠΠ°Π΄Π°Π΅ΠΌ ΡΠ²ΡΠ·Ρ Π½Π° Π³ΡΠ°ΡΠ΅
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ΠΠΏΠΈΡΠ°Π½ΠΈΠ΅ ΠΎΡΡΠ°Π»ΡΠ½ΡΡ
ΠΏΠΎΡΠΎΠΊΠΎΠ² ΡΠΏΡΠ°Π²Π»Π΅Π½ΠΈΡ
Terloops, as u korporatiewe pos ook op mail.ru is, sal u nie briewe kan soek volgens onderwerp, sender, ens. In 2016 het hulle belowe om dit bekend te stel, maar het blykbaar van plan verander. Ek het hierdie probleem opgelos deur 'n aparte vouer vir die nodige briewe te skep en 'n filter vir die nodige briewe in die pos-webkoppelvlak op te stel. Dus, slegs die nodige briewe en voorwaardes vir die soektog, in my geval, kom eenvoudig (ONSIEN) in hierdie gids.
Opsommend het ons die volgende volgorde: ons kyk of daar nuwe briewe is wat aan die voorwaardes voldoen, as daar is, dan laai ons die argief af deur die skakel van die laaste brief te gebruik.
Onder die laaste kolletjies word dit weggelaat dat hierdie argief uitgepak sal word, die data uit die argief sal uitgevee en verwerk word, en gevolglik sal die hele ding verder gaan na die pyplyn van die ETL-proses, maar dit is reeds verby die omvang van die artikel. As dit interessant en bruikbaar blyk, sal ek graag voortgaan om ETL-oplossings en hul onderdele vir Apache Airflow te beskryf.
Bron: will.com