Bisan unsa pa kadaghan ang teknolohiya nga naugmad, ang usa ka hugpong sa mga karaan nga mga pamaagi kanunay nga nagsunod sa pag-uswag. Mahimong tungod kini sa usa ka hapsay nga pagbalhin, mga hinungdan sa tawo, mga panginahanglanon sa teknolohiya, o uban pa. Sa natad sa pagproseso sa datos, ang mga tinubdan sa datos mao ang labing nagpadayag niini nga bahin. Dili igsapayan kung unsa ka dako ang among damgo nga makuha kini, apan hangtod karon ang bahin sa datos gipadala sa mga instant messenger ug email, walaβy labot ang labi ka karaan nga mga format. Gidapit ko ikaw sa pag-disassemble sa usa sa mga kapilian alang sa Apache Airflow, nga naghulagway kung giunsa nimo pagkuha ang datos gikan sa mga email.
sa naunang kasaysayan
Daghang mga datos ang gibalhin gihapon pinaagi sa e-mail, gikan sa interpersonal nga komunikasyon hangtod sa mga sumbanan sa interaksyon tali sa mga kompanya. Maayo kung posible nga magsulat usa ka interface aron makakuha mga datos o ibutang ang mga tawo sa opisina nga mosulod niini nga kasayuran sa labi ka kombenyente nga mga gigikanan, apan sa kasagaran dili kini mahimo. Ang piho nga buluhaton nga akong giatubang mao ang pagkonektar sa bantog nga CRM nga sistema sa bodega sa datos, ug dayon sa sistema sa OLAP. Nahitabo kini sa kasaysayan nga alang sa among kompanya ang paggamit niini nga sistema kombenyente sa usa ka partikular nga lugar sa negosyo. Busa, ang tanan gusto gayud nga makahimo sa pag-operate uban sa data gikan niini nga ikatulo nga-partido nga sistema usab. Una sa tanan, siyempre, ang posibilidad sa pagkuha sa datos gikan sa usa ka bukas nga API gitun-an. Ikasubo, ang API wala maglakip sa pagkuha sa tanan nga gikinahanglan nga datos, ug, sa yano nga mga termino, kini sa daghang mga paagi hiwi, ug ang teknikal nga suporta dili gusto o dili makatagbo sa tunga-tunga sa paghatag og mas komprehensibo nga gamit. Apan kini nga sistema naghatag higayon nga makadawat matag karon ug unya sa nawala nga datos pinaagi sa koreo sa porma sa usa ka link alang sa pagdiskarga sa archive.
Kinahanglan nga hinumdoman nga dili lamang kini ang kaso diin gusto sa negosyo nga mangolekta mga datos gikan sa mga email o instant messenger. Bisan pa, sa kini nga kaso, dili kami makaimpluwensya sa usa ka kompanya sa ikatulo nga partido nga naghatag bahin sa datos lamang sa kini nga paagi.
apache airflow
Aron matukod ang mga proseso sa ETL, kanunay namong gigamit ang Apache Airflow. Aron ang usa ka magbabasa nga dili pamilyar sa kini nga teknolohiya mas masabtan kung unsa ang hitsura niini sa konteksto ug sa kinatibuk-an, akong ihulagway ang usa ka pares nga pasiuna.
Ang Apache Airflow usa ka libre nga plataporma nga gigamit sa pagtukod, pagpatuman ug pagmonitor sa mga proseso sa ETL (Extract-Transform-Loading) sa Python. Ang nag-unang konsepto sa Airflow usa ka direkta nga acyclic graph, diin ang mga vertices sa graph mga piho nga proseso, ug ang mga kilid sa graph mao ang dagan sa kontrol o impormasyon. Ang usa ka proseso mahimong yano nga motawag sa bisan unsang Python function, o kini adunay mas komplikado nga lohika gikan sa sunod-sunod nga pagtawag sa daghang mga function sa konteksto sa usa ka klase. Alang sa labing kanunay nga mga operasyon, adunay daghan na nga andam nga mga pag-uswag nga magamit ingon mga proseso. Ang maong mga kalamboan naglakip sa:
- operators - alang sa pagbalhin sa data gikan sa usa ka dapit ngadto sa lain, alang sa panig-ingnan, gikan sa usa ka database lamesa ngadto sa usa ka data bodega;
- mga sensor - alang sa paghulat sa panghitabo sa usa ka piho nga panghitabo ug pagdirekta sa dagan sa kontrol sa sunod nga mga vertices sa graph;
- mga kaw-it - alang sa ubos nga lebel nga mga operasyon, pananglitan, aron makakuha og datos gikan sa lamesa sa database (gigamit sa mga pahayag);
- ug uban pa.
Dili angay nga ihulagway ang Apache Airflow sa detalye niini nga artikulo. Ang mubu nga mga pasiuna mahimong tan-awon
Hook alang sa pagkuha sa datos
Una sa tanan, aron masulbad ang problema, kinahanglan namon nga magsulat usa ka kaw-it diin mahimo namon:
- sumpay sa email
- pangitaa ang husto nga letra
- makadawat og datos gikan sa sulat.
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging
class IMAPHook(BaseHook):
def __init__(self, imap_conn_id):
"""
IMAP hook Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
Ρ ΡΠ»Π΅ΠΊΡΡΠΎΠ½Π½ΠΎΠΉ ΠΏΠΎΡΡΡ
:param imap_conn_id: ΠΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠ΅Π½ΠΈΡ ΠΊ ΠΏΠΎΡΡΠ΅
:type imap_conn_id: string
"""
self.connection = self.get_connection(imap_conn_id)
self.mail = None
def authenticate(self):
"""
ΠΠΎΠ΄ΠΊΠ»ΡΡΠ°Π΅ΠΌΡΡ ΠΊ ΠΏΠΎΡΡΠ΅
"""
mail = imaplib.IMAP4_SSL(self.connection.host)
response, detail = mail.login(user=self.connection.login, password=self.connection.password)
if response != "OK":
raise AirflowException("Sign in failed")
else:
self.mail = mail
def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ ΠΈΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡΠ° ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π³ΠΎ ΠΏΠΈΡΡΠΌΠ°,
ΡΠ΄ΠΎΠ²Π»Π΅ΡΠ²ΠΎΡΠ°ΡΡΡΠ΅Π³ΠΎ ΡΡΠ»ΠΎΠ²ΠΈΡΠΌ ΠΏΠΎΠΈΡΠΊΠ°
:param check_seen: ΠΡΠΌΠ΅ΡΠ°ΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅ ΠΏΠΈΡΡΠΌΠΎ ΠΊΠ°ΠΊ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΠΎΠ΅
:type check_seen: bool
:param box: ΠΠ°ΠΈΠΌΠ΅Π½ΠΎΠ²Π°Π½ΠΈΡ ΡΡΠΈΠΊΠ°
:type box: string
:param condition: Π£ΡΠ»ΠΎΠ²ΠΈΡ ΠΏΠΎΠΈΡΠΊΠ° ΠΏΠΈΡΠ΅ΠΌ
:type condition: string
"""
self.authenticate()
self.mail.select(mailbox=box)
response, data = self.mail.search(None, condition)
mail_ids = data[0].split()
logging.info("Π ΡΡΠΈΠΊΠ΅ Π½Π°ΠΉΠ΄Π΅Π½Ρ ΡΠ»Π΅Π΄ΡΡΡΠΈΠ΅ ΠΏΠΈΡΡΠΌΠ°: " + str(mail_ids))
if not mail_ids:
logging.info("ΠΠ΅ Π½Π°ΠΉΠ΄Π΅Π½ΠΎ Π½ΠΎΠ²ΡΡ
ΠΏΠΈΡΠ΅ΠΌ")
return None
mail_id = mail_ids[0]
# Π΅ΡΠ»ΠΈ ΡΠ°ΠΊΠΈΡ
ΠΏΠΈΡΠ΅ΠΌ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ
if len(mail_ids) > 1:
# ΠΎΡΠΌΠ΅ΡΠ°Π΅ΠΌ ΠΎΡΡΠ°Π»ΡΠ½ΡΠ΅ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΡΠΌΠΈ
for id in mail_ids:
self.mail.store(id, "+FLAGS", "\Seen")
# Π²ΠΎΠ·Π²ΡΠ°ΡΠ°Π΅ΠΌ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅
mail_id = mail_ids[-1]
# Π½ΡΠΆΠ½ΠΎ Π»ΠΈ ΠΎΡΠΌΠ΅ΡΠΈΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΡΠΌ
if not check_seen:
self.mail.store(mail_id, "-FLAGS", "\Seen")
return mail_id
Ang lohika mao kini: nagkonektar kami, pangitaa ang katapusan nga labing may kalabutan nga letra, kung adunay uban, wala namon sila gibalewala. Gigamit kini nga function, tungod kay ang ulahi nga mga letra naglangkob sa tanan nga datos sa mga nauna. Kung dili kini ang kahimtang, mahimo nimong ibalik ang usa ka han-ay sa tanan nga mga letra, o iproseso ang una, ug ang nahabilin sa sunod nga pass. Sa kinatibuk-an, ang tanan, sama sa kanunay, nagdepende sa buluhaton.
Nagdugang kami og duha ka auxiliary function sa hook: alang sa pag-download sa usa ka file ug alang sa pag-download sa usa ka file gamit ang usa ka link gikan sa usa ka email. Pinaagi sa dalan, mahimo silang ibalhin sa operator, nagdepende kini sa kadaghan sa paggamit niini nga gamit. Unsa pa ang idugang sa kaw-it, pag-usab, nagdepende sa buluhaton: kung ang mga file madawat dayon sa sulat, nan mahimo nimong i-download ang mga attachment sa sulat, kung ang datos nadawat sa sulat, nan kinahanglan nimo nga i-parse ang sulat, ug uban pa. Sa akong kaso, ang sulat adunay usa ka link sa archive, nga kinahanglan nako ibutang sa usa ka lugar ug magsugod sa dugang nga proseso sa pagproseso.
def download_from_url(self, url, path, chunk_size=128):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π°
:param url: ΠΠ΄ΡΠ΅Ρ Π·Π°Π³ΡΡΠ·ΠΊΠΈ
:type url: string
:param path: ΠΡΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ ΡΠ°ΠΉΠ»
:type path: string
:param chunk_size: ΠΠΎ ΡΠΊΠΎΠ»ΡΠΊΠΎ Π±Π°ΠΉΡΠΎΠ² ΠΏΠΈΡΠ°ΡΡ
:type chunk_size: int
"""
r = requests.get(url, stream=True)
with open(path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def download_mail_href_attachment(self, mail_id, path):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π° ΠΏΠΎ ΡΡΡΠ»ΠΊΠ΅ ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°
:param mail_id: ΠΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ ΠΏΠΈΡΡΠΌΠ°
:type mail_id: string
:param path: ΠΡΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ ΡΠ°ΠΉΠ»
:type path: string
"""
response, data = self.mail.fetch(mail_id, "(RFC822)")
raw_email = data[0][1]
raw_soup = raw_email.decode().replace("r", "").replace("n", "")
parse_soup = BeautifulSoup(raw_soup, "html.parser")
link_text = ""
for a in parse_soup.find_all("a", href=True, text=True):
link_text = a["href"]
self.download_from_url(link_text, path)
Simple ra ang code, mao nga halos wala na kini magkinahanglan ug dugang nga katin-awan. Sultihan ko lang ka bahin sa magic line nga imap_conn_id. Ang Apache Airflow nagtipig sa mga parameter sa koneksyon (login, password, address, ug uban pang mga parameter) nga mahimong ma-access pinaagi sa usa ka string identifier. Sa tan-aw, ang pagdumala sa koneksyon ingon niini
Sensor nga maghulat alang sa datos
Tungod kay nahibal-an na namon kung giunsa ang pagkonektar ug pagdawat mga datos gikan sa mail, mahimo na namon magsulat usa ka sensor aron maghulat alang kanila. Sa akong kaso, wala kini nagtrabaho sa pagsulat dayon sa usa ka operator nga magproseso sa datos, kung naa man, tungod kay ang ubang mga proseso nagtrabaho base sa datos nga nadawat gikan sa mail, lakip ang mga nagkuha sa mga may kalabutan nga datos gikan sa ubang mga gigikanan (API, telephony , web metrics, etc.). etc.). Hatagan ko ikaw usa ka pananglitan. Usa ka bag-ong user ang mitungha sa CRM system, ug wala gihapon mi kahibalo bahin sa iyang UUID. Unya, sa dihang mosulay sa pagdawat ug data gikan sa SIP telephony, makadawat kami ug mga tawag nga nahigot sa UUID niini, apan dili namo kini matipigan ug magamit sa husto. Sa ingon nga mga butang, hinungdanon nga hinumdoman ang pagsalig sa datos, labi na kung kini gikan sa lainlaing mga gigikanan. Kini, siyempre, dili igo nga mga lakang aron mapreserbar ang integridad sa datos, apan sa pipila ka mga kaso kini gikinahanglan. Oo, ug ang pag-undang sa pag-okupar sa mga kahinguhaan dili usab makatarunganon.
Sa ingon, ang among sensor maglansad sa sunod nga mga vertices sa graph kung adunay bag-ong kasayuran sa mail, ug markahan usab ang miaging kasayuran nga walaβy kalabotan.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook
class MailSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.check_seen = check_seen
self.box = box
self.condition = condition
def poke(self, context):
conn = IMAPHook(self.conn_id)
mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
if mail_id is None:
return False
else:
return True
Kami nakadawat ug naggamit sa datos
Aron makadawat ug pagproseso sa datos, mahimo ka magsulat sa usa ka bulag nga operator, mahimo nimong gamiton ang mga andam na. Tungod kay hangtod karon ang lohika walaβy hinungdan - aron makuha ang datos gikan sa sulat, pananglitan, gisugyot nako ang sukaranan nga PythonOperator
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook
start_date = datetime(2020, 4, 4)
# Π‘ΡΠ°Π½Π΄Π°ΡΡΠ½ΠΎΠ΅ ΠΊΠΎΠ½ΡΠΈΠ³ΡΡΠΈΡΠΎΠ²Π°Π½ΠΈΠ΅ Π³ΡΠ°ΡΠ°
args = {
"owner": "example",
"start_date": start_date,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retry_delay": timedelta(minutes=15),
"provide_context": False,
}
dag = DAG(
dag_id="test_etl",
default_args=args,
schedule_interval="@hourly",
)
# ΠΠΏΡΠ΅Π΄Π΅Π»ΡΠ΅ΠΌ ΡΠ΅Π½ΡΠΎΡ
mail_check_sensor = MailSensor(
task_id="check_new_emails",
poke_interval=10,
conn_id="mail_conn_id",
timeout=10,
soft_fail=True,
box="my_box",
dag=dag,
mode="poke",
)
# Π€ΡΠ½ΠΊΡΠΈΡ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°
def prepare_mail():
imap_hook = IMAPHook("mail_conn_id")
mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
if mail_id is None:
raise AirflowException("Empty mailbox")
conn.download_mail_href_attachment(mail_id, "./path.zip")
prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
# ΠΠΏΠΈΡΠ°Π½ΠΈΠ΅ ΠΎΡΡΠ°Π»ΡΠ½ΡΡ
Π²Π΅ΡΡΠΈΠ½ Π³ΡΠ°ΡΠ°
...
# ΠΠ°Π΄Π°Π΅ΠΌ ΡΠ²ΡΠ·Ρ Π½Π° Π³ΡΠ°ΡΠ΅
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ΠΠΏΠΈΡΠ°Π½ΠΈΠ΅ ΠΎΡΡΠ°Π»ΡΠ½ΡΡ
ΠΏΠΎΡΠΎΠΊΠΎΠ² ΡΠΏΡΠ°Π²Π»Π΅Π½ΠΈΡ
Pinaagi sa dalan, kung ang imong corporate mail naa usab sa mail.ru, nan dili ka makapangita sa mga sulat pinaagi sa hilisgutan, nagpadala, ug uban pa. Balik sa 2016, sila misaad sa pagpaila niini, apan dayag nga nausab ang ilang mga hunahuna. Gisulbad nako kini nga problema pinaagi sa paghimo og usa ka separado nga folder alang sa gikinahanglan nga mga letra ug pag-set up og filter alang sa gikinahanglan nga mga letra sa mail web interface. Busa, ang gikinahanglan nga mga letra ug mga kondisyon alang sa pagpangita, sa akong kaso, yano (UNSEEN) nga makasulod niini nga folder.
Sa pag-summarize, kami adunay mosunod nga han-ay: among gisusi kung adunay bag-ong mga letra nga nagtagbo sa mga kondisyon, kung adunay, unya among i-download ang archive gamit ang link gikan sa katapusang sulat.
Ubos sa katapusan nga mga tuldok, wala'y mahimo nga kini nga archive ma-unpack, ang datos gikan sa archive malimpyohan ug maproseso, ug isip resulta, ang tibuok nga butang moadto sa pipeline sa proseso sa ETL, apan kini labaw pa. ang gilapdon sa artikulo. Kung kini nahimo nga makapaikag ug mapuslanon, nan malipayon kong magpadayon sa paghulagway sa mga solusyon sa ETL ug sa ilang mga bahin alang sa Apache Airflow.
Source: www.habr.com