Haijalishi ni teknolojia ngapi inakua, mlolongo wa mbinu za kizamani daima hufuata maendeleo. Hii inaweza kuwa kutokana na mabadiliko ya laini, mambo ya kibinadamu, mahitaji ya kiteknolojia, au kitu kingine. Katika uwanja wa usindikaji wa data, vyanzo vya data ndivyo vinavyofichua zaidi katika sehemu hii. Haijalishi ni kiasi gani tunaota ya kuondokana na hili, lakini hadi sasa sehemu ya data inatumwa kwa wajumbe wa papo hapo na barua pepe, bila kutaja fomati zaidi za kizamani. Ninakualika utenganishe moja ya chaguo za Apache Airflow, nikionyesha jinsi unavyoweza kuchukua data kutoka kwa barua pepe.
kabla ya historia
Data nyingi bado huhamishwa kupitia barua pepe, kutoka kwa mawasiliano baina ya watu hadi viwango vya mwingiliano kati ya makampuni. Ni vizuri ikiwa inawezekana kuandika kiolesura cha kupata data au kuweka watu ofisini ambao wataingiza habari hii katika vyanzo rahisi zaidi, lakini mara nyingi hii inaweza kuwa haiwezekani. Jukumu mahususi ambalo nilikabili lilikuwa kuunganisha mfumo wa CRM maarufu kwenye ghala la data, na kisha kwa mfumo wa OLAP. Ilifanyika kihistoria kwamba kwa kampuni yetu matumizi ya mfumo huu yalikuwa rahisi katika eneo fulani la biashara. Kwa hivyo, kila mtu alitaka sana kuweza kufanya kazi na data kutoka kwa mfumo huu wa watu wengine pia. Awali ya yote, bila shaka, uwezekano wa kupata data kutoka kwa API wazi ilisomwa. Kwa bahati mbaya, API haikushughulikia kupata data zote muhimu, na, kwa maneno rahisi, ilikuwa imepotoshwa kwa njia nyingi, na msaada wa kiufundi haukutaka au haukuweza kufikia nusu ili kutoa utendakazi wa kina zaidi. Lakini mfumo huu ulitoa fursa ya kupokea mara kwa mara data iliyokosekana kwa njia ya barua kwa njia ya kiunga cha kupakua kumbukumbu.
Ikumbukwe kwamba hii haikuwa kesi pekee ambayo biashara ilitaka kukusanya data kutoka kwa barua pepe au wajumbe wa papo hapo. Hata hivyo, katika kesi hii, hatukuweza kushawishi kampuni ya tatu ambayo hutoa sehemu ya data kwa njia hii tu.
mtiririko wa hewa wa apache
Ili kuunda michakato ya ETL, mara nyingi sisi hutumia Apache Airflow. Ili msomaji ambaye hajui teknolojia hii kuelewa vizuri jinsi inavyoonekana katika muktadha na kwa ujumla, nitaelezea michache ya utangulizi.
Apache Airflow ni jukwaa lisilolipishwa ambalo hutumika kujenga, kutekeleza na kufuatilia michakato ya ETL (Extract-Transform-Loading) katika Python. Dhana kuu katika Airflow ni grafu ya acyclic iliyoelekezwa, ambapo vipeo vya grafu ni michakato maalum, na kingo za grafu ni mtiririko wa udhibiti au habari. Mchakato unaweza tu kuita kazi yoyote ya Python, au inaweza kuwa na mantiki ngumu zaidi kutoka kwa kuita kazi kadhaa mfululizo katika muktadha wa darasa. Kwa shughuli za mara kwa mara, tayari kuna maendeleo mengi yaliyotengenezwa tayari ambayo yanaweza kutumika kama michakato. Maendeleo kama haya ni pamoja na:
- waendeshaji - kwa kuhamisha data kutoka sehemu moja hadi nyingine, kwa mfano, kutoka kwa meza ya database hadi ghala la data;
- sensorer - kwa kusubiri tukio la tukio fulani na kuelekeza mtiririko wa udhibiti kwa wima zifuatazo za grafu;
- ndoano - kwa shughuli za kiwango cha chini, kwa mfano, kupata data kutoka kwa meza ya hifadhidata (kutumika katika taarifa);
- nk
Haitakuwa sawa kuelezea Apache Airflow kwa undani katika nakala hii. Utangulizi mfupi unaweza kutazamwa
Hook kwa kupata data
Kwanza kabisa, ili kutatua shida, tunahitaji kuandika ndoano ambayo tunaweza:
- unganisha kwa barua pepe
- tafuta barua sahihi
- kupokea data kutoka kwa barua.
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging
class IMAPHook(BaseHook):
def __init__(self, imap_conn_id):
"""
IMAP hook Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
Ρ ΡΠ»Π΅ΠΊΡΡΠΎΠ½Π½ΠΎΠΉ ΠΏΠΎΡΡΡ
:param imap_conn_id: ΠΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠ΅Π½ΠΈΡ ΠΊ ΠΏΠΎΡΡΠ΅
:type imap_conn_id: string
"""
self.connection = self.get_connection(imap_conn_id)
self.mail = None
def authenticate(self):
"""
ΠΠΎΠ΄ΠΊΠ»ΡΡΠ°Π΅ΠΌΡΡ ΠΊ ΠΏΠΎΡΡΠ΅
"""
mail = imaplib.IMAP4_SSL(self.connection.host)
response, detail = mail.login(user=self.connection.login, password=self.connection.password)
if response != "OK":
raise AirflowException("Sign in failed")
else:
self.mail = mail
def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ ΠΈΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡΠ° ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π³ΠΎ ΠΏΠΈΡΡΠΌΠ°,
ΡΠ΄ΠΎΠ²Π»Π΅ΡΠ²ΠΎΡΠ°ΡΡΡΠ΅Π³ΠΎ ΡΡΠ»ΠΎΠ²ΠΈΡΠΌ ΠΏΠΎΠΈΡΠΊΠ°
:param check_seen: ΠΡΠΌΠ΅ΡΠ°ΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅ ΠΏΠΈΡΡΠΌΠΎ ΠΊΠ°ΠΊ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΠΎΠ΅
:type check_seen: bool
:param box: ΠΠ°ΠΈΠΌΠ΅Π½ΠΎΠ²Π°Π½ΠΈΡ ΡΡΠΈΠΊΠ°
:type box: string
:param condition: Π£ΡΠ»ΠΎΠ²ΠΈΡ ΠΏΠΎΠΈΡΠΊΠ° ΠΏΠΈΡΠ΅ΠΌ
:type condition: string
"""
self.authenticate()
self.mail.select(mailbox=box)
response, data = self.mail.search(None, condition)
mail_ids = data[0].split()
logging.info("Π ΡΡΠΈΠΊΠ΅ Π½Π°ΠΉΠ΄Π΅Π½Ρ ΡΠ»Π΅Π΄ΡΡΡΠΈΠ΅ ΠΏΠΈΡΡΠΌΠ°: " + str(mail_ids))
if not mail_ids:
logging.info("ΠΠ΅ Π½Π°ΠΉΠ΄Π΅Π½ΠΎ Π½ΠΎΠ²ΡΡ
ΠΏΠΈΡΠ΅ΠΌ")
return None
mail_id = mail_ids[0]
# Π΅ΡΠ»ΠΈ ΡΠ°ΠΊΠΈΡ
ΠΏΠΈΡΠ΅ΠΌ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ
if len(mail_ids) > 1:
# ΠΎΡΠΌΠ΅ΡΠ°Π΅ΠΌ ΠΎΡΡΠ°Π»ΡΠ½ΡΠ΅ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΡΠΌΠΈ
for id in mail_ids:
self.mail.store(id, "+FLAGS", "\Seen")
# Π²ΠΎΠ·Π²ΡΠ°ΡΠ°Π΅ΠΌ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅
mail_id = mail_ids[-1]
# Π½ΡΠΆΠ½ΠΎ Π»ΠΈ ΠΎΡΠΌΠ΅ΡΠΈΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΡΠΌ
if not check_seen:
self.mail.store(mail_id, "-FLAGS", "\Seen")
return mail_id
Mantiki ni hii: tunaunganisha, kupata barua ya mwisho inayofaa zaidi, ikiwa kuna wengine, tunawapuuza. Kazi hii inatumiwa, kwa sababu barua za baadaye zina data zote za mapema. Ikiwa sivyo, basi unaweza kurudisha safu ya herufi zote, au kusindika ya kwanza, na iliyobaki kwenye kupita inayofuata. Kwa ujumla, kila kitu, kama kawaida, inategemea kazi.
Tunaongeza kazi mbili za msaidizi kwenye ndoano: kwa kupakua faili na kupakua faili kwa kutumia kiungo kutoka kwa barua pepe. Kwa njia, wanaweza kuhamishwa kwa operator, inategemea mzunguko wa kutumia utendaji huu. Nini kingine cha kuongeza kwenye ndoano, tena, inategemea kazi: ikiwa faili zinapokelewa mara moja katika barua, basi unaweza kupakua viambatisho kwa barua, ikiwa data imepokelewa katika barua, basi unahitaji kuchanganua barua, na kadhalika. Katika kesi yangu, barua inakuja na kiungo kimoja kwenye kumbukumbu, ambayo ninahitaji kuweka mahali fulani na kuanza mchakato zaidi wa usindikaji.
def download_from_url(self, url, path, chunk_size=128):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π°
:param url: ΠΠ΄ΡΠ΅Ρ Π·Π°Π³ΡΡΠ·ΠΊΠΈ
:type url: string
:param path: ΠΡΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ ΡΠ°ΠΉΠ»
:type path: string
:param chunk_size: ΠΠΎ ΡΠΊΠΎΠ»ΡΠΊΠΎ Π±Π°ΠΉΡΠΎΠ² ΠΏΠΈΡΠ°ΡΡ
:type chunk_size: int
"""
r = requests.get(url, stream=True)
with open(path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def download_mail_href_attachment(self, mail_id, path):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π° ΠΏΠΎ ΡΡΡΠ»ΠΊΠ΅ ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°
:param mail_id: ΠΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ ΠΏΠΈΡΡΠΌΠ°
:type mail_id: string
:param path: ΠΡΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ ΡΠ°ΠΉΠ»
:type path: string
"""
response, data = self.mail.fetch(mail_id, "(RFC822)")
raw_email = data[0][1]
raw_soup = raw_email.decode().replace("r", "").replace("n", "")
parse_soup = BeautifulSoup(raw_soup, "html.parser")
link_text = ""
for a in parse_soup.find_all("a", href=True, text=True):
link_text = a["href"]
self.download_from_url(link_text, path)
Nambari ni rahisi, kwa hivyo haihitaji maelezo zaidi. Nitakuambia tu kuhusu mstari wa kichawi imap_conn_id. Apache Airflow huhifadhi vigezo vya muunganisho (kuingia, nenosiri, anwani, na vigezo vingine) vinavyoweza kufikiwa na kitambulishi cha kamba. Kwa kuibua, usimamizi wa muunganisho unaonekana kama hii
Kitambuzi cha kusubiri data
Kwa kuwa tayari tunajua jinsi ya kuunganisha na kupokea data kutoka kwa barua, sasa tunaweza kuandika kitambuzi ili kuwasubiri. Katika kesi yangu, haikufanya kazi kuandika opereta mara moja ambayo itashughulikia data, ikiwa ipo, kwa sababu michakato mingine inafanya kazi kulingana na data iliyopokelewa kutoka kwa barua, pamoja na zile zinazochukua data inayohusiana kutoka kwa vyanzo vingine (API, simu. , vipimo vya wavuti, n.k.) nk.). Nitakupa mfano. Mtumiaji mpya ameonekana katika mfumo wa CRM, na bado hatujui kuhusu UUID yake. Kisha, tunapojaribu kupokea data kutoka kwa simu ya SIP, tutapokea simu zilizounganishwa na UUID yake, lakini hatutaweza kuzihifadhi na kuzitumia kwa usahihi. Katika masuala hayo, ni muhimu kukumbuka utegemezi wa data, hasa ikiwa ni kutoka kwa vyanzo tofauti. Hizi ni, bila shaka, hatua za kutosha za kuhifadhi uadilifu wa data, lakini katika baadhi ya matukio ni muhimu. Ndio, na kuzembea kuchukua rasilimali pia ni jambo lisilo la busara.
Kwa hivyo, sensor yetu itazindua wima zinazofuata za grafu ikiwa kuna habari mpya kwenye barua, na pia alama habari ya hapo awali kuwa haina maana.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook
class MailSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.check_seen = check_seen
self.box = box
self.condition = condition
def poke(self, context):
conn = IMAPHook(self.conn_id)
mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
if mail_id is None:
return False
else:
return True
Tunapokea na kutumia data
Ili kupokea na kusindika data, unaweza kuandika opereta tofauti, unaweza kutumia zilizotengenezwa tayari. Kwa kuwa hadi sasa mantiki ni ndogo - kuchukua data kutoka kwa barua, kwa mfano, ninapendekeza PythonOperator ya kawaida.
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook
start_date = datetime(2020, 4, 4)
# Π‘ΡΠ°Π½Π΄Π°ΡΡΠ½ΠΎΠ΅ ΠΊΠΎΠ½ΡΠΈΠ³ΡΡΠΈΡΠΎΠ²Π°Π½ΠΈΠ΅ Π³ΡΠ°ΡΠ°
args = {
"owner": "example",
"start_date": start_date,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retry_delay": timedelta(minutes=15),
"provide_context": False,
}
dag = DAG(
dag_id="test_etl",
default_args=args,
schedule_interval="@hourly",
)
# ΠΠΏΡΠ΅Π΄Π΅Π»ΡΠ΅ΠΌ ΡΠ΅Π½ΡΠΎΡ
mail_check_sensor = MailSensor(
task_id="check_new_emails",
poke_interval=10,
conn_id="mail_conn_id",
timeout=10,
soft_fail=True,
box="my_box",
dag=dag,
mode="poke",
)
# Π€ΡΠ½ΠΊΡΠΈΡ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°
def prepare_mail():
imap_hook = IMAPHook("mail_conn_id")
mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
if mail_id is None:
raise AirflowException("Empty mailbox")
conn.download_mail_href_attachment(mail_id, "./path.zip")
prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
# ΠΠΏΠΈΡΠ°Π½ΠΈΠ΅ ΠΎΡΡΠ°Π»ΡΠ½ΡΡ
Π²Π΅ΡΡΠΈΠ½ Π³ΡΠ°ΡΠ°
...
# ΠΠ°Π΄Π°Π΅ΠΌ ΡΠ²ΡΠ·Ρ Π½Π° Π³ΡΠ°ΡΠ΅
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ΠΠΏΠΈΡΠ°Π½ΠΈΠ΅ ΠΎΡΡΠ°Π»ΡΠ½ΡΡ
ΠΏΠΎΡΠΎΠΊΠΎΠ² ΡΠΏΡΠ°Π²Π»Π΅Π½ΠΈΡ
Kwa njia, ikiwa barua yako ya ushirika pia iko kwenye mail.ru, basi hutaweza kutafuta barua kwa somo, mtumaji, nk. Huko nyuma mnamo 2016, waliahidi kuitambulisha, lakini inaonekana walibadilisha mawazo yao. Nilitatua tatizo hili kwa kuunda folda tofauti kwa barua muhimu na kuanzisha chujio kwa barua muhimu katika interface ya mtandao wa barua. Kwa hivyo, barua na masharti muhimu tu ya utaftaji, kwa upande wangu, kwa urahisi (isiyoonekana) huingia kwenye folda hii.
Kwa muhtasari, tuna mlolongo wafuatayo: tunaangalia ikiwa kuna barua mpya zinazokidhi masharti, ikiwa zipo, basi tunapakua kumbukumbu kwa kutumia kiungo kutoka kwa barua ya mwisho.
Chini ya dots za mwisho, imeachwa kuwa kumbukumbu hii itafunguliwa, data kutoka kwenye kumbukumbu itafutwa na kusindika, na kwa sababu hiyo, jambo lote litaenda zaidi kwenye bomba la mchakato wa ETL, lakini hii tayari imepita. upeo wa makala. Ikiwa iligeuka kuwa ya kufurahisha na muhimu, basi nitaendelea kuelezea suluhisho za ETL na sehemu zao za Apache Airflow.
Chanzo: mapenzi.com