No matter how far technology advances, a trail of outdated approaches always follows behind. This may be due to a gradual transition, human factors, technical necessity, or something else. In data processing, data sources are the most telling example. However much we dream of getting rid of this, part of the data is still sent through instant messengers and emails, not to mention even more archaic formats. I invite you to look at one option for Apache Airflow that shows how data can be collected from emails.
Background
A lot of data is still sent by email, from interpersonal communication to data-exchange standards between companies. It is good if you can write an interface for receiving the data, or put people in the office who will enter it into more convenient sources, but often this simply is not possible. The specific task I faced was connecting a well-known CRM system to our data warehouse, and from there to an OLAP system. Historically, this system was convenient for our company in one particular business area, so everyone wanted to be able to work with data from this third-party system as well. First of all, of course, we looked into getting the data through the public API. Unfortunately, the API did not give access to all the necessary data, it was, put simply, clumsy in many ways, and technical support either would not or could not meet us halfway and provide more complete functionality. However, the system did offer a way to periodically receive the missing data by email, as a link for downloading an archive.
It should be noted that this was not the only case where the business wanted to collect data from emails or instant messengers. In this case, however, we had no way to influence the third-party company that provides part of the data only in this form.
Apache Airflow
We most often use Apache Airflow to build ETL processes. So that readers unfamiliar with this technology can better understand how it is used here and in general, I will cover a few introductory points.
Apache Airflow is a free platform for building, running, and monitoring ETL (Extract-Transform-Load) processes in Python. The central concept in Airflow is the directed acyclic graph (DAG): its vertices are specific processes, and its edges are the flow of control or data. A process can simply call any Python function, or it can contain more complex logic that calls several functions in sequence within a class. For the most common operations there are already many ready-made building blocks that can be used as processes, including:
- operators: for moving data from one place to another, for example from a database table to a data warehouse;
- sensors: for waiting for a specific event to occur and then passing control to the subsequent vertices of the graph;
- hooks: for lower-level operations, for example fetching data from a database table (used inside operators);
- and so on.
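Airflow's scheduler does far more than this, but the core idea, that a vertex runs only after every vertex it depends on has finished, can be sketched with the standard library's graphlib module (Python 3.9+) instead of Airflow itself. The dictionary maps each task to the tasks it depends on; check_new_emails and prepare_mail_data are the task IDs used in the DAG at the end of this article, while unpack_archive and load_to_dwh are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter

# Each key is a task (a vertex); each value is the set of tasks it depends on
# (its incoming edges). unpack_archive and load_to_dwh are hypothetical.
pipeline = {
    "check_new_emails": set(),
    "prepare_mail_data": {"check_new_emails"},
    "unpack_archive": {"prepare_mail_data"},
    "load_to_dwh": {"unpack_archive"},
}


def is_acyclic(graph):
    """True if the dependency graph has no cycles, i.e. it is a valid DAG."""
    try:
        TopologicalSorter(graph).prepare()
    except CycleError:
        return False
    return True


def execution_order(graph):
    """One valid run order: every task appears after all of its dependencies."""
    return list(TopologicalSorter(graph).static_order())


if __name__ == "__main__":
    print(execution_order(pipeline))
    # ['check_new_emails', 'prepare_mail_data', 'unpack_archive', 'load_to_dwh']
```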
Describing Apache Airflow in detail would be out of place in this article. A brief introduction can be found here.
A hook for getting data
First, to solve the problem, we need to write a hook that can:
- connect to the mailbox;
- find the required email;
- get the data from the email.
import imaplib
import logging

import requests
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from bs4 import BeautifulSoup


class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
        IMAP hook for getting data from email
        :param imap_conn_id: Mail connection identifier
        :type imap_conn_id: string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """
        Connect to the mailbox
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
        Get the ID of the latest email matching the search condition
        :param check_seen: If False, clear the Seen flag of the latest email so it stays unread
        :type check_seen: bool
        :param box: Mailbox (folder) name
        :type box: string
        :param condition: IMAP search condition
        :type condition: string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("Emails found in the mailbox: %s", mail_ids)
        if not mail_ids:
            logging.info("No new emails found")
            return None
        # sequence numbers normally come back in ascending order: the last one is the latest
        mail_id = mail_ids[-1]
        # if there are several matching emails, mark the older ones as read;
        # the latest is left for the caller (fetching its body later marks it as read)
        for older_id in mail_ids[:-1]:
            self.mail.store(older_id, "+FLAGS", r"\Seen")
        # optionally force the latest email to stay unread
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", r"\Seen")
        return mail_id
The idea is this: we connect, find the latest matching email, and ignore any others. This works here because each later email contains all the data of the earlier ones. If that is not the case, you can return a list of emails instead, or process the first one now and the rest on subsequent runs. As always, it all depends on the task.
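The selection rule ("take the latest matching email, treat the rest as already handled") can be factored out into a pure function so that it can be checked without a mail server. This is a hypothetical helper, not part of the hook above; it assumes the reply shape of imaplib's search(), where the data part is a list whose first element is a bytes string of space-separated message numbers, and that the server lists them in ascending order.

```python
def split_search_result(search_data):
    """
    Hypothetical helper mirroring get_last_mail's rule.
    Returns (latest_id, older_ids); latest_id is None if nothing matched.
    """
    mail_ids = search_data[0].split()  # e.g. [b"3 7 12"] -> [b"3", b"7", b"12"]
    if not mail_ids:
        return None, []
    # assumes ascending order, so the last ID is the newest email
    return mail_ids[-1], mail_ids[:-1]


if __name__ == "__main__":
    print(split_search_result([b"3 7 12"]))  # (b'12', [b'3', b'7'])
    print(split_search_result([b""]))        # (None, [])
```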
We add two helper methods to the hook: one that downloads a file and one that downloads a file via a link in an email. By the way, they could be moved into an operator; that depends on how often this functionality is used. What else to add to the hook again depends on the task: if files arrive as attachments, you can download the attachments; if the data arrives in the body of the email, you need to parse the email, and so on. In my case, the email contains a single link to an archive, which I need to save somewhere and then start further processing.
    def download_from_url(self, url, path, chunk_size=128):
        """
        Download a file
        :param url: Download URL
        :type url: string
        :param path: Where to save the file
        :type path: string
        :param chunk_size: Number of bytes to write at a time
        :type chunk_size: int
        """
        r = requests.get(url, stream=True)
        # fail early instead of saving an HTTP error page as the archive
        r.raise_for_status()
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
        Download a file via the link in the email
        :param mail_id: Email ID
        :type mail_id: string
        :param path: Where to save the file
        :type path: string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        # drop line breaks so that links wrapped across lines are joined again
        raw_soup = raw_email.decode().replace("\r", "").replace("\n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""
        # keep the last link that has visible text
        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]
        if not link_text:
            raise AirflowException("No download link found in the email")
        self.download_from_url(link_text, path)
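Stripping line breaks from the raw message and parsing it as HTML works for simple emails, but if the HTML part is sent quoted-printable or base64 encoded, the href may come out mangled (for example with =3D in place of =). One possible alternative, sketched below with only the standard library (email and html.parser) instead of BeautifulSoup, lets the email package undo the transfer encoding before looking for links. The function name is hypothetical, and unlike download_mail_href_attachment it does not require the link to have visible text.

```python
import email
from email import policy
from html.parser import HTMLParser


class _LinkCollector(HTMLParser):
    """Collects href values of <a> tags in document order."""

    def __init__(self):
        super().__init__()
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            for name, value in attrs:
                if name == "href" and value:
                    self.links.append(value)


def last_link_from_raw_email(raw_email: bytes):
    """Return the last href in the HTML part of a raw RFC 822 message, or None."""
    msg = email.message_from_bytes(raw_email, policy=policy.default)
    # get_body() picks the HTML part (also inside multipart messages)
    body = msg.get_body(preferencelist=("html",))
    if body is None:
        return None
    parser = _LinkCollector()
    # get_content() decodes quoted-printable/base64 and the declared charset
    parser.feed(body.get_content())
    return parser.links[-1] if parser.links else None
```

In the hook, raw_email would be data[0][1] from the fetch(mail_id, "(RFC822)") call, and the returned URL could then be passed to download_from_url.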
The code is simple, so it needs little explanation. I will only comment on the magic imap_conn_id parameter. Apache Airflow stores connection parameters (login, password, host address, and others) that can be looked up by a string identifier. In the web UI, connections are managed on the Admin → Connections page.
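Besides the web UI, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_ followed by the upper-cased connection ID, whose value is a URI. The sketch below does not use Airflow at all: it only approximates, with urllib, how such a URI splits into the fields IMAPHook reads (host, login, password). Airflow's own parsing lives in its Connection model and covers more cases; the host and credentials here are made up.

```python
import os
from urllib.parse import unquote, urlsplit


def connection_from_env(conn_id, environ=None):
    """
    Simplified stand-in (not Airflow's code) for reading an
    AIRFLOW_CONN_<CONN_ID> URI into the fields an IMAP hook needs.
    """
    environ = os.environ if environ is None else environ
    uri = environ["AIRFLOW_CONN_" + conn_id.upper()]
    parts = urlsplit(uri)
    return {
        "conn_type": parts.scheme,
        "host": parts.hostname,
        # credentials are percent-encoded in the URI, e.g. "@" becomes "%40"
        "login": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
        "port": parts.port,
    }
```

For the DAG in this article the variable would be AIRFLOW_CONN_MAIL_CONN_ID; special characters in the login or password must be percent-encoded in the URI.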
A sensor to wait for data
Since we now know how to connect to the mailbox and get data from it, we can write a sensor that waits for that data. In my case, writing an operator that processes the data right away (if there is any) did not work, because other processes depend on the data received by email, including ones that pull related data from other sources (API, telephony, web analytics, and so on). Here is an example. A new user appears in the CRM system, and we do not yet know their UUID. If we then fetch data from SIP telephony, we will receive calls tied to that UUID but will not be able to store and use them correctly. In such cases it is important to keep data dependencies in mind, especially when the data comes from different sources. These measures are, of course, not sufficient on their own to guarantee data integrity, but sometimes they are necessary. Besides, keeping tasks idle just to hold on to resources would be wasteful.
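The dependency problem can be shown with a toy check: call records are only saved once the CRM user they refer to is known, and the rest wait for a later run. Everything in this sketch (the Call type, its fields, the function name, the UUID strings) is hypothetical and not tied to any particular CRM or SIP telephony API.

```python
from dataclasses import dataclass


@dataclass
class Call:
    call_id: str
    user_uuid: str  # UUID of the CRM user the call belongs to (hypothetical field)


def split_by_known_users(calls, known_uuids):
    """
    Separate calls whose user is already loaded from those referring to a user
    we have not seen yet. Pending calls can be retried on a later run instead
    of being stored with a dangling reference.
    """
    ready, pending = [], []
    for call in calls:
        (ready if call.user_uuid in known_uuids else pending).append(call)
    return ready, pending
```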
So our sensor will trigger the downstream vertices of the graph if there is new information in the mailbox, and otherwise mark them as skipped.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook


class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="INBOX", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        # succeed as soon as at least one email matches the condition
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
        return mail_id is not None
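For readers new to sensors, the behaviour MailSensor inherits in poke mode can be roughly modelled as a polling loop: call poke() every poke_interval seconds until it returns True, and give up after timeout seconds, ending as skipped instead of failed when soft_fail is set. This is a simplified, self-contained model, not Airflow's implementation; run_sensor and SensorTimeout are hypothetical names, and the clock and sleep parameters exist only so the loop can be tested without waiting.

```python
import time


class SensorTimeout(Exception):
    """Raised when the condition is not met before the timeout (hypothetical)."""


def run_sensor(poke, poke_interval, timeout, soft_fail=False,
               clock=time.monotonic, sleep=time.sleep):
    """Rough model of a poke-mode sensor; returns "success" or "skipped"."""
    started = clock()
    while not poke():
        if clock() - started > timeout:
            if soft_fail:
                # soft_fail: the task ends as skipped, not failed
                return "skipped"
            raise SensorTimeout(f"condition not met within {timeout}s")
        sleep(poke_interval)
    return "success"
```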
Receiving and processing the data
To receive and process the data, you can write a separate operator or use a ready-made one. Since the logic is trivial for now (take the data from the email, for example), I suggest the standard PythonOperator.
from datetime import datetime, timedelta

from airflow.exceptions import AirflowException
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Standard DAG configuration
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Define the sensor
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)


# Function that gets the data from the email
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")
    imap_hook.download_mail_href_attachment(mail_id, "./path.zip")


prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable=prepare_mail)

# Description of the remaining graph vertices
...

# Define the edges of the graph
mail_check_sensor >> prepare_mail_data
prepare_mail_data >> ...
# Description of the remaining control flows
By the way, if your corporate mail is also on mail.ru, you will not be able to search emails by subject, sender, and so on. Back in 2016 they promised to add this, but apparently changed their minds. I worked around the problem by creating a separate folder for the required emails and setting up a filter in the mail web interface that routes them there. As a result, only the required emails end up in this folder, and the search condition in my case is simply (UNSEEN).
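On a server that does support the standard IMAP SEARCH keys, the folder-and-filter workaround could be replaced by a richer condition passed to get_last_mail. A small helper for building such a string is sketched below; the function name and example values are hypothetical, only ASCII values are handled (non-ASCII text would need the CHARSET argument of SEARCH), and it does not change what mail.ru actually supports.

```python
def build_search_criteria(unseen=True, sender=None, subject=None):
    """
    Build an IMAP SEARCH condition such as
    '(UNSEEN FROM "crm@example.com" SUBJECT "Export")'.
    Values are quoted, with backslashes and double quotes escaped.
    """
    def quote(value):
        return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'

    keys = []
    if unseen:
        keys.append("UNSEEN")
    if sender:
        keys.append("FROM " + quote(sender))
    if subject:
        keys.append("SUBJECT " + quote(subject))
    # with no filters at all, match every message in the folder
    return "(" + " ".join(keys) + ")" if keys else "ALL"


if __name__ == "__main__":
    print(build_search_criteria(sender="crm@example.com", subject="Export"))
    # (UNSEEN FROM "crm@example.com" SUBJECT "Export")
```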
To sum up, the sequence is as follows: we check whether there are new emails that match the conditions, and if there are, we download the archive using the link from the latest one.
Left out here is that the archive is then unpacked, the data in it is extracted and processed, and the result moves further along the ETL pipeline, but that is beyond the scope of this article. If this turns out to be interesting and useful, I will gladly continue describing ETL solutions and their components for Apache Airflow.
Source: www.habr.com