However far technology advances, a trail of outdated approaches always follows it. This may be due to a gradual transition, human factors, technical constraints, or something else. In data processing, data sources show this most clearly. However much we would like to get rid of them, some data is still sent through instant messengers and email, not to mention more archaic formats. I invite you to look at one way of handling this with Apache Airflow, which shows how to pull data out of emails.
Problem statement
A lot of data is still transmitted by email, from communication between individuals to the standards companies use to work with each other. It is good if you can write an interface to fetch the data, or put people in an office to move this information into more convenient sources, but often neither is possible. The specific task I faced was connecting a notorious CRM system to the data warehouse and then to an OLAP system. Historically, one part of our business had found this system convenient, so everyone wanted to be able to work with its data as well. The first thing we looked into, of course, was getting the data through the open API. Unfortunately, the API did not cover all the data we needed and, to put it mildly, was clumsy in many ways, and technical support either would not or could not meet us halfway with more complete functionality. However, the system did offer a way to receive the missing data periodically by email, as a link to download an archive.
It should be noted that this was not the only case in which the business wanted to collect data from emails or instant messengers. In this case, though, we had no way to influence the third-party company, which provides part of the data only in this form.
Apache Airflow
We most often use Apache Airflow to build ETL processes. So that readers unfamiliar with this technology can better understand how it looks in context and in general, here are a few introductory words.
Apache Airflow is a free platform for building, running, and monitoring ETL (Extract-Transform-Load) processes in Python. Its central concept is the directed acyclic graph, in which the vertices are specific processes and the edges are the flow of control or information. A process can simply call any Python function, or it can contain more complex logic, such as calling several functions in sequence in the context of a class. For the most common tasks there are already many ready-made building blocks that can be used as processes. These include:
- operators - for moving data from one place to another, for example from a database table to a data warehouse;
- sensors - for waiting for a specific event and then passing the flow of control on to the next vertices of the graph;
- hooks - for lower-level operations, for example fetching data from a database table (they are used inside operators);
- and so on.
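To make the graph idea concrete, here is a plain-Python sketch rather than Airflow code. The task names are hypothetical, loosely modelled on the pipeline built later in this article, and the standard-library graphlib module (Python 3.9+) stands in for the scheduler when computing an order that respects every edge:

```python
from graphlib import TopologicalSorter

# Hypothetical tasks modelled on this article's pipeline. Each key is a
# vertex; its value is the set of vertices with an edge pointing into it,
# i.e. the tasks that must finish first.
pipeline = {
    "check_new_emails": set(),
    "prepare_mail_data": {"check_new_emails"},
    "unpack_archive": {"prepare_mail_data"},
    "load_to_warehouse": {"unpack_archive"},
}


def execution_order(graph):
    """Return one run order that respects every edge.

    Raises graphlib.CycleError if the graph is not acyclic.
    """
    return list(TopologicalSorter(graph).static_order())


print(execution_order(pipeline))
```

The acyclicity requirement is what makes such an order exist at all: a graph where two tasks wait on each other has no valid order, and TopologicalSorter reports it as an error.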
Describing Apache Airflow in detail is beyond the scope of this article; brief introductions to it are available elsewhere.
A hook for fetching data
First, to solve the problem, we need to write a hook that can:
- connect to the mailbox;
- find the right email;
- get the data out of the email.
import imaplib
import logging

import requests
from bs4 import BeautifulSoup
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook


class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
        IMAP hook for retrieving data from email
        :param imap_conn_id: ID of the mail connection
        :type imap_conn_id: string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """
        Connect and log in to the mailbox
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        try:
            mail.login(user=self.connection.login, password=self.connection.password)
        except imaplib.IMAP4.error as exc:
            # imaplib raises on a rejected login instead of returning a status
            raise AirflowException("Sign in failed") from exc
        self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
        Return the ID of the latest email that matches the search condition
        :param check_seen: if False, clear the Seen flag on the returned email so that it stays unread
        :type check_seen: bool
        :param box: name of the mailbox folder
        :type box: string
        :param condition: IMAP search condition for emails
        :type condition: string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("Emails found in the mailbox: %s", mail_ids)
        if not mail_ids:
            logging.info("No new emails found")
            return None
        # the latest matching email is the one we return
        mail_id = mail_ids[-1]
        # if there are several such emails, mark the older ones as seen
        for old_id in mail_ids[:-1]:
            self.mail.store(old_id, "+FLAGS", "\\Seen")
        # optionally keep the latest one unread
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\\Seen")
        return mail_id
The logic is as follows: we connect, find the latest matching email, and ignore any others. This works here because later emails contain all the data from earlier ones. If that is not the case, you can return a list of all emails, or process the first one and leave the rest for the next run. In general, as always, everything depends on the task.
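The selection rule can be separated from IMAP as a pure function, which makes it easy to test or to swap for the "first one now, the rest next time" variant. The helpers pick_latest and pick_oldest below are hypothetical, and both assume the IDs arrive in ascending order, as IMAP servers typically return them from SEARCH:

```python
def pick_latest(mail_ids):
    """Return (email to process, older emails to mark as seen)."""
    if not mail_ids:
        return None, []
    return mail_ids[-1], mail_ids[:-1]


def pick_oldest(mail_ids):
    """Alternative: return (email to process now, emails left for the next run)."""
    if not mail_ids:
        return None, []
    return mail_ids[0], mail_ids[1:]


# The SEARCH response data is a list holding one space-separated bytes item.
data = [b"3 7 12"]
latest, older = pick_latest(data[0].split())
print(latest, older)  # b'12' [b'3', b'7']
```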
We add two helper functions to the hook: one that downloads a file and one that downloads a file via a link in an email. They could be moved into an operator instead, depending on how often this functionality is used. What else to add to the hook also depends on the task: if files arrive directly in the email, you can download its attachments; if the data is in the email body, you need to parse the email, and so on. In my case, the email contains a single link to an archive, which I need to put in a certain place before starting further processing.
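For the attachment variant mentioned above, a hook method could hand the raw bytes returned by self.mail.fetch(mail_id, "(RFC822)") to a helper like the hypothetical save_attachments below. It is a sketch built only on the standard email package, not part of the hook used in this article:

```python
import email
import os
from email import policy


def save_attachments(raw_bytes, target_dir):
    """Save every named attachment of a raw RFC822 email into target_dir.

    Returns the list of written file paths.
    """
    msg = email.message_from_bytes(raw_bytes, policy=policy.default)
    saved = []
    for part in msg.iter_attachments():
        filename = part.get_filename()
        if not filename:
            continue
        # basename() drops any directory components a sender might include
        path = os.path.join(target_dir, os.path.basename(filename))
        with open(path, "wb") as fh:
            # decode=True undoes the base64 / quoted-printable transfer encoding
            fh.write(part.get_payload(decode=True))
        saved.append(path)
    return saved
```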
    # These methods also belong to IMAPHook; they rely on requests,
    # bs4.BeautifulSoup and AirflowException imported at the top of the module.
    def download_from_url(self, url, path, chunk_size=128):
        """
        Download a file
        :param url: download URL
        :type url: string
        :param path: where to save the file
        :type path: string
        :param chunk_size: how many bytes to write at a time
        :type chunk_size: int
        """
        with requests.get(url, stream=True) as r:
            r.raise_for_status()
            with open(path, "wb") as fd:
                for chunk in r.iter_content(chunk_size=chunk_size):
                    fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
        Download a file using the link found in an email
        :param mail_id: ID of the email
        :type mail_id: string
        :param path: where to save the file
        :type path: string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode(errors="replace").replace("\r", "").replace("\n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""
        # keep the last link that has both an href and link text
        for a in parse_soup.find_all("a", href=True, string=True):
            link_text = a["href"]
        if not link_text:
            raise AirflowException("No download link found in the email")
        self.download_from_url(link_text, path)
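Decoding the raw message and stripping line breaks works for simple HTML emails, but it does not undo quoted-printable or base64 transfer encoding: an "=" inside a link, for instance, arrives as "=3D". A hypothetical alternative, sketched with the standard email and html.parser modules instead of BeautifulSoup, lets the email package decode the HTML part before the links are collected:

```python
import email
from email import policy
from html.parser import HTMLParser


class LinkCollector(HTMLParser):
    """Collect the href of every <a> tag, in document order."""

    def __init__(self):
        super().__init__()
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            href = dict(attrs).get("href")
            if href:
                self.links.append(href)


def extract_links(raw_bytes):
    """Return all links from the HTML body of a raw RFC822 email."""
    msg = email.message_from_bytes(raw_bytes, policy=policy.default)
    body = msg.get_body(preferencelist=("html",))
    if body is None:
        return []
    parser = LinkCollector()
    # get_content() returns the body with its transfer encoding removed
    parser.feed(body.get_content())
    parser.close()
    return parser.links
```

With the link list in hand, the last (or only) entry could be passed to download_from_url as before.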
The code is simple, so it needs no further explanation. I will only say a word about the magic string imap_conn_id. Apache Airflow stores connection parameters (login, password, address, and other parameters), which can be accessed by a string identifier. In the web interface, these connections are managed under Admin > Connections.
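Besides the web interface, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> whose value is a URI. The sketch below uses only urllib.parse, not Airflow's own parser, to show how such a URI would map onto the host, login and password fields that IMAPHook reads; the URI and the helper name are hypothetical:

```python
from urllib.parse import unquote, urlsplit


def describe_connection_uri(uri):
    """Split a connection URI into the fields an IMAP hook would read.

    Illustration only; Airflow parses connection URIs with its own code.
    """
    parts = urlsplit(uri)
    return {
        "conn_type": parts.scheme,
        "host": parts.hostname,
        # special characters such as "@" in the login must be percent-encoded
        "login": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
        "port": parts.port,
    }


# Hypothetical value, e.g. exported as AIRFLOW_CONN_MAIL_CONN_ID
print(describe_connection_uri("imap://etl%40example.com:s3cret@imap.example.com:993"))
```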
A sensor that waits for data
Now that we know how to connect to the mailbox and get data from it, we can write a sensor that waits for that data. In my case, writing an operator that would immediately process the data, if any, did not work out, because other processes rely on the data received by email, including ones that pull related data from other sources (API, telephony, web metrics, etc.). Here is an example. A new user appears in the CRM system, and we do not yet know their UUID. Then, when we fetch data from SIP telephony, we receive calls tied to that UUID but cannot store and use them correctly. In such cases it is important to keep data dependencies in mind, especially when the data comes from different sources. Of course, this alone is not enough to guarantee data integrity, but in some cases it is necessary. Besides, keeping resources occupied while idling is wasteful.
So our sensor will trigger the next vertices of the graph if there is new information in the mailbox, and will mark earlier information as no longer relevant.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

from my_plugin.hooks.imap_hook import IMAPHook


class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="INBOX", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
        # succeed as soon as a matching email exists
        return mail_id is not None
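How the sensor behaves with the poke_interval, timeout and soft_fail values used in the DAG can be modelled in a few lines. This is a simplified, hypothetical model of a sensor's polling loop, not Airflow's code; the clock and sleep parameters exist only so the loop can be tested without waiting:

```python
import time


class SensorTimeout(Exception):
    """The sensor gave up and the task fails."""


class SensorSkipped(Exception):
    """The sensor gave up, but soft_fail turns the failure into a skip."""


def run_sensor(poke, poke_interval, timeout, soft_fail=False,
               clock=time.monotonic, sleep=time.sleep):
    """Call poke() every poke_interval seconds until it returns True.

    Gives up once more than `timeout` seconds have passed since the start.
    """
    started = clock()
    while not poke():
        if clock() - started > timeout:
            if soft_fail:
                raise SensorSkipped("timed out; task marked as skipped")
            raise SensorTimeout("timed out; task failed")
        sleep(poke_interval)
    return True
```

In this model, with poke_interval=10 and timeout=10 the sensor pokes at 0, 10 and 20 seconds and then gives up; with soft_fail=True an hourly run that finds no new mail ends as skipped rather than failed.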
Getting and processing the data
To get and process the data, you can write a separate operator or use existing ones. Since the logic here is trivial (taking the data from an email, for example), I suggest the standard PythonOperator.
from datetime import datetime, timedelta

from airflow.exceptions import AirflowException
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Standard DAG configuration
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["owner@example.com"],  # placeholder address
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Define the sensor
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)


# Function that fetches the data from the email
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")
    imap_hook.download_mail_href_attachment(mail_id, "./path.zip")


prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable=prepare_mail)

# Definitions of the remaining graph vertices
...

# Define the edges of the graph
mail_check_sensor >> prepare_mail_data
# prepare_mail_data >> ...
# Remaining control flows
By the way, if your corporate mail is hosted on mail.ru, you will not be able to search emails by subject, sender, and so on. Back in 2016 they promised to add this, but apparently changed their minds. I solved the problem by creating a separate folder for the emails I need and setting up a filter in the mail web interface that routes those emails into it. This way, only the needed emails end up in that folder, and the search condition, in my case, is simply (UNSEEN).
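On servers that do support searching by sender or subject, the condition argument of get_last_mail could combine several RFC 3501 search keys instead of relying on a dedicated folder. The build_search_criteria helper below is hypothetical, and it handles ASCII values only, since non-ASCII text would also need a CHARSET argument to SEARCH:

```python
_MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
           "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")


def _quote(value):
    """Quote a value as an IMAP quoted string (escape backslash and double quote)."""
    return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'


def imap_date(d):
    """Format a date as an RFC 3501 search date, e.g. 4-Apr-2020."""
    return "{}-{}-{}".format(d.day, _MONTHS[d.month - 1], d.year)


def build_search_criteria(unseen=True, sender=None, subject=None, since=None):
    """Compose a SEARCH condition; space-separated keys are ANDed together."""
    keys = []
    if unseen:
        keys.append("UNSEEN")
    if sender:
        keys.append("FROM " + _quote(sender))
    if subject:
        keys.append("SUBJECT " + _quote(subject))
    if since:
        keys.append("SINCE " + imap_date(since))
    # no keys at all means "every message in the folder"
    return "(" + " ".join(keys) + ")" if keys else "ALL"


print(build_search_criteria())
```

Called with no arguments it yields the (UNSEEN) condition used in this article, so it could replace the hard-coded string without changing behavior.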
To sum up, the sequence is as follows: we check whether there are new emails that meet the conditions, and if there are, we download the archive using the link in the latest one.
Behind the final ellipses lies the rest of the work: the archive is unpacked, its data is cleaned and processed, and the result moves on down the ETL pipeline, but that is beyond the scope of this article. If you found it interesting and useful, I will continue describing ETL solutions and their components for Apache Airflow.
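The unpacking step could start with something like the hypothetical helper below, assuming the downloaded archive is a zip file (as the ./path.zip name in the DAG suggests). zipfile already strips absolute paths and ".." components when extracting; the explicit check is a stricter, defensive extra that rejects such archives outright:

```python
import os
import zipfile


def unpack_archive(archive_path, target_dir):
    """Extract a zip archive into target_dir and return the member names.

    Raises ValueError if any member would land outside target_dir.
    """
    target = os.path.realpath(target_dir)
    with zipfile.ZipFile(archive_path) as zf:
        names = zf.namelist()
        for name in names:
            dest = os.path.realpath(os.path.join(target, name))
            if os.path.commonpath([target, dest]) != target:
                raise ValueError("unsafe path in archive: " + name)
        zf.extractall(target)
    return names
```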
Source: www.habr.com