However technology evolves, a trail of outdated approaches always follows it. This may be due to gradual transitions, human factors, technological constraints or something else. In data processing, data sources are the most telling example of this. However much we dream of getting rid of them, part of the data is still sent through instant messengers and email, not to mention more archaic formats. I invite you to look at one option for Apache Airflow that shows how you can pull data from emails.
Background
A lot of data is still exchanged by email, from interpersonal communication to the standards of interaction between companies. It is great if you can write an interface to fetch the data, or put people in the office who will enter this information into more convenient sources, but often neither option is available. The specific task I faced was connecting a well-known CRM system to the data warehouse, and then to the OLAP system. Historically, at our company this system was used for one particular area of the business, so everyone really wanted to be able to work with the data from this third-party system too. The first step, of course, was to explore getting the data through its open API. Unfortunately, the API did not cover all the required data and, to put it simply, was crooked in many ways, and technical support either would not or could not meet us halfway and provide more complete functionality. The system did, however, offer a way to periodically receive the missing data by email, as a link to download an archive.
It should be noted that this was not the only case where the business wanted to collect data from emails or instant messengers. In this case, however, we could not influence the third-party company that provides part of the data only in this way.
Apache Airflow
To build ETL processes we usually use Apache Airflow. To help readers unfamiliar with this technology see how it looks in context and in general, I'll start with a few introductory notes.
Apache Airflow is a free platform for building, running and monitoring ETL (Extract, Transform, Load) processes in Python. The central concept in Airflow is a directed acyclic graph, where the vertices of the graph are specific processes and the edges are the flow of control or data. A process can simply call any Python function, or it can contain more complex logic that calls several functions in sequence within a class. For the most common operations there are already many ready-made building blocks that can be used as processes. These include:
- operators: move data from one place to another, for example from a database table to a data warehouse;
- sensors: wait for a specific event to occur and then pass the flow of control to the next vertices of the graph;
- hooks: lower-level operations, for example fetching data from a database table (used inside operators);
- and so on.
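The graph idea itself is independent of Airflow. As a rough illustration only (this is not how Airflow's scheduler works internally), Python's standard `graphlib` module (Python 3.9+) can compute a valid run order for a handful of tasks. The task names are hypothetical and simply mirror the pipeline built later in this article. Each key maps to the set of tasks it depends on.

```python
from graphlib import TopologicalSorter

# Hypothetical task names; each task maps to the set of tasks it depends on (its upstream edges).
pipeline = {
    "check_new_emails": set(),
    "prepare_mail_data": {"check_new_emails"},
    "unpack_archive": {"prepare_mail_data"},
    "load_to_warehouse": {"unpack_archive"},
}


def execution_order(graph):
    """Return one valid run order; raises graphlib.CycleError if the graph has a cycle."""
    return list(TopologicalSorter(graph).static_order())


if __name__ == "__main__":
    print(execution_order(pipeline))
```

A cycle, such as two tasks waiting on each other, makes `static_order()` raise `graphlib.CycleError`. The "acyclic" requirement exists to rule out exactly this situation.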
It would be out of place to describe Apache Airflow in detail in this article. A brief introduction to it is available separately.
A hook for getting the data
First of all, to solve the problem we need to write a hook that can:
- connect to the mailbox;
- find the email we need;
- extract the data from the email.
```python
import imaplib
import logging

import requests
from bs4 import BeautifulSoup
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook


class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
        IMAP hook for fetching data from a mailbox.

        :param imap_conn_id: ID of the Airflow connection to the mailbox
        :type imap_conn_id: string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """
        Connect and log in to the mailbox.
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
        Return the ID of the latest email that matches the search condition.

        :param check_seen: mark the latest email as read
        :type check_seen: bool
        :param box: mailbox (folder) name
        :type box: string
        :param condition: IMAP search condition
        :type condition: string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("Found the following emails in the mailbox: " + str(mail_ids))
        if not mail_ids:
            logging.info("No new emails found")
            return None
        # Mark every matching email as read: only the latest one is used
        for message_id in mail_ids:
            self.mail.store(message_id, "+FLAGS", r"\Seen")
        # The latest email is the last ID returned by SEARCH
        mail_id = mail_ids[-1]
        # Unless asked to mark it as read, put the latest email back to unread
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", r"\Seen")
        return mail_id
```
The logic is as follows: we connect, find the latest matching email and, if there are others, ignore them (they are marked as read). This works here because later emails contain all the data from earlier ones. If that is not the case, you can return a list of all the emails, or process the first one and leave the rest for the next run. In general, as always, everything depends on the task.
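The selection options above can be sketched in plain Python. `imaplib`'s `search()` returns a list whose first element is a space-separated byte string of message numbers, for example `[b"3 7 12"]`. Like the hook, this sketch assumes the server lists them in ascending order, so the newest match is the last one. The function name and strategy labels are hypothetical.

```python
def choose_ids(search_data, strategy="latest"):
    """Pick which message IDs from an imaplib SEARCH result to process now.

    search_data: the data part of imaplib's search() result, e.g. [b"3 7 12"].
    strategy: "latest" (what the hook does), "all", or "first" (the rest waits for the next run).
    """
    ids = search_data[0].split()
    if strategy == "latest":
        return ids[-1:]
    if strategy == "all":
        return ids
    if strategy == "first":
        return ids[:1]
    raise ValueError("unknown strategy: " + strategy)


if __name__ == "__main__":
    data = [b"3 7 12"]
    for name in ("latest", "all", "first"):
        print(name, choose_ids(data, name))
```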
We add two helper functions to the hook: one to download a file, and one to download a file via a link in an email. They could just as well be moved into an operator, depending on how often this functionality is used. What else to add to the hook again depends on the task. If files arrive attached to the email, you can download the attachments. If the data arrives in the body of the email, you need to parse the email, and so on. In my case the email contains a single link to an archive, which I need to save to a specific location before starting further processing.
```python
# Requires at module level: import requests; from bs4 import BeautifulSoup
class IMAPHook(BaseHook):
    # ... __init__, authenticate and get_last_mail as above ...

    def download_from_url(self, url, path, chunk_size=128):
        """
        Download a file.

        :param url: download URL
        :type url: string
        :param path: where to save the file
        :type path: string
        :param chunk_size: how many bytes to write at a time
        :type chunk_size: int
        """
        r = requests.get(url, stream=True)
        r.raise_for_status()
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
        Download a file via the link contained in an email.
        Expects get_last_mail() to have been called first, so that
        self.mail is logged in and the mailbox is selected.

        :param mail_id: email ID
        :type mail_id: string
        :param path: where to save the file
        :type path: string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        # Strip line breaks from the raw message before parsing it as HTML
        raw_soup = raw_email.decode().replace("\r", "").replace("\n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""
        # Keep the last <a> tag that has both an href and visible text
        for a in parse_soup.find_all("a", href=True, string=True):
            link_text = a["href"]
        self.download_from_url(link_text, path)
```
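`download_mail_href_attachment()` works on the raw message text. That is fine for a simple HTML email, but it can break when the body is base64- or quoted-printable-encoded (quoted-printable, for instance, can split a long URL with soft line breaks). One possible alternative uses only the standard library: let the `email` package decode the HTML part, then collect links with `html.parser`. The helper names below are hypothetical. Like the hook, the helper returns the last link found, but it does not require the link to have visible text.

```python
from email import message_from_bytes, policy
from html.parser import HTMLParser


class LinkCollector(HTMLParser):
    """Collects the href attribute of every <a> tag it sees."""

    def __init__(self):
        super().__init__()
        self.hrefs = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            href = dict(attrs).get("href")
            if href:
                self.hrefs.append(href)


def last_link_in_email(raw_email):
    """Return the last link in the email's HTML body, or None if there is none."""
    # policy.default yields an EmailMessage, which provides get_body() and get_content()
    msg = message_from_bytes(raw_email, policy=policy.default)
    body = msg.get_body(preferencelist=("html",))
    if body is None:
        return None
    collector = LinkCollector()
    # get_content() undoes base64 / quoted-printable and decodes the charset
    collector.feed(body.get_content())
    collector.close()
    return collector.hrefs[-1] if collector.hrefs else None
```

Inside the hook, the `raw_email` bytes taken from `data[0][1]` could be passed to `last_link_in_email()` directly.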
The hook code is simple, so it hardly needs further explanation. I'll only say a word about the magic string imap_conn_id. Apache Airflow stores connection parameters (login, password, host and other parameters), which can be accessed by a string identifier. These connections are managed in the Airflow web UI under Admin > Connections.
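Besides the web UI, Airflow can also read a connection from an environment variable. The variable is named `AIRFLOW_CONN_` followed by the upper-cased conn_id, and its value is a connection URI. Special characters in the login or password must be URL-encoded. The small helper below only builds that variable for the `mail_conn_id` used in this article. The login, password and host are placeholders, and the `imap` scheme is an assumption (the hook above reads only host, login and password).

```python
from urllib.parse import quote


def connection_env_var(conn_id):
    """Name of the environment variable Airflow checks for a given conn_id."""
    return "AIRFLOW_CONN_" + conn_id.upper()


def connection_uri(login, password, host, conn_type="imap"):
    """Build a connection URI, URL-encoding login and password (e.g. '@' -> '%40')."""
    return "{}://{}:{}@{}".format(conn_type, quote(login, safe=""), quote(password, safe=""), host)


if __name__ == "__main__":
    # Placeholder credentials: prints a shell line to export before starting Airflow
    name = connection_env_var("mail_conn_id")
    uri = connection_uri("etl@example.com", "p@ss/word", "imap.example.com")
    print("export {}='{}'".format(name, uri))
```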
A sensor to wait for data
Since we already know how to connect to the mailbox and get data from it, we can now write a sensor that waits for that data. In my case, an operator that immediately processed the data (if any) would not have worked, because other processes run on the data received by email. These include processes that pull related data from other sources (API, telephony, web analytics, etc.). Here is an example. A new user appears in the CRM system, and we do not yet know their UUID. Then, when we try to get data from SIP telephony, we receive calls tied to that UUID, but we cannot save and use them correctly. In cases like this it is important to keep the dependencies between data in mind, especially when the data comes from different sources. These measures are, of course, not enough to guarantee data integrity, but in some cases they are necessary. Besides, keeping resources busy with idle work is wasteful too.
So our sensor will trigger the next vertices of the graph if there is new information in the mailbox, and will also mark the older information as irrelevant.
```python
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook


class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="INBOX", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        # Succeed (and let downstream tasks run) as soon as a matching email exists
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
        return mail_id is not None
```
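Airflow calls `poke()` repeatedly until it returns True or the sensor's timeout runs out. The following is a simplified, standalone model of that loop in poke mode. It ignores retries and reschedule mode, and the exception classes are stand-ins for Airflow's own. The clock is injectable, so the loop can be exercised without actually waiting.

```python
import time


class SensorTimeout(Exception):
    """Stand-in for Airflow's AirflowSensorTimeout."""


class SensorSkipped(Exception):
    """Stand-in for Airflow's AirflowSkipException."""


def run_sensor(poke, poke_interval, timeout, soft_fail=False,
               clock=time.monotonic, sleep=time.sleep):
    """Call poke() every poke_interval seconds until it returns True.

    Once more than `timeout` seconds have passed, fail, or skip if soft_fail is set.
    """
    started = clock()
    while not poke():
        if clock() - started > timeout:
            if soft_fail:
                raise SensorSkipped("timed out; task would be marked as skipped")
            raise SensorTimeout("timed out; task would be marked as failed")
        sleep(poke_interval)
    return True
```

The DAG below uses `poke_interval=10`, `timeout=10` and `soft_fail=True`. With those settings, this model gives up after pokes at t = 0, 10 and 20 seconds and reports a skip rather than a failure. With real network latency in `poke()`, there may be fewer pokes.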
Getting and processing the data
To get and process the data you can write a separate operator or use an existing one. Since the logic here is still trivial (for example, taking the data from an email), I suggest the standard PythonOperator.
```python
from datetime import datetime, timedelta

from airflow.exceptions import AirflowException
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Standard DAG configuration
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Define the sensor.
# check_seen=False leaves the latest email unread, so prepare_mail() can still find it.
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    check_seen=False,
    dag=dag,
    mode="poke",
)


# Function that gets the data from the email
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")
    imap_hook.download_mail_href_attachment(mail_id, "./path.zip")


prepare_mail_data = PythonOperator(
    task_id="prepare_mail_data",
    default_args=args,
    dag=dag,
    python_callable=prepare_mail,
)

# Definitions of the remaining DAG vertices
# ...

# Wire up the graph
mail_check_sensor >> prepare_mail_data
# prepare_mail_data >> ...
# Definitions of the remaining control flows
```
By the way, if your corporate email is also hosted on mail.ru, you will not be able to search for emails by subject, sender and so on. They promised to add this back in 2016, but apparently changed their minds. I solved the problem by creating a separate folder for the emails I need and setting up a filter in the mail web interface that moves them there. As a result, only the necessary emails end up in this folder, and the search condition in my case is simply (UNSEEN).
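Some servers do support searching by sender or subject (according to the author, mail.ru's did not at the time). On those, a narrower condition passed to `get_last_mail()` could replace the folder workaround. A possible helper for composing such a condition from RFC 3501 search keys (`UNSEEN`, `FROM`, `SUBJECT`, `SINCE`) is sketched below. Its name and parameters are hypothetical, and it only handles ASCII values, since non-ASCII search strings need an explicit CHARSET.

```python
from datetime import date

# IMAP dates need English month abbreviations regardless of locale (RFC 3501).
_MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
           "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")


def _quoted(value):
    """Quote an ASCII string for IMAP, escaping backslashes and double quotes."""
    return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'


def imap_criteria(unseen=True, sender=None, subject=None, since=None):
    """Compose an IMAP SEARCH condition such as '(UNSEEN FROM "x@example.com")'."""
    keys = []
    if unseen:
        keys.append("UNSEEN")
    if sender:
        keys.append("FROM " + _quoted(sender))
    if subject:
        keys.append("SUBJECT " + _quoted(subject))
    if since:
        keys.append("SINCE {}-{}-{}".format(since.day, _MONTHS[since.month - 1], since.year))
    return "(" + " ".join(keys) + ")" if keys else "ALL"


if __name__ == "__main__":
    print(imap_criteria(sender="crm@example.com", since=date(2020, 4, 4)))
```

The resulting string would be passed as the `condition` argument, in the same way the hook already passes `"(UNSEEN)"` to `search()`.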
To sum up, the sequence is: we check whether there are new emails that match the conditions, and if there are, we download the archive using the link in the latest email.
Behind the final ellipsis, the archive is unpacked and the data in it is cleaned and processed. After that, everything continues down the ETL pipeline, but that is beyond the scope of this article. If this turns out to be interesting and useful, I will gladly continue describing ETL solutions and their components for Apache Airflow.
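Those steps are outside the article's scope, but the first of them is small enough to sketch. This assumes the downloaded archive is a zip file (the DAG saves it as `./path.zip`), which the standard `zipfile` module can unpack. The function name and target directory are hypothetical.

```python
import zipfile
from pathlib import Path


def unpack_zip(archive_path, target_dir):
    """Extract a zip archive into target_dir and return the extracted file paths, sorted."""
    target = Path(target_dir)
    target.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(archive_path) as zf:
        zf.extractall(target)
        # Skip directory entries; keep only regular files
        names = [name for name in zf.namelist() if not name.endswith("/")]
    return sorted(target / name for name in names)
```

In a DAG, a call such as `unpack_zip("./path.zip", "./unpacked")` could run in another PythonOperator placed after `prepare_mail_data`.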
Source: www.habr.com