Ge bith dè an ìre de theicneòlas a bhios a’ leasachadh, tha sreath de dhòighean-obrach seann-fhasanta an-còmhnaidh air cùl leasachadh. Dh’ fhaodadh seo a bhith mar thoradh air eadar-ghluasad rèidh, factaran daonna, feumalachdan teicneòlais, no rudeigin eile. Ann an raon giollachd dàta, is e stòran dàta an fheadhainn as nochdte sa phàirt seo. Ge bith dè an ìre gu bheil sinn a ’bruadar mu bhith a’ faighinn cuidhteas seo, ach gu ruige seo tha pàirt den dàta air a chuir ann an teachdairean sa bhad agus puist-d, gun luaidh air cruthan nas sine. Tha mi a 'toirt cuireadh dhut aon de na roghainnean airson Apache Airflow a thoirt às a chèile, a' sealltainn mar as urrainn dhut dàta a thoirt bho phuist-d.
ro-eachdraidheil
Tha tòrr dàta fhathast air a ghluasad tro phost-d, bho chonaltradh eadar-phearsanta gu ìrean eadar-obrachaidh eadar companaidhean. Tha e math ma tha e comasach eadar-aghaidh a sgrìobhadh airson dàta fhaighinn no daoine a chuir san oifis a chuireas a-steach am fiosrachadh seo gu stòran nas goireasaiche, ach gu tric is dòcha nach bi seo comasach. B’ e an obair shònraichte a bha romham a bhith a’ ceangal an t-siostam CRM cliùiteach ris an taigh-bathair dàta, agus an uairsin ris an t-siostam OLAP. Tha e mar sin a thachair gu h-eachdraidheil gun robh cleachdadh an t-siostam seo goireasach don chompanaidh againn ann an raon sònraichte de ghnìomhachas. Mar sin, bha a h-uile duine dha-rìribh ag iarraidh a bhith comasach air obrachadh le dàta bhon t-siostam treas-phàrtaidh seo cuideachd. An toiseach, gu dearbh, chaidh sgrùdadh a dhèanamh air comasachd dàta fhaighinn bho API fosgailte. Gu mì-fhortanach, cha robh an API a’ còmhdach a bhith a’ faighinn a h-uile dàta riatanach, agus, gu sìmplidh, bha e cam ann an iomadh dòigh, agus cha robh taic theicnigeach ag iarraidh no cha b’ urrainn dhaibh coinneachadh letheach slighe gus comas-gnìomh nas coileanta a thoirt seachad. Ach thug an siostam seo cothrom bho àm gu àm an dàta a bha a dhìth fhaighinn tron phost ann an cruth ceangal airson an tasglann a luchdachadh sìos.
Bu chòir a thoirt fa-near nach b’ e seo an aon chùis anns an robh an gnìomhachas airson dàta a chruinneachadh bho phuist-d no teachdairean sa bhad. Ach, anns a 'chùis seo, cha b' urrainn dhuinn buaidh a thoirt air companaidh treas-phàrtaidh a bheir seachad pàirt den dàta a-mhàin san dòigh seo.
sruth-adhair apache
Gus pròiseasan ETL a thogail, mar as trice bidh sinn a’ cleachdadh Apache Airflow. Gus am faigh leughadair nach eil eòlach air an teicneòlas seo tuigse nas fheàrr air mar a tha e a’ coimhead sa cho-theacsa agus san fharsaingeachd, bheir mi cunntas air fear tòiseachaidh no dhà.
Tha Apache Airflow na àrd-ùrlar an-asgaidh a thathas a’ cleachdadh gus pròiseasan ETL (Extract-Transform-Loading) a thogail, a chuir an gnìomh agus a sgrùdadh ann am Python. Is e am prìomh bhun-bheachd ann an Airflow graf acyclic stiùirichte, far a bheil vertices a’ ghraf nam pròiseasan sònraichte, agus is e oirean a’ ghraf an sruth smachd no fiosrachaidh. Faodaidh pròiseas dìreach gnìomh Python sam bith a ghairm, no faodaidh loidsig nas iom-fhillte a bhith aige bho bhith a’ gairm grunn ghnìomhan ann an co-theacsa clas. Airson na h-obraichean as trice, tha mòran leasachaidhean deiseil ann mu thràth a dh'fhaodar a chleachdadh mar phròiseasan. Am measg nan leasachaidhean sin tha:
- gnìomhaichean - airson dàta a ghluasad bho aon àite gu àite eile, mar eisimpleir, bho chlàr stòr-dàta gu taigh-bathair dàta;
- mothachaidhean - airson a bhith a 'feitheamh airson tachartas sònraichte agus a' stiùireadh an t-sruth smachd gu na h-earrainnean às dèidh sin den ghraf;
- dubhan - airson obraichean aig ìre nas ìsle, mar eisimpleir, gus dàta fhaighinn bho chlàr stòr-dàta (air a chleachdadh ann an aithrisean);
- agus mar sin air adhart.
Bhiodh e neo-iomchaidh cunntas mionaideach a thoirt air Apache Airflow san artaigil seo. Faodar ro-ràdh goirid fhaicinn
Hook airson dàta fhaighinn
An toiseach, gus an duilgheadas fhuasgladh, feumaidh sinn dubhan a sgrìobhadh leis am b’ urrainn dhuinn:
- ceangail ri post-d
- lorg an litir cheart
- dàta fhaighinn bhon litir.
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging
class IMAPHook(BaseHook):
def __init__(self, imap_conn_id):
"""
IMAP hook для получения данных с электронной почты
:param imap_conn_id: Идентификатор подключения к почте
:type imap_conn_id: string
"""
self.connection = self.get_connection(imap_conn_id)
self.mail = None
def authenticate(self):
"""
Подключаемся к почте
"""
mail = imaplib.IMAP4_SSL(self.connection.host)
response, detail = mail.login(user=self.connection.login, password=self.connection.password)
if response != "OK":
raise AirflowException("Sign in failed")
else:
self.mail = mail
def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
"""
Метод для получения идентификатора последнего письма,
удовлетвораяющего условиям поиска
:param check_seen: Отмечать последнее письмо как прочитанное
:type check_seen: bool
:param box: Наименования ящика
:type box: string
:param condition: Условия поиска писем
:type condition: string
"""
self.authenticate()
self.mail.select(mailbox=box)
response, data = self.mail.search(None, condition)
mail_ids = data[0].split()
logging.info("В ящике найдены следующие письма: " + str(mail_ids))
if not mail_ids:
logging.info("Не найдено новых писем")
return None
mail_id = mail_ids[0]
# если таких писем несколько
if len(mail_ids) > 1:
# отмечаем остальные прочитанными
for id in mail_ids:
self.mail.store(id, "+FLAGS", "\Seen")
# возвращаем последнее
mail_id = mail_ids[-1]
# нужно ли отметить последнее прочитанным
if not check_seen:
self.mail.store(mail_id, "-FLAGS", "\Seen")
return mail_id
Is e seo an loidsig: bidh sinn a’ ceangal, lorg an litir mu dheireadh as buntainniche, ma tha feadhainn eile ann, bidh sinn gan leigeil seachad. Tha an gnìomh seo air a chleachdadh, oir tha a h-uile dàta bho fheadhainn nas tràithe ann an litrichean nas fhaide air adhart. Mura h-eil seo fìor, faodaidh tu sreath de na litrichean uile a thilleadh, no a 'chiad fhear a phròiseasadh, agus an còrr air an ath bhealaich. San fharsaingeachd, tha a h-uile dad, mar a tha e an-còmhnaidh, an urra ris an obair.
Cuiridh sinn dà ghnìomh taiceil ris an dubhan: airson faidhle a luchdachadh sìos agus airson faidhle a luchdachadh sìos a’ cleachdadh ceangal bho phost-d. Air an t-slighe, faodaidh iad a bhith air an gluasad chun a 'ghnìomhaiche, tha e an crochadh air cho tric' sa bhith a 'cleachdadh an gnìomh seo. Bidh dè eile a chuireas ris an dubhan, a-rithist, an urra ris a ’ghnìomh: ma gheibhear faidhlichean sa bhad san litir, faodaidh tu ceanglachan a luchdachadh sìos don litir, ma gheibhear an dàta san litir, feumaidh tu an litir a pharsadh, etc. Anns a 'chùis agam, tha an litir a' tighinn le aon cheangal ris an tasglann, a dh'fheumas mi a chuir ann an àite sònraichte agus tòiseachadh air a 'phròiseas giollachd eile.
def download_from_url(self, url, path, chunk_size=128):
"""
Метод для скачивания файла
:param url: Адрес загрузки
:type url: string
:param path: Куда положить файл
:type path: string
:param chunk_size: По сколько байтов писать
:type chunk_size: int
"""
r = requests.get(url, stream=True)
with open(path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def download_mail_href_attachment(self, mail_id, path):
"""
Метод для скачивания файла по ссылке из письма
:param mail_id: Идентификатор письма
:type mail_id: string
:param path: Куда положить файл
:type path: string
"""
response, data = self.mail.fetch(mail_id, "(RFC822)")
raw_email = data[0][1]
raw_soup = raw_email.decode().replace("r", "").replace("n", "")
parse_soup = BeautifulSoup(raw_soup, "html.parser")
link_text = ""
for a in parse_soup.find_all("a", href=True, text=True):
link_text = a["href"]
self.download_from_url(link_text, path)
Tha an còd sìmplidh, agus mar sin cha mhòr gu bheil feum air tuilleadh mìneachaidh. Innsidh mi dhut mun loidhne draoidheil imap_conn_id. Bidh Apache Airflow a’ stòradh paramadairean ceangail (logadh a-steach, facal-faire, seòladh, agus paramadairean eile) a gheibhear thuige le aithnichear sreang. Gu fradharcach, tha riaghladh ceangail a ’coimhead mar seo
Sensor feitheamh airson dàta
Leis gu bheil fios againn mu thràth mar a cheanglas agus a gheibh sinn dàta bhon phost, is urrainn dhuinn a-nis sensor a sgrìobhadh gus feitheamh riutha. Anns a ’chùis agam, cha do dh’ obraich e gus gnìomhaiche a sgrìobhadh sa bhad a làimhsicheas an dàta, ma tha gin ann, leis gu bheil pròiseasan eile ag obair stèidhichte air an dàta a gheibhear bhon phost, a ’toirt a-steach an fheadhainn a bheir dàta co-cheangailte bho stòran eile (API, fòn. , meatrach lìn, msaa) msaa). Bheir mi eisimpleir dhut. Tha cleachdaiche ùr air nochdadh san t-siostam CRM, agus chan eil fios againn fhathast mun UUID aige. An uairsin, nuair a dh'fheuchas sinn ri dàta fhaighinn bho fhòn SIP, gheibh sinn fiosan ceangailte ris an UUID aige, ach cha bhith e comasach dhuinn an sàbhaladh agus an cleachdadh gu ceart. Ann an leithid de chùisean, tha e cudromach cuimhneachadh air eisimeileachd an dàta, gu sònraichte ma tha iad bho dhiofar thùsan. Tha iad sin, gu dearbh, nan ceumannan gu leòr gus ionracas dàta a ghleidheadh, ach ann an cuid de chùisean tha iad riatanach. Tha, agus tha e mì-reusanta cuideachd a bhith a’ gabhail fois ann an goireasan.
Mar sin, cuiridh an sensor againn vertices às deidh sin den ghraf air bhog ma tha fiosrachadh ùr anns a’ phost, agus comharraichidh e cuideachd gu bheil am fiosrachadh a bh ’ann roimhe neo-iomchaidh.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook
class MailSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.check_seen = check_seen
self.box = box
self.condition = condition
def poke(self, context):
conn = IMAPHook(self.conn_id)
mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
if mail_id is None:
return False
else:
return True
Bidh sinn a’ faighinn agus a’ cleachdadh dàta
Gus dàta fhaighinn agus a phròiseasadh, faodaidh tu gnìomhaiche air leth a sgrìobhadh, faodaidh tu feadhainn deiseil a chleachdadh. Gu ruige seo tha an loidsig beag - gus dàta a thoirt bhon litir, mar eisimpleir, tha mi a 'moladh an PythonOperator àbhaisteach
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook
start_date = datetime(2020, 4, 4)
# Стандартное конфигурирование графа
args = {
"owner": "example",
"start_date": start_date,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retry_delay": timedelta(minutes=15),
"provide_context": False,
}
dag = DAG(
dag_id="test_etl",
default_args=args,
schedule_interval="@hourly",
)
# Определяем сенсор
mail_check_sensor = MailSensor(
task_id="check_new_emails",
poke_interval=10,
conn_id="mail_conn_id",
timeout=10,
soft_fail=True,
box="my_box",
dag=dag,
mode="poke",
)
# Функция для получения данных из письма
def prepare_mail():
imap_hook = IMAPHook("mail_conn_id")
mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
if mail_id is None:
raise AirflowException("Empty mailbox")
conn.download_mail_href_attachment(mail_id, "./path.zip")
prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
# Описание остальных вершин графа
...
# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления
Air an t-slighe, ma tha am post corporra agad cuideachd air mail.ru, cha bhith e comasach dhut litrichean a lorg le cuspair, neach-cuiridh, msaa. Air ais ann an 2016, gheall iad a thoirt a-steach, ach a rèir coltais dh’ atharraich iad an inntinn. Dh ’fhuasgail mi an duilgheadas seo le bhith a’ cruthachadh pasgan air leth airson na litrichean riatanach agus a ’stèidheachadh sìoltachan airson na litrichean riatanach ann an eadar-aghaidh lìn a’ phuist. Mar sin, is e dìreach na litrichean agus na cumhaichean riatanach airson an sgrùdaidh, na mo chùis, dìreach (UNSEEN) a dhol a-steach don phasgan seo.
A 'toirt geàrr-chunntas, tha an t-sreath a leanas againn: bidh sinn a' sgrùdadh a bheil litrichean ùra ann a tha a 'coinneachadh ris na cumhaichean, ma tha, an uairsin bidh sinn a' luchdachadh sìos an tasglann a 'cleachdadh a' cheangal bhon litir mu dheireadh.
Fo na dotagan mu dheireadh, tha e air fhàgail a-mach gun tèid an tasglann seo a dhì-phapadh, thèid an dàta bhon tasglann a ghlanadh agus a phròiseasadh, agus mar thoradh air an sin, thèid an rud gu lèir nas fhaide gu loidhne-phìoban pròiseas ETL, ach tha seo mar-thà nas fhaide na farsaingeachd an artaigil. Ma thionndaidh e a-mach inntinneach agus feumail, leanaidh mi gu toilichte a’ toirt cunntas air fuasglaidhean ETL agus na pàirtean aca airson Apache Airflow.
Source: www.habr.com