No matter how far technology advances, a trail of outdated approaches always drags along behind it. The cause may be a gradual transition, human factors, technical constraints, or something else. In data processing, data sources are the most telling example. However much we dream of getting rid of them, part of the data is still sent through instant messengers and email, not to mention more archaic formats. I invite you to walk through one option for Apache Airflow that shows how to pull data out of emails.
Background
A lot of data is still transferred by email, from personal correspondence to formal exchanges between companies. It is good when you can write an interface to obtain the data, or seat people in the office who will enter this information into more convenient sources, but often that is simply not possible. The specific task I faced was connecting a well-known CRM system to the data warehouse, and from there to the OLAP system. For historical reasons, this system was convenient for our company in one particular area of the business, so everyone really wanted to be able to work with data from this third-party system as well. First of all, of course, we studied the option of getting the data from its open API. Unfortunately, the API did not cover all the data we needed and, to put it simply, was crooked in many ways, and technical support would not or could not meet us halfway and provide fuller functionality. The system did, however, offer to send the missing data periodically by mail, as a link for downloading an archive.
It should be noted that this was not the only case where the business wanted to collect data from emails or instant messengers. In this case, though, we had no way to influence the third-party company, which provides part of the data only in this way.
Apache Airflow
To build ETL processes we most often use Apache Airflow. So that a reader unfamiliar with this technology can better understand how it looks in context and in general, I will describe a few introductory points.
Apache Airflow is a free platform used to build, run, and monitor ETL (Extract-Transform-Load) processes in Python. The central concept in Airflow is a directed acyclic graph, where the vertices are specific processes and the edges are the flow of control or information. A process can simply call any Python function, or it can carry more complex logic that calls several functions in sequence within a class. For the most common operations there are already many ready-made components that can be used as processes. These include:
- operators: for transferring data from one place to another, for example from a database table to a data warehouse;
- sensors: for waiting for a certain event and directing the flow of control to the subsequent vertices of the graph;
- hooks: for lower-level operations, for example getting data from a database table (used inside operators);
- and so on.
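The directed acyclic graph idea can be illustrated without Airflow at all. The sketch below is only an illustration, not how Airflow schedules tasks internally: it uses the standard library graphlib module (Python 3.9+) to run callables in dependency order. The task name load_to_warehouse and the helper run_in_order are hypothetical.

```python
from graphlib import TopologicalSorter

# Each key is a task; the set lists the tasks that must finish before it.
# "load_to_warehouse" is a hypothetical downstream step used for illustration.
dependencies = {
    "prepare_mail_data": {"check_new_emails"},
    "load_to_warehouse": {"prepare_mail_data"},
}


def run_in_order(dependencies, tasks):
    """Call every task after all of its upstream tasks; return the run order."""
    executed = []
    for name in TopologicalSorter(dependencies).static_order():
        tasks[name]()
        executed.append(name)
    return executed


# Placeholder callables standing in for real processing steps.
tasks = {
    name: (lambda n=name: print("running", n))
    for name in ("check_new_emails", "prepare_mail_data", "load_to_warehouse")
}

executed = run_in_order(dependencies, tasks)
```

A cycle in the dependencies makes TopologicalSorter raise graphlib.CycleError, which is the reason the graph has to be acyclic.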
It would be inappropriate to describe Apache Airflow in detail in this article. Brief introductions are available in other sources.
Hook for getting data
First of all, to solve the problem we need to write a hook that lets us:
- connect to the mailbox;
- find the right message;
- get the data from the message.
import imaplib
import logging

import requests  # used by the download helpers of this hook
from bs4 import BeautifulSoup  # used by the download helpers of this hook

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook


class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
        IMAP hook for getting data from email
        :param imap_conn_id: Identifier of the mail connection
        :type imap_conn_id: string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """
        Connect to the mail server
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
        Get the identifier of the last message that satisfies the search condition
        :param check_seen: Whether to mark the last message as read
        :type check_seen: bool
        :param box: Mailbox name
        :type box: string
        :param condition: Search condition for messages
        :type condition: string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("The following messages were found in the mailbox: " + str(mail_ids))
        if not mail_ids:
            logging.info("No new messages found")
            return None
        mail_id = mail_ids[0]
        # if there are several such messages
        if len(mail_ids) > 1:
            # mark the found messages as read
            for found_id in mail_ids:
                self.mail.store(found_id, "+FLAGS", "\\Seen")
            # return the last one
            mail_id = mail_ids[-1]
        # whether the last message should stay unread
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\\Seen")
        return mail_id
The logic is this: we connect and find the last, most relevant message; if there are others, we ignore them. This approach works here because later messages contain all the data of the earlier ones. If that is not the case, you can return a list of all messages, or process the first one and leave the rest for the next pass. In general, as always, everything depends on the task.
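The choice between "only the latest message" and "all messages" comes down to how the list of identifiers returned by the search is split. Below is a minimal sketch, assuming the payload has the shape that imaplib returns from search (a list whose first element is a space-separated bytes string) and that the server lists identifiers in ascending order, which is the usual case. The helper name split_latest is hypothetical.

```python
def split_latest(search_data):
    """Split an IMAP SEARCH payload into (latest_id, older_ids).

    Returns (None, []) when the search matched nothing.
    """
    mail_ids = search_data[0].split()
    if not mail_ids:
        return None, []
    return mail_ids[-1], mail_ids[:-1]


# Three matching messages: process the latest one, mark the older ones as read.
latest, older = split_latest([b"3 7 12"])

# No matching messages.
empty_latest, empty_older = split_latest([b""])
```

If every message has to be processed, the caller can iterate over the older identifiers as well instead of only marking them as read.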
We add two helper functions to the hook: one for downloading a file, and one for downloading a file by a link taken from a message. They could also be moved into an operator, depending on how often this functionality is used. What else to add to the hook again depends on the task: if the files arrive directly in the message, you can download its attachments; if the data arrives in the message body, you need to parse the message, and so on. In my case the message contains a single link to an archive, which I need to save to a certain place before starting the rest of the processing.
    # The two methods below belong to IMAPHook. They need `import requests` and
    # `from bs4 import BeautifulSoup` at the top of the module.
    def download_from_url(self, url, path, chunk_size=128):
        """
        Download a file
        :param url: Download address
        :type url: string
        :param path: Where to put the file
        :type path: string
        :param chunk_size: How many bytes to write at a time
        :type chunk_size: int
        """
        r = requests.get(url, stream=True)
        # fail on HTTP errors instead of saving an error page as the file
        r.raise_for_status()
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
        Download a file by the link found in a message
        :param mail_id: Message identifier
        :type mail_id: string
        :param path: Where to put the file
        :type path: string
        """
        # get_last_mail must be called first: it logs in and selects the mailbox
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("\r", "").replace("\n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""
        # if the message contains several links, the last one wins
        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]
        if not link_text:
            raise ValueError("No link found in the message")
        self.download_from_url(link_text, path)
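Decoding the raw message as a whole works when the body is plain ASCII HTML, but it can break on encoded bodies (for example quoted-printable). As a more defensive variant, here is a sketch that uses only the standard library: it parses the MIME structure first and then collects the links from the HTML part. The names LinkCollector and extract_links and the sample message are hypothetical and are not part of the original hook.

```python
from email import message_from_bytes, policy
from email.message import EmailMessage
from html.parser import HTMLParser


class LinkCollector(HTMLParser):
    """Collect the href value of every <a> tag."""

    def __init__(self):
        super().__init__()
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            href = dict(attrs).get("href")
            if href:
                self.links.append(href)


def extract_links(raw_email):
    """Return all links found in the HTML part of a raw RFC 822 message."""
    msg = message_from_bytes(raw_email, policy=policy.default)
    body = msg.get_body(preferencelist=("html",))
    if body is None:
        return []
    collector = LinkCollector()
    # get_content() undoes the transfer encoding and returns text
    collector.feed(body.get_content())
    return collector.links


# Build a sample message instead of talking to a real server.
sample = EmailMessage()
sample["Subject"] = "Export is ready"
sample.set_content("Your export is ready.")
sample.add_alternative(
    '<p><a href="https://example.com/export.zip">Download</a></p>',
    subtype="html",
)

links = extract_links(sample.as_bytes())
```

In the hook, the bytes returned by fetch could be passed to such a function in place of the manual decode, with the last collected link handed to download_from_url.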
The hook code is simple, so it hardly needs further explanation. I will only comment on the magic string imap_conn_id. Apache Airflow stores connection parameters (login, password, address, and other parameters) that can be accessed by a string identifier. Connections are managed through the Airflow web interface.
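Airflow can also read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> that holds a URI. The sketch below does not call Airflow. It only shows, with the standard library, how the parts of such a URI map onto the fields the hook reads (host, login, password). The helper names, the host imap.example.com, and the credentials are made up for illustration.

```python
from urllib.parse import quote, unquote, urlsplit


def build_conn_uri(scheme, login, password, host, port=None):
    """Build a connection URI; login and password are percent-encoded."""
    netloc = "{}:{}@{}".format(quote(login, safe=""), quote(password, safe=""), host)
    if port is not None:
        netloc += ":{}".format(port)
    return "{}://{}".format(scheme, netloc)


def parse_conn_uri(uri):
    """Split a connection URI back into the fields a hook would use."""
    parts = urlsplit(uri)
    return {
        "host": parts.hostname,
        "port": parts.port,
        "login": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
    }


# Hypothetical values; for the connection id "mail_conn_id" the variable
# would be exported as AIRFLOW_CONN_MAIL_CONN_ID=<uri>.
uri = build_conn_uri("imap", "etl@example.com", "p@ss:word", "imap.example.com", 993)
fields = parse_conn_uri(uri)
```

Percent-encoding matters here because characters such as @ and : inside the login or password would otherwise be read as URI separators.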
Sensor for waiting for data
Since we already know how to connect to the mailbox and get data from it, we can now write a sensor that waits for that data. In my case it did not work to write an operator straight away that would process the data if there is any, because other processes depend on the data received from the mail, including those that pull related data from other sources (API, telephony, web metrics, etc.). Here is an example. A new user appears in the CRM system, and we do not yet know their UUID. Then, when we try to get data from SIP telephony, we receive calls tied to that UUID, but we cannot store and use them correctly. In such matters it is important to keep data dependencies in mind, especially when the data comes from different sources. These measures are of course not enough to guarantee data integrity, but in some cases they are necessary. Besides, wasting resources on idle runs makes no sense either.
So our sensor triggers the subsequent vertices of the graph when there is new information in the mailbox, and also marks the previous information as no longer relevant.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook


class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        # True lets the downstream tasks run; False makes the sensor poke again later
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
        return mail_id is not None
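A sensor's poke method is called repeatedly, once every poke_interval seconds, until it returns True or the timeout elapses. The following is a simplified imitation of that loop, not Airflow's actual code. The names wait_until and SensorTimeout are hypothetical, and the sleep and clock arguments exist only so that the loop can be exercised without real waiting.

```python
import time


class SensorTimeout(Exception):
    """Raised when the condition was not met within the timeout."""


def wait_until(poke, poke_interval, timeout, sleep=time.sleep, clock=time.monotonic):
    """Call poke() until it returns True; return the number of attempts."""
    started = clock()
    attempts = 0
    while True:
        attempts += 1
        if poke():
            return attempts
        if clock() - started > timeout:
            raise SensorTimeout("condition not met after {} attempts".format(attempts))
        sleep(poke_interval)


# A fake clock, so the example finishes instantly.
now = [0.0]


def fake_sleep(seconds):
    now[0] += seconds


def fake_clock():
    return now[0]


# The mailbox is empty on the first two checks and has a message on the third.
answers = iter([False, False, True])
attempts = wait_until(lambda: next(answers), poke_interval=10, timeout=60,
                      sleep=fake_sleep, clock=fake_clock)
```

In Airflow, reaching the timeout fails the sensor task, or marks it as skipped when soft_fail=True.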
Receiving and using the data
To receive and process the data you can write a separate operator, or you can use ready-made ones. Since the logic is still trivial (take the data from the message), as an example I suggest the standard PythonOperator.
from datetime import datetime, timedelta

from airflow.exceptions import AirflowException
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Standard graph configuration
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Define the sensor
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    # leave the latest message unread so that prepare_mail can still find it
    # with the (UNSEEN) condition when several messages arrive at once
    check_seen=False,
    dag=dag,
    mode="poke",
)


# Function for getting the data from the message
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")
    imap_hook.download_mail_href_attachment(mail_id, "./path.zip")


prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable=prepare_mail)

# Definitions of the remaining graph vertices
...

# Set the dependencies in the graph
mail_check_sensor >> prepare_mail_data
# prepare_mail_data >> ...
# Definitions of the remaining control flows
By the way, if your corporate mail is also hosted on mail.ru, you will not be able to search for messages by subject, sender, and so on. Back in 2016 they promised to introduce this, but apparently changed their minds. I worked around the problem by creating a separate folder for the required messages and setting up a filter for them in the mail web interface. As a result, only the required messages end up in this folder, and the search condition in my case is simply (UNSEEN).
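On servers that do support searching by sender or subject, the condition passed to the hook could be assembled from standard IMAP search keys (UNSEEN, FROM, SUBJECT). This is only a sketch: the helper build_search_condition and the sample sender are hypothetical, and non-ASCII values would additionally require a charset argument in the search call.

```python
def _quote(value):
    """Wrap a value in double quotes, escaping backslashes and quotes."""
    return '"{}"'.format(value.replace("\\", "\\\\").replace('"', '\\"'))


def build_search_condition(unseen=True, sender=None, subject=None):
    """Build an IMAP SEARCH condition string such as '(UNSEEN FROM "a@b.c")'."""
    parts = []
    if unseen:
        parts.append("UNSEEN")
    if sender:
        parts.append("FROM " + _quote(sender))
    if subject:
        parts.append("SUBJECT " + _quote(subject))
    if not parts:
        # with no restrictions, match every message
        parts.append("ALL")
    return "(" + " ".join(parts) + ")"


default_condition = build_search_condition()
filtered_condition = build_search_condition(sender="crm@example.com", subject="Export")
```

The resulting string can be passed as the condition argument of MailSensor or get_last_mail.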
To summarize, we have the following sequence: we check whether there are new messages that meet the conditions, and if there are, we download the archive using the link from the last message.
Behind the final ellipses is everything that was left out: the archive is unpacked, the data from it is cleaned and processed, and the result moves further along the ETL pipeline. That is already beyond the scope of this article. If this turns out to be interesting and useful, I will gladly continue describing ETL solutions and their parts for Apache Airflow.
Source: www.habr.com