Hindi mahalaga kung gaano kalaki ang pag-unlad ng teknolohiya, ang isang string ng mga hindi napapanahong diskarte ay palaging nauuwi sa pag-unlad. Ito ay maaaring dahil sa isang maayos na paglipat, mga kadahilanan ng tao, mga teknolohikal na pangangailangan, o iba pa. Sa larangan ng pagpoproseso ng data, ang mga pinagmumulan ng data ay ang pinaka-nagsisiwalat sa bahaging ito. Hindi mahalaga kung gaano natin pangarap na maalis ito, ngunit hanggang ngayon ang bahagi ng data ay ipinadala sa mga instant messenger at email, hindi banggitin ang higit pang mga archaic na format. Inaanyayahan kita na i-disassemble ang isa sa mga opsyon para sa Apache Airflow, na naglalarawan kung paano ka makakakuha ng data mula sa mga email.
prehistory
Maraming data pa rin ang inililipat sa pamamagitan ng e-mail, mula sa mga interpersonal na komunikasyon hanggang sa mga pamantayan ng pakikipag-ugnayan sa pagitan ng mga kumpanya. Mabuti kung posible na magsulat ng isang interface upang makakuha ng data o ilagay ang mga tao sa opisina na maglalagay ng impormasyong ito sa mas maginhawang mga mapagkukunan, ngunit kadalasan ay hindi ito posible. Ang partikular na gawain na kinaharap ko ay ang pagkonekta sa kilalang CRM system sa data warehouse, at pagkatapos ay sa OLAP system. Nangyari ito sa kasaysayan na para sa aming kumpanya ang paggamit ng sistemang ito ay maginhawa sa isang partikular na lugar ng negosyo. Samakatuwid, gusto talaga ng lahat na makapagpatakbo gamit ang data mula sa third-party system na ito rin. Una sa lahat, siyempre, ang posibilidad ng pagkuha ng data mula sa isang bukas na API ay pinag-aralan. Sa kasamaang-palad, hindi saklaw ng API ang pagkuha ng lahat ng kinakailangang data, at, sa madaling salita, ito ay baluktot sa maraming paraan, at ayaw o hindi matugunan ng teknikal na suporta ang kalahati upang magbigay ng mas komprehensibong paggana. Ngunit ang sistemang ito ay nagbigay ng pagkakataon na pana-panahong makatanggap ng nawawalang data sa pamamagitan ng koreo sa anyo ng isang link para sa pag-unload ng archive.
Dapat tandaan na hindi lamang ito ang kaso kung saan nais ng negosyo na mangolekta ng data mula sa mga email o instant messenger. Gayunpaman, sa kasong ito, hindi namin maimpluwensyahan ang isang third-party na kumpanya na nagbibigay lamang ng bahagi ng data sa ganitong paraan.
apache airflow
Upang bumuo ng mga proseso ng ETL, madalas naming ginagamit ang Apache Airflow. Upang mas maunawaan ng isang mambabasa na hindi pamilyar sa teknolohiyang ito kung ano ang hitsura nito sa konteksto at sa pangkalahatan, ilalarawan ko ang ilang mga panimulang.
Ang Apache Airflow ay isang libreng platform na ginagamit upang bumuo, magsagawa at subaybayan ang mga proseso ng ETL (Extract-Transform-Loading) sa Python. Ang pangunahing konsepto sa Airflow ay isang direktang acyclic graph, kung saan ang mga vertices ng graph ay mga partikular na proseso, at ang mga gilid ng graph ay ang daloy ng kontrol o impormasyon. Ang isang proseso ay maaari lamang tumawag sa anumang function ng Python, o maaari itong magkaroon ng mas kumplikadong lohika mula sa sunud-sunod na pagtawag sa ilang mga function sa konteksto ng isang klase. Para sa pinakamadalas na operasyon, marami nang mga nakahanda nang pagpapaunlad na maaaring magamit bilang mga proseso. Kabilang sa mga naturang pag-unlad ang:
- mga operator - para sa paglilipat ng data mula sa isang lugar patungo sa isa pa, halimbawa, mula sa isang talahanayan ng database patungo sa isang bodega ng data;
- mga sensor - para sa paghihintay para sa paglitaw ng isang tiyak na kaganapan at pagdidirekta sa daloy ng kontrol sa kasunod na mga vertices ng graph;
- mga kawit - para sa mas mababang antas ng mga operasyon, halimbawa, upang makakuha ng data mula sa isang talahanayan ng database (ginamit sa mga pahayag);
- at iba pa
Hindi angkop na ilarawan ang Apache Airflow nang detalyado sa artikulong ito. Maaaring matingnan ang maikling pagpapakilala
Hook para sa pagkuha ng data
Una sa lahat, upang malutas ang problema, kailangan nating magsulat ng isang kawit kung saan maaari nating:
- kumonekta sa email
- hanapin ang tamang titik
- tumanggap ng datos mula sa liham.
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging
class IMAPHook(BaseHook):
def __init__(self, imap_conn_id):
"""
IMAP hook Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
Ρ ΡΠ»Π΅ΠΊΡΡΠΎΠ½Π½ΠΎΠΉ ΠΏΠΎΡΡΡ
:param imap_conn_id: ΠΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠ΅Π½ΠΈΡ ΠΊ ΠΏΠΎΡΡΠ΅
:type imap_conn_id: string
"""
self.connection = self.get_connection(imap_conn_id)
self.mail = None
def authenticate(self):
"""
ΠΠΎΠ΄ΠΊΠ»ΡΡΠ°Π΅ΠΌΡΡ ΠΊ ΠΏΠΎΡΡΠ΅
"""
mail = imaplib.IMAP4_SSL(self.connection.host)
response, detail = mail.login(user=self.connection.login, password=self.connection.password)
if response != "OK":
raise AirflowException("Sign in failed")
else:
self.mail = mail
def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ ΠΈΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡΠ° ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π³ΠΎ ΠΏΠΈΡΡΠΌΠ°,
ΡΠ΄ΠΎΠ²Π»Π΅ΡΠ²ΠΎΡΠ°ΡΡΡΠ΅Π³ΠΎ ΡΡΠ»ΠΎΠ²ΠΈΡΠΌ ΠΏΠΎΠΈΡΠΊΠ°
:param check_seen: ΠΡΠΌΠ΅ΡΠ°ΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅ ΠΏΠΈΡΡΠΌΠΎ ΠΊΠ°ΠΊ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΠΎΠ΅
:type check_seen: bool
:param box: ΠΠ°ΠΈΠΌΠ΅Π½ΠΎΠ²Π°Π½ΠΈΡ ΡΡΠΈΠΊΠ°
:type box: string
:param condition: Π£ΡΠ»ΠΎΠ²ΠΈΡ ΠΏΠΎΠΈΡΠΊΠ° ΠΏΠΈΡΠ΅ΠΌ
:type condition: string
"""
self.authenticate()
self.mail.select(mailbox=box)
response, data = self.mail.search(None, condition)
mail_ids = data[0].split()
logging.info("Π ΡΡΠΈΠΊΠ΅ Π½Π°ΠΉΠ΄Π΅Π½Ρ ΡΠ»Π΅Π΄ΡΡΡΠΈΠ΅ ΠΏΠΈΡΡΠΌΠ°: " + str(mail_ids))
if not mail_ids:
logging.info("ΠΠ΅ Π½Π°ΠΉΠ΄Π΅Π½ΠΎ Π½ΠΎΠ²ΡΡ
ΠΏΠΈΡΠ΅ΠΌ")
return None
mail_id = mail_ids[0]
# Π΅ΡΠ»ΠΈ ΡΠ°ΠΊΠΈΡ
ΠΏΠΈΡΠ΅ΠΌ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ
if len(mail_ids) > 1:
# ΠΎΡΠΌΠ΅ΡΠ°Π΅ΠΌ ΠΎΡΡΠ°Π»ΡΠ½ΡΠ΅ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΡΠΌΠΈ
for id in mail_ids:
self.mail.store(id, "+FLAGS", "\Seen")
# Π²ΠΎΠ·Π²ΡΠ°ΡΠ°Π΅ΠΌ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅
mail_id = mail_ids[-1]
# Π½ΡΠΆΠ½ΠΎ Π»ΠΈ ΠΎΡΠΌΠ΅ΡΠΈΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΡΠΌ
if not check_seen:
self.mail.store(mail_id, "-FLAGS", "\Seen")
return mail_id
Ang lohika ay ito: kumonekta kami, hanapin ang huling pinaka-nauugnay na sulat, kung may iba pa, hindi namin sila pinapansin. Ginagamit ang function na ito, dahil ang mga susunod na titik ay naglalaman ng lahat ng data ng mga nauna. Kung hindi ito ang kaso, maaari mong ibalik ang isang array ng lahat ng mga titik, o iproseso ang una, at ang natitira sa susunod na pass. Sa pangkalahatan, ang lahat, gaya ng dati, ay nakasalalay sa gawain.
Nagdaragdag kami ng dalawang pantulong na pag-andar sa hook: para sa pag-download ng isang file at para sa pag-download ng isang file gamit ang isang link mula sa isang email. Sa pamamagitan ng paraan, maaari silang ilipat sa operator, depende ito sa dalas ng paggamit ng pag-andar na ito. Ano pa ang idaragdag sa hook, muli, ay depende sa gawain: kung ang mga file ay natanggap kaagad sa liham, pagkatapos ay maaari kang mag-download ng mga attachment sa liham, kung ang data ay natanggap sa liham, pagkatapos ay kailangan mong i-parse ang liham, atbp. Sa aking kaso, ang liham ay may kasamang isang link sa archive, na kailangan kong ilagay sa isang tiyak na lugar at simulan ang karagdagang proseso ng pagproseso.
def download_from_url(self, url, path, chunk_size=128):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π°
:param url: ΠΠ΄ΡΠ΅Ρ Π·Π°Π³ΡΡΠ·ΠΊΠΈ
:type url: string
:param path: ΠΡΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ ΡΠ°ΠΉΠ»
:type path: string
:param chunk_size: ΠΠΎ ΡΠΊΠΎΠ»ΡΠΊΠΎ Π±Π°ΠΉΡΠΎΠ² ΠΏΠΈΡΠ°ΡΡ
:type chunk_size: int
"""
r = requests.get(url, stream=True)
with open(path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def download_mail_href_attachment(self, mail_id, path):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π° ΠΏΠΎ ΡΡΡΠ»ΠΊΠ΅ ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°
:param mail_id: ΠΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ ΠΏΠΈΡΡΠΌΠ°
:type mail_id: string
:param path: ΠΡΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ ΡΠ°ΠΉΠ»
:type path: string
"""
response, data = self.mail.fetch(mail_id, "(RFC822)")
raw_email = data[0][1]
raw_soup = raw_email.decode().replace("r", "").replace("n", "")
parse_soup = BeautifulSoup(raw_soup, "html.parser")
link_text = ""
for a in parse_soup.find_all("a", href=True, text=True):
link_text = a["href"]
self.download_from_url(link_text, path)
Ang code ay simple, kaya halos hindi ito nangangailangan ng karagdagang paliwanag. Sasabihin ko lang sa iyo ang tungkol sa magic line na imap_conn_id. Ang Apache Airflow ay nag-iimbak ng mga parameter ng koneksyon (pag-login, password, address, at iba pang mga parameter) na maaaring ma-access ng isang string identifier. Biswal, ganito ang hitsura ng pamamahala ng koneksyon
Sensor upang maghintay para sa data
Dahil alam na natin kung paano kumonekta at tumanggap ng data mula sa mail, maaari na tayong sumulat ng sensor para hintayin sila. Sa aking kaso, hindi gumana ang pagsulat kaagad ng operator na magpoproseso ng data, kung mayroon man, dahil gumagana ang ibang mga proseso batay sa data na natanggap mula sa mail, kabilang ang mga kumukuha ng nauugnay na data mula sa iba pang mga mapagkukunan (API, telephony , mga sukatan sa web, atbp.). atbp.). Bibigyan kita ng isang halimbawa. May lumitaw na bagong user sa CRM system, at hindi pa rin namin alam ang tungkol sa kanyang UUID. Pagkatapos, kapag sinusubukang tumanggap ng data mula sa SIP telephony, makakatanggap kami ng mga tawag na nauugnay sa UUID nito, ngunit hindi namin mai-save at magagamit ang mga ito nang tama. Sa ganitong mga bagay, mahalagang isaisip ang pag-asa ng data, lalo na kung ang mga ito ay mula sa iba't ibang mapagkukunan. Ang mga ito, siyempre, ay hindi sapat na mga hakbang upang mapanatili ang integridad ng data, ngunit sa ilang mga kaso ay kinakailangan ang mga ito. Oo, at ang kawalang-ginagawa upang sakupin ang mga mapagkukunan ay hindi rin makatwiran.
Kaya, ang aming sensor ay maglulunsad ng mga kasunod na vertice ng graph kung mayroong sariwang impormasyon sa mail, at markahan din ang nakaraang impormasyon bilang hindi nauugnay.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook
class MailSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.check_seen = check_seen
self.box = box
self.condition = condition
def poke(self, context):
conn = IMAPHook(self.conn_id)
mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
if mail_id is None:
return False
else:
return True
Tumatanggap at gumagamit kami ng data
Upang makatanggap at magproseso ng data, maaari kang magsulat ng isang hiwalay na operator, maaari mong gamitin ang mga handa na. Dahil sa ngayon ang lohika ay walang halaga - upang kumuha ng data mula sa liham, halimbawa, iminumungkahi ko ang karaniwang PythonOperator
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook
start_date = datetime(2020, 4, 4)
# Π‘ΡΠ°Π½Π΄Π°ΡΡΠ½ΠΎΠ΅ ΠΊΠΎΠ½ΡΠΈΠ³ΡΡΠΈΡΠΎΠ²Π°Π½ΠΈΠ΅ Π³ΡΠ°ΡΠ°
args = {
"owner": "example",
"start_date": start_date,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retry_delay": timedelta(minutes=15),
"provide_context": False,
}
dag = DAG(
dag_id="test_etl",
default_args=args,
schedule_interval="@hourly",
)
# ΠΠΏΡΠ΅Π΄Π΅Π»ΡΠ΅ΠΌ ΡΠ΅Π½ΡΠΎΡ
mail_check_sensor = MailSensor(
task_id="check_new_emails",
poke_interval=10,
conn_id="mail_conn_id",
timeout=10,
soft_fail=True,
box="my_box",
dag=dag,
mode="poke",
)
# Π€ΡΠ½ΠΊΡΠΈΡ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°
def prepare_mail():
imap_hook = IMAPHook("mail_conn_id")
mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
if mail_id is None:
raise AirflowException("Empty mailbox")
conn.download_mail_href_attachment(mail_id, "./path.zip")
prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
# ΠΠΏΠΈΡΠ°Π½ΠΈΠ΅ ΠΎΡΡΠ°Π»ΡΠ½ΡΡ
Π²Π΅ΡΡΠΈΠ½ Π³ΡΠ°ΡΠ°
...
# ΠΠ°Π΄Π°Π΅ΠΌ ΡΠ²ΡΠ·Ρ Π½Π° Π³ΡΠ°ΡΠ΅
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ΠΠΏΠΈΡΠ°Π½ΠΈΠ΅ ΠΎΡΡΠ°Π»ΡΠ½ΡΡ
ΠΏΠΎΡΠΎΠΊΠΎΠ² ΡΠΏΡΠ°Π²Π»Π΅Π½ΠΈΡ
Sa pamamagitan ng paraan, kung ang iyong corporate mail ay nasa mail.ru din, kung gayon hindi ka makakapaghanap ng mga titik ayon sa paksa, nagpadala, atbp. Noong 2016, nangako silang ipapakilala ito, ngunit tila nagbago ang kanilang isip. Nalutas ko ang problemang ito sa pamamagitan ng paggawa ng hiwalay na folder para sa mga kinakailangang titik at pag-set up ng filter para sa mga kinakailangang titik sa mail web interface. Kaya, ang mga kinakailangang titik at kundisyon lamang para sa paghahanap, sa aking kaso, simpleng (UNSEEN) ang makapasok sa folder na ito.
Summarizing, mayroon kaming sumusunod na pagkakasunud-sunod: sinusuri namin kung may mga bagong titik na nakakatugon sa mga kondisyon, kung mayroon, pagkatapos ay i-download namin ang archive gamit ang link mula sa huling titik.
Sa ilalim ng mga huling tuldok, tinatanggal na ang archive na ito ay aalisin, ang data mula sa archive ay mali-clear at mapoproseso, at bilang isang resulta, ang buong bagay ay mapupunta pa sa pipeline ng proseso ng ETL, ngunit ito ay lampas na. ang saklaw ng artikulo. Kung ito ay naging kawili-wili at kapaki-pakinabang, pagkatapos ay natutuwa akong patuloy na ilarawan ang mga solusyon sa ETL at ang kanilang mga bahagi para sa Apache Airflow.
Pinagmulan: www.habr.com