No matter how far technology advances, a trail of outdated approaches always follows progress. This may be due to a gradual transition, the human factor, technological needs, or something else. In data processing, data sources are the most telling example. However much we dream of getting rid of this, part of the data is still sent through instant messengers and email, not to mention even more archaic formats. I invite you to look at one option for Apache Airflow that shows how to retrieve data from emails.
Background
A lot of data is still transmitted by email, from communication between people to standards of interaction between companies. It is good if you can write an interface to obtain the data, or put people in the office who will enter this information into more convenient sources, but often that is simply not possible. The specific task I faced was connecting a notorious CRM system to the data warehouse, and then to the OLAP system. For historical reasons, our company found this system convenient for one particular area of the business, so everyone really wanted to be able to work with the data from this third-party system as well. First of all, of course, we studied the possibility of getting the data from its open API. Unfortunately, the API did not cover all the data we needed and, to put it simply, was crooked in many ways, while technical support was unwilling or unable to meet us halfway and provide fuller functionality. But the system did offer a way to receive the missing data periodically by mail, in the form of a link for downloading an archive.
It should be noted that this was not the only case in which the business wanted to collect data from emails or instant messengers. In this case, however, we had no way of influencing the third-party company, which provides part of the data only in this way.
Apache Airflow
To build ETL processes, we most often use Apache Airflow. So that a reader unfamiliar with this technology can better understand how it looks in context and in general, I will give a brief introduction.
Apache Airflow is a free platform used to build, run, and monitor ETL (Extract-Transform-Load) processes in Python. The central concept in Airflow is a directed acyclic graph, in which the vertices are specific processes and the edges are the flow of control or information. A process can simply call any Python function, or it can have more complex logic that sequentially calls several functions in the context of a class. For the most common operations there are already many ready-made components that can be used as processes. These include:
- operators - for transferring data from one place to another, for example, from a database table to a data warehouse;
- sensors - for waiting for a certain event to occur and directing the flow of control to the subsequent vertices of the graph;
- hooks - for lower-level operations, for example, getting data from a database table (used in operators);
- and others.
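To make the graph idea concrete, here is a minimal sketch that uses only the Python standard library (graphlib, Python 3.9+) rather than Airflow itself. The task names are hypothetical; the only point is that a directed acyclic graph fixes a valid order in which the vertices can run.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each key is a task; its value is the set of tasks that must finish first.
# The task names are illustrative only and do not come from Airflow.
dependencies = {
    "check_new_emails": set(),
    "prepare_mail_data": {"check_new_emails"},
    "unpack_archive": {"prepare_mail_data"},
    "load_to_warehouse": {"unpack_archive"},
}


def execution_order(graph):
    """Return one valid order in which the tasks can be executed."""
    # static_order() raises graphlib.CycleError if the graph has a cycle,
    # which is why the graph has to be acyclic.
    return list(TopologicalSorter(graph).static_order())


order = execution_order(dependencies)
print(order)
```

Airflow does much more than ordering (scheduling, retries, monitoring), but the dependency model is the same.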
It would be inappropriate to describe Apache Airflow in detail in this article. Brief introductions are available elsewhere.
A hook for getting data
First, to solve the problem, we need to write a hook with which we can:
- connect to the mailbox
- find the right message
- get the data from the message.
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from bs4 import BeautifulSoup
import imaplib
import logging
import requests


class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
        IMAP hook for retrieving data from email

        :param imap_conn_id: Identifier of the mail connection
        :type imap_conn_id: string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """
        Connect to the mailbox
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
        Get the identifier of the last message that matches the search condition

        :param check_seen: Whether to mark the last message as read
        :type check_seen: bool
        :param box: Mailbox name
        :type box: string
        :param condition: Message search condition
        :type condition: string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("The following messages were found in the mailbox: " + str(mail_ids))

        if not mail_ids:
            logging.info("No new messages found")
            return None

        mail_id = mail_ids[0]

        # if there are several such messages
        if len(mail_ids) > 1:
            # mark all of them as read
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\\Seen")

            # return the last one
            mail_id = mail_ids[-1]

        # remove the read flag from the last message if it must stay unread
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\\Seen")

        return mail_id
The logic is this: we connect, find the last, most relevant message, and ignore the others if there are any. This approach is used because later messages contain all the data of the earlier ones. If that is not the case, you can return an array of all messages, or process the first one and leave the rest for the next pass. In general, as always, everything depends on the task.
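The alternatives mentioned above can be sketched as a small selection helper. This is an illustration, not part of the hook: the function names and the strategy parameter are hypothetical, and it assumes that the server returns identifiers in ascending order, as the hook itself does.

```python
def parse_search_result(data):
    """Turn the data part of an imaplib search() reply into a list of ids."""
    # imaplib returns a list holding one space-separated bytes string,
    # e.g. [b"1 2 3"]; an empty or missing value means nothing matched.
    if not data or not data[0]:
        return []
    return data[0].split()


def select_mail_ids(mail_ids, strategy="last"):
    """Pick which messages to process in the current run.

    "last"  - only the newest message (the approach used in the hook)
    "all"   - every matching message
    "first" - the oldest message, leaving the rest for the next pass
    """
    if not mail_ids:
        return []
    if strategy == "last":
        return [mail_ids[-1]]
    if strategy == "all":
        return list(mail_ids)
    if strategy == "first":
        return [mail_ids[0]]
    raise ValueError("Unknown strategy: " + strategy)


ids = parse_search_result([b"1 2 3"])
print(select_mail_ids(ids, "last"))
print(select_mail_ids(ids, "first"))
```

With the "first" strategy only the processed message should be marked as read, so that the remaining ones are picked up on the next pass.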
We add two helper functions to the hook: one that downloads a file, and one that downloads a file using a link from an email. By the way, they can be moved to an operator, depending on how often this functionality is used. What else to add to the hook again depends on the task: if files arrive directly in the message, you can download the attachments; if the data is in the message body, you need to parse the message, and so on. In my case, the message contains a single link to an archive, which I need to put in a certain place before starting further processing.
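For the case where files arrive as attachments, a minimal sketch with the standard library email package could look like this. The function name save_attachments and its parameters are hypothetical; it expects the raw RFC 822 bytes of a message, such as the ones an IMAP fetch with "(RFC822)" returns.

```python
import email
import os
from email import policy


def save_attachments(raw_email, target_dir):
    """Save every attachment of a raw message and return the saved paths."""
    message = email.message_from_bytes(raw_email, policy=policy.default)
    saved = []
    for part in message.iter_attachments():
        filename = part.get_filename()
        payload = part.get_payload(decode=True)
        if not filename or payload is None:
            continue  # skip parts without a file name or without binary content
        # basename() drops any directory components from the supplied name
        path = os.path.join(target_dir, os.path.basename(filename))
        with open(path, "wb") as fd:
            fd.write(payload)
        saved.append(path)
    return saved
```

In a hook, such a function would be called with the message bytes instead of extracting a link from the message body.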
    # Methods of IMAPHook. They rely on the module-level imports
    # `import requests` and `from bs4 import BeautifulSoup`.
    def download_from_url(self, url, path, chunk_size=128):
        """
        Download a file

        :param url: Download address
        :type url: string
        :param path: Where to put the file
        :type path: string
        :param chunk_size: How many bytes to write at a time
        :type chunk_size: int
        """
        r = requests.get(url, stream=True)
        # fail early instead of saving an error page as the archive
        r.raise_for_status()
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
        Download a file using the link from an email

        :param mail_id: Message identifier
        :type mail_id: string
        :param path: Where to put the file
        :type path: string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        # remove line breaks so that a link wrapped across lines stays intact
        raw_soup = raw_email.decode().replace("\r", "").replace("\n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        # keep the last link that has both an href and visible text
        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)
The code is simple, so it hardly needs further explanation. I will only mention the magic string imap_conn_id. Apache Airflow stores connection parameters (login, password, address, and other parameters) that can be accessed by a string identifier. Connections are managed in the Airflow web interface.
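Besides the web interface, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_ followed by the upper-cased connection identifier, with the value written as a URI. The sketch below is not Airflow's own parser; it only illustrates, with the standard library, how such a URI maps to the fields the hook reads (host, login, password). The URI, user name, and host are made up.

```python
from urllib.parse import unquote, urlparse


def parse_connection_uri(uri):
    """Split a connection URI into the fields a hook typically reads."""
    parsed = urlparse(uri)
    return {
        "conn_type": parsed.scheme,
        "host": parsed.hostname,
        "port": parsed.port,
        # special characters in the login and password are percent-encoded
        "login": unquote(parsed.username) if parsed.username else None,
        "password": unquote(parsed.password) if parsed.password else None,
    }


# Hypothetical value of an AIRFLOW_CONN_MAIL_CONN_ID environment variable
example_uri = "imap://etl_user:s3cret%21@imap.example.com:993"
params = parse_connection_uri(example_uri)
print(params["host"], params["port"], params["login"])
```

Either way, the hook only needs the identifier: get_connection looks the parameters up by that string.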
A sensor to wait for data
Since we already know how to connect to the mailbox and receive data from it, we can now write a sensor that waits for the data. In my case, it did not work to write an operator straight away that would process the data if there is any, because other processes depend on the data received from the mail, including those that take related data from other sources (API, telephony, web metrics, etc.). Let me give an example. A new user appears in the CRM system, and we do not yet know their UUID. Then, when we try to get data from SIP telephony, we receive calls tied to that UUID, but we cannot store and use them correctly. In such matters it is important to keep data dependencies in mind, especially when the data comes from different sources. These are, of course, insufficient measures for preserving data integrity, but in some cases they are necessary. And running idle while holding resources is also irrational.
Thus, our sensor will launch the subsequent vertices of the graph if there is fresh information in the mail, and will also mark the previous information as irrelevant.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook


class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        # True lets the downstream tasks start; False makes Airflow poke again later
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
        return mail_id is not None
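To show what the sensor's return value controls, here is a simplified model of the waiting loop in "poke" mode. It is a sketch with standard library calls only, not Airflow's source code: wait_for and SensorTimeout are hypothetical names, while poke_interval and timeout mirror the sensor parameters of the same names.

```python
import time


class SensorTimeout(Exception):
    """Raised when the condition is not met within the timeout."""


def wait_for(poke, poke_interval=10, timeout=60, sleep=time.sleep, clock=time.monotonic):
    """Call poke() every poke_interval seconds until it returns True."""
    started = clock()
    while not poke():
        # give up once the allowed waiting time has been exceeded
        if clock() - started > timeout:
            raise SensorTimeout("Condition was not met in time")
        sleep(poke_interval)
    return True


# Usage: the condition becomes true on the third check
answers = iter([False, False, True])
print(wait_for(lambda: next(answers), poke_interval=0, timeout=5))
```

In Airflow, a sensor created with soft_fail=True is marked as skipped on timeout instead of failed.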
Receiving and using the data
To receive and process the data, you can write a separate operator or use a ready-made one. Since the logic is still trivial (just taking the data from the message), I suggest the standard PythonOperator as an example.
from datetime import datetime, timedelta

from airflow.exceptions import AirflowException
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Standard graph configuration
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],  # address obscured in the source
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Define the sensor
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    # Leave the newest message unread; otherwise, when several messages
    # arrive at once, prepare_mail below would no longer find any.
    check_seen=False,
    dag=dag,
    mode="poke",
)


# Function that gets the data from the message
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")
    imap_hook.download_mail_href_attachment(mail_id, "./path.zip")


prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable=prepare_mail)

# Definitions of the remaining graph vertices
...

# Set the dependencies in the graph
mail_check_sensor >> prepare_mail_data
# prepare_mail_data >> ...

# Definitions of the remaining control flows
By the way, if your corporate mail is also on mail.ru, you will not be able to search messages by subject, sender, and so on. Back in 2016 they promised to introduce this, but apparently changed their minds. I solved the problem by creating a separate folder for the required messages and setting up a filter for them in the mail web interface. Thus only the required messages get into this folder, and the search condition in my case is simply (UNSEEN).
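On servers that do support searching by subject or sender, the same condition parameter of the hook can carry more IMAP search keys. The helper below is a hypothetical sketch that only builds the criteria string; it assumes ASCII values, since non-ASCII text would need a charset to be passed to the search call.

```python
def _quote(value):
    """Escape backslashes and double quotes for an IMAP quoted string."""
    return value.replace("\\", "\\\\").replace('"', '\\"')


def build_condition(unseen=True, subject=None, sender=None):
    """Build an IMAP SEARCH criteria string for imaplib's search()."""
    parts = []
    if unseen:
        parts.append("UNSEEN")
    if subject is not None:
        parts.append('SUBJECT "{}"'.format(_quote(subject)))
    if sender is not None:
        parts.append('FROM "{}"'.format(_quote(sender)))
    if not parts:
        parts.append("ALL")  # no restriction: match every message
    return "(" + " ".join(parts) + ")"


# The subject and the address are made-up examples
print(build_condition(subject="CRM export", sender="crm@example.com"))
```

The resulting string can be passed as the condition argument of get_last_mail or of the sensor.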
To summarize, we have the following sequence: we check whether there are new messages that meet the condition, and if there are, we download the archive using the link from the last message.
Behind the final ellipsis it is left out that this archive will be unpacked, the data from it will be cleaned and processed, and as a result the whole thing will go further down the pipeline of the ETL process, but that is already beyond the scope of this article. If this turned out to be interesting and useful, I will gladly continue describing ETL solutions and their parts for Apache Airflow.
Source: www.habr.com