Teknologia zenbat garatzen den edozein dela ere, ikuspegi zaharkituen kate bat beti dago garapenaren atzean. Trantsizio leun baten, giza faktoreen, behar teknologikoen edo beste zerbaiten ondorioz izan daiteke hori. Datuen tratamenduaren alorrean, datu-iturriak dira zati honetan adierazgarrienak. Hori kentzearekin zenbat amesten dugun, baina orain arte datuen zati bat berehalako mezulari eta posta elektronikoetan bidaltzen da, formatu arkaikoagoak ahaztu gabe. Apache Airflow-en aukeretako bat desmuntatzera gonbidatzen zaitut, mezu elektronikoetatik datuak nola hartu ditzakezun ilustratuz.
historiaurrea
Datu asko oraindik posta elektroniko bidez transferitzen dira, pertsonen arteko komunikazioetatik hasi eta enpresen arteko elkarrekintza estandaretaraino. Ona da datuak lortzeko interfaze bat idaztea edo informazio hori iturri erosoagoetan sartuko duten pertsonak bulegoan jartzea posible bada, baina askotan hori ez da posible izango. Aurre egin nuen zeregin zehatza CRM sistema ezaguna datu biltegira konektatzea izan zen, eta gero OLAP sistemara. Gertatu zen historikoki gure enpresarentzat sistema hau erabiltzea komenigarria zela negozio-eremu jakin batean. Hori dela eta, denek nahi zuten hirugarren sistema honetako datuekin ere funtzionatu ahal izateko. Lehenik eta behin, noski, API ireki batetik datuak lortzeko aukera aztertu zen. Zoritxarrez, APIak ez zituen beharrezko datu guztiak eskuratzea estaltzen, eta, termino sinpleetan, modu askotan okerra zegoen, eta laguntza teknikoak ez zuen erdibidean bete nahi edo ezin izan zuen funtzionalitate zabalagoa eskaintzeko. Baina sistema honek aukera ematen zuen aldian-aldian falta ziren datuak postaz jasotzeko artxiboa deskargatzeko esteka moduan.
Kontuan izan behar da ez dela negozioak posta elektroniko edo berehalako mezularien datuak bildu nahi izan dituen kasu bakarra. Hala ere, kasu honetan, ezin izan dugu datuen zati bat horrela bakarrik ematen duen hirugarren enpresa batean eragin.
apache aire-fluxua
ETL prozesuak eraikitzeko, gehienetan Apache Airflow erabiltzen dugu. Teknologia hau ezagutzen ez duen irakurle batek testuinguruan eta orokorrean duen itxura hobeto uler dezan, sarrerako pare bat deskribatuko ditut.
Apache Airflow Python-en ETL (Extract-Transform-Loading) prozesuak eraikitzeko, exekutatzeko eta monitorizatzeko erabiltzen den doako plataforma bat da. Airflow-en kontzeptu nagusia grafiko azikliko zuzendua da, non grafikoaren erpinak prozesu zehatzak diren, eta grafikoaren ertzak kontrol edo informazio-fluxua diren. Prozesu batek Python-en edozein funtzio dei dezake, edo logika konplexuagoa izan dezake klase baten testuinguruan hainbat funtzio sekuentzialki deitzean. Eragiketarik ohikoenetarako, dagoeneko prozesu gisa erabil daitezkeen prest egindako garapen asko daude. Halako garapenak honako hauek dira:
- operadoreak - datuak leku batetik bestera transferitzeko, adibidez, datu-baseko taula batetik datu biltegira;
- sentsoreak - gertaera jakin bat gertatuko denaren zain eta kontrol-fluxua grafikoaren ondorengo erpinetara bideratzeko;
- kakoak - behe-mailako eragiketetarako, adibidez, datu-baseko taula bateko datuak lortzeko (adierazpenetan erabiltzen dena);
- eta abar.
Desegokia litzateke artikulu honetan Apache Airflow zehatz-mehatz deskribatzea. Sarrera laburrak ikus daitezke
Datuak lortzeko kakoa
Lehenik eta behin, arazoa konpontzeko, kako bat idatzi behar dugu, eta horrekin:
- konektatu posta elektronikora
- aurkitu letra egokia
- gutunetik datuak jaso.
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging
class IMAPHook(BaseHook):
def __init__(self, imap_conn_id):
"""
IMAP hook Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
Ρ ΡΠ»Π΅ΠΊΡΡΠΎΠ½Π½ΠΎΠΉ ΠΏΠΎΡΡΡ
:param imap_conn_id: ΠΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠ΅Π½ΠΈΡ ΠΊ ΠΏΠΎΡΡΠ΅
:type imap_conn_id: string
"""
self.connection = self.get_connection(imap_conn_id)
self.mail = None
def authenticate(self):
"""
ΠΠΎΠ΄ΠΊΠ»ΡΡΠ°Π΅ΠΌΡΡ ΠΊ ΠΏΠΎΡΡΠ΅
"""
mail = imaplib.IMAP4_SSL(self.connection.host)
response, detail = mail.login(user=self.connection.login, password=self.connection.password)
if response != "OK":
raise AirflowException("Sign in failed")
else:
self.mail = mail
def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ ΠΈΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡΠ° ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π³ΠΎ ΠΏΠΈΡΡΠΌΠ°,
ΡΠ΄ΠΎΠ²Π»Π΅ΡΠ²ΠΎΡΠ°ΡΡΡΠ΅Π³ΠΎ ΡΡΠ»ΠΎΠ²ΠΈΡΠΌ ΠΏΠΎΠΈΡΠΊΠ°
:param check_seen: ΠΡΠΌΠ΅ΡΠ°ΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅ ΠΏΠΈΡΡΠΌΠΎ ΠΊΠ°ΠΊ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΠΎΠ΅
:type check_seen: bool
:param box: ΠΠ°ΠΈΠΌΠ΅Π½ΠΎΠ²Π°Π½ΠΈΡ ΡΡΠΈΠΊΠ°
:type box: string
:param condition: Π£ΡΠ»ΠΎΠ²ΠΈΡ ΠΏΠΎΠΈΡΠΊΠ° ΠΏΠΈΡΠ΅ΠΌ
:type condition: string
"""
self.authenticate()
self.mail.select(mailbox=box)
response, data = self.mail.search(None, condition)
mail_ids = data[0].split()
logging.info("Π ΡΡΠΈΠΊΠ΅ Π½Π°ΠΉΠ΄Π΅Π½Ρ ΡΠ»Π΅Π΄ΡΡΡΠΈΠ΅ ΠΏΠΈΡΡΠΌΠ°: " + str(mail_ids))
if not mail_ids:
logging.info("ΠΠ΅ Π½Π°ΠΉΠ΄Π΅Π½ΠΎ Π½ΠΎΠ²ΡΡ
ΠΏΠΈΡΠ΅ΠΌ")
return None
mail_id = mail_ids[0]
# Π΅ΡΠ»ΠΈ ΡΠ°ΠΊΠΈΡ
ΠΏΠΈΡΠ΅ΠΌ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ
if len(mail_ids) > 1:
# ΠΎΡΠΌΠ΅ΡΠ°Π΅ΠΌ ΠΎΡΡΠ°Π»ΡΠ½ΡΠ΅ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΡΠΌΠΈ
for id in mail_ids:
self.mail.store(id, "+FLAGS", "\Seen")
# Π²ΠΎΠ·Π²ΡΠ°ΡΠ°Π΅ΠΌ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅
mail_id = mail_ids[-1]
# Π½ΡΠΆΠ½ΠΎ Π»ΠΈ ΠΎΡΠΌΠ΅ΡΠΈΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΡΠΌ
if not check_seen:
self.mail.store(mail_id, "-FLAGS", "\Seen")
return mail_id
Logika hau da: konektatu egiten dugu, azken letrarik garrantzitsuena aurkitzen dugu, beste batzuk badaude, alde batera uzten ditugu. Funtzio hau erabiltzen da, ondorengo letrek aurrekoen datu guztiak baitituzte. Horrela ez bada, letra guztien array bat itzuli dezakezu edo lehenengoa prozesatu eta gainerakoak hurrengo pasean. Oro har, dena, beti bezala, zereginaren araberakoa da.
Bi funtzio osagarri gehitzen dizkiogu kakoari: fitxategi bat deskargatzeko eta fitxategi bat deskargatzeko e-mail bateko esteka erabiliz. Bide batez, operadorearengana eraman daitezke, funtzionalitate hau erabiltzeko maiztasunaren araberakoa da. Zer gehiago gehitu amuari, berriro ere, zereginaren araberakoa da: fitxategiak gutunean berehala jasotzen badira, gutunaren eranskinak deskargatu ditzakezu, datuak gutunean jasotzen badira, gutuna analizatu behar duzu, etab. Nire kasuan, gutuna artxiborako esteka batekin dator, leku jakin batean jarri eta prozesatzeko prozesuari ekin behar diot.
def download_from_url(self, url, path, chunk_size=128):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π°
:param url: ΠΠ΄ΡΠ΅Ρ Π·Π°Π³ΡΡΠ·ΠΊΠΈ
:type url: string
:param path: ΠΡΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ ΡΠ°ΠΉΠ»
:type path: string
:param chunk_size: ΠΠΎ ΡΠΊΠΎΠ»ΡΠΊΠΎ Π±Π°ΠΉΡΠΎΠ² ΠΏΠΈΡΠ°ΡΡ
:type chunk_size: int
"""
r = requests.get(url, stream=True)
with open(path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def download_mail_href_attachment(self, mail_id, path):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π° ΠΏΠΎ ΡΡΡΠ»ΠΊΠ΅ ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°
:param mail_id: ΠΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ ΠΏΠΈΡΡΠΌΠ°
:type mail_id: string
:param path: ΠΡΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ ΡΠ°ΠΉΠ»
:type path: string
"""
response, data = self.mail.fetch(mail_id, "(RFC822)")
raw_email = data[0][1]
raw_soup = raw_email.decode().replace("r", "").replace("n", "")
parse_soup = BeautifulSoup(raw_soup, "html.parser")
link_text = ""
for a in parse_soup.find_all("a", href=True, text=True):
link_text = a["href"]
self.download_from_url(link_text, path)
Kodea sinplea da, beraz, ez du ia azalpen gehiago behar. Imap_conn_id lerro magikoaren berri emango dizut. Apache Airflow-ek konexio-parametroak gordetzen ditu (saioa, pasahitza, helbidea eta beste parametro batzuk) kate-identifikatzaile baten bidez atzi daitezkeen. Ikusmenean, konexioen kudeaketak itxura hau du
Datuen zain egoteko sentsorea
Dagoeneko postatik datuak konektatu eta jasotzen dakigunez, orain sentsore bat idatz dezakegu haien zain egoteko. Nire kasuan, ez du funtzionatu datuak prozesatuko dituen operadore bat idaztea berehala, halakorik balego, beste prozesu batzuek postatik jasotako datuetan oinarrituta funtzionatzen dutelako, beste iturri batzuetatik erlazionatutako datuak hartzen dituztenak barne (APIa, telefonia). , web-neurriak, etab.) etab.). Adibide bat jarriko dizut. Erabiltzaile berri bat agertu da CRM sisteman, eta oraindik ez dakigu bere UUIDaren berri. Gero, SIP telefoniatik datuak jasotzen saiatzean, bere UUIDari lotuta deiak jasoko ditugu, baina ezin izango ditugu behar bezala gorde eta erabili. Horrelako gaietan, kontuan izan behar da datuen menpekotasuna, batez ere iturri ezberdinetakoak badira. Horiek, noski, datuen osotasuna zaintzeko neurri nahikoak ez dira, baina kasu batzuetan beharrezkoak dira. Bai, eta baliabideak okupatzeko alfer-jartzea ere irrazionala da.
Horrela, gure sentsoreak grafikoaren ondorengo erpinak abiaraziko ditu posta elektronikoan informazio freskoa badago, eta aurreko informazioa garrantzirik gabekotzat ere markatuko du.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook
class MailSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.check_seen = check_seen
self.box = box
self.condition = condition
def poke(self, context):
conn = IMAPHook(self.conn_id)
mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
if mail_id is None:
return False
else:
return True
Datuak jasotzen eta erabiltzen ditugu
Datuak jaso eta prozesatzeko, beste operadore bat idatz dezakezu, prest egindakoak erabil ditzakezu. Logika oraindik hutsala denez - gutunetik datuak hartzeko, adibidez, PythonOperator estandarra proposatzen dut
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook
start_date = datetime(2020, 4, 4)
# Π‘ΡΠ°Π½Π΄Π°ΡΡΠ½ΠΎΠ΅ ΠΊΠΎΠ½ΡΠΈΠ³ΡΡΠΈΡΠΎΠ²Π°Π½ΠΈΠ΅ Π³ΡΠ°ΡΠ°
args = {
"owner": "example",
"start_date": start_date,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retry_delay": timedelta(minutes=15),
"provide_context": False,
}
dag = DAG(
dag_id="test_etl",
default_args=args,
schedule_interval="@hourly",
)
# ΠΠΏΡΠ΅Π΄Π΅Π»ΡΠ΅ΠΌ ΡΠ΅Π½ΡΠΎΡ
mail_check_sensor = MailSensor(
task_id="check_new_emails",
poke_interval=10,
conn_id="mail_conn_id",
timeout=10,
soft_fail=True,
box="my_box",
dag=dag,
mode="poke",
)
# Π€ΡΠ½ΠΊΡΠΈΡ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°
def prepare_mail():
imap_hook = IMAPHook("mail_conn_id")
mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
if mail_id is None:
raise AirflowException("Empty mailbox")
conn.download_mail_href_attachment(mail_id, "./path.zip")
prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
# ΠΠΏΠΈΡΠ°Π½ΠΈΠ΅ ΠΎΡΡΠ°Π»ΡΠ½ΡΡ
Π²Π΅ΡΡΠΈΠ½ Π³ΡΠ°ΡΠ°
...
# ΠΠ°Π΄Π°Π΅ΠΌ ΡΠ²ΡΠ·Ρ Π½Π° Π³ΡΠ°ΡΠ΅
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ΠΠΏΠΈΡΠ°Π½ΠΈΠ΅ ΠΎΡΡΠ°Π»ΡΠ½ΡΡ
ΠΏΠΎΡΠΎΠΊΠΎΠ² ΡΠΏΡΠ°Π²Π»Π΅Π½ΠΈΡ
Bide batez, zure posta korporatiboa mail.ru-n ere badago, orduan ezin izango dituzu gutunak bilatu gaiaren, bidaltzailearen, etab. 2016an, aurkeztuko zutela agindu zuten, baina itxuraz iritziz aldatu zuten. Arazo hau konpondu nuen beharrezkoak diren letren karpeta bereizi bat sortuz eta beharrezko letren iragazki bat ezarriz posta web interfazean. Horrela, bilaketarako beharrezkoak diren letrak eta baldintzak bakarrik, nire kasuan, besterik gabe (IKUSITA) karpeta honetan sartzen dira.
Laburbilduz, ondoko sekuentzia dugu: baldintzak betetzen dituzten letra berririk ba ote dagoen egiaztatzen dugu, badaude, gero artxiboa deskargatuko dugu azken letrako esteka erabiliz.
Azken puntuen azpian, artxibo hau deskonprimituko dela, artxiboko datuak garbitu eta prozesatu egingo dira eta, ondorioz, guztia ETL prozesuaren kanalizaziora joango da, baina hori dagoeneko haratago dago. artikuluaren esparrua. Interesgarria eta erabilgarria izan balitz, pozik jarraituko dut Apache Airflow-erako ETL soluzioak eta haien zatiak deskribatzen.
Iturria: www.habr.com