Quantumcumque technologiae enucleatur, chorda iamnonis appropinquat semper post progressionem trahit. Hoc contingere potest propter leves transitus, factores hominum, necessitates technologicas, vel aliquid aliud. In campo notitiae processus, fontes notitiae sunt maxime in hac parte revelantes. Quantumvis hoc tollendi somniamus, sed tantum pars notitiarum in instanti nuntiorum et electronicorum mittitur, ut plura archaica formas taceam. Rogamus te ut unam ex optionibus Apache Airflow disgreges, illustrans quomodo notitias ex electronicis accipere potes.
erectus
Multum notitiarum adhuc per electronicas transfertur, ex communicationibus interpersonalibus ad signa commercii inter societates. Bonum est si fieri potest ut interface scribere datas vel homines in officio ponere, qui informationem hanc in fontes commodiores intrabunt, sed saepe hoc simpliciter fieri non potest. Negotium peculiare, quod ante me erat, notorium CRM systematis cum notitia horreorum iungebat, et deinde cum systemate OLAP. Sic historice factum est ut pro nostro coetu usus huius systematis in peculiari negotiorum provincia esset opportunus. Omnes igitur re vera cum notitia ex hac tertia-partii ratione etiam operari posse volebant. Imprimis, sane, facultatem obtinendi notitia ex aperto API studuit. Infeliciter, API omnia necessarias notitias non operiebat, et, simplicibus verbis, multis modis pravae erat, et technicae subsidii nolebat aut non poterat in medio congredi ut plus muneris magis latioris praeberet. Sed haec ratio occasionem praebet ut absentes notitias per epistulas recipiant in modum nexus ad archivum exonerandam.
Animadvertendum est hunc casum solum non fuisse in quo negotium notitias ex nuntiis vel instantibus nuntiis colligere volebant. Attamen in hoc casu non potuit movere comitatum tertium-pars qui hoc modo tantum partem praebet notitiae.
Apache editi
Ad processum ETL aedificandum, frequentius Apache Airflow utimur. Ut lector, qui huic technologiae ignarus melius intelligat quomodo spectat in contextu et generatim, duo introductoria describemus.
Apache Airflow est liberum tribunal quod fabricare, exequi et monitor ETL (Extract-Transform-Loading) processuum in Pythone est. Praecipua notio in Airflow est graphio acyclico directa, ubi vertices graphi sunt processus specifici, et margines graphi sunt fluxus imperii seu notitiae. Processus simpliciter quemlibet munus Python vocare potest, vel logicam magis implicatam habere potest ex pluribus functionibus consequenter vocatis in contextu generis. Ad operationes frequentiores, iam multae progressiones paratae factae sunt, quae uti processibus adhiberi possunt. Huiusmodi explicationes complectuntur:
- operariorum - notitias de loco in locum transferendi, exempli gratia, ex tabula datorum ad cellam datam;
- sensoriis - exspectandi eventum cuiusdam eventus ac moderandi fluxum imperii ad vertices graphi subsequentes;
- hami - pro operationibus inferioribus, exempli gratia, ut notitias ex tabula datorum (in propositionibus usitatas);
- etc.
Inconvenienter Apache Airflow in hoc articulo singillatim describere. Brevis initia considerari possunt
Hamo ad questus notitia
Imprimis, ut problema solvendum, hamum scribere oportet quo potuimus;
- coniungere ad email
- invenire ius litterae
- ex litteris accipere notitia.
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging
class IMAPHook(BaseHook):
def __init__(self, imap_conn_id):
"""
IMAP hook Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
Ρ ΡΠ»Π΅ΠΊΡΡΠΎΠ½Π½ΠΎΠΉ ΠΏΠΎΡΡΡ
:param imap_conn_id: ΠΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠ΅Π½ΠΈΡ ΠΊ ΠΏΠΎΡΡΠ΅
:type imap_conn_id: string
"""
self.connection = self.get_connection(imap_conn_id)
self.mail = None
def authenticate(self):
"""
ΠΠΎΠ΄ΠΊΠ»ΡΡΠ°Π΅ΠΌΡΡ ΠΊ ΠΏΠΎΡΡΠ΅
"""
mail = imaplib.IMAP4_SSL(self.connection.host)
response, detail = mail.login(user=self.connection.login, password=self.connection.password)
if response != "OK":
raise AirflowException("Sign in failed")
else:
self.mail = mail
def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ ΠΈΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡΠ° ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π³ΠΎ ΠΏΠΈΡΡΠΌΠ°,
ΡΠ΄ΠΎΠ²Π»Π΅ΡΠ²ΠΎΡΠ°ΡΡΡΠ΅Π³ΠΎ ΡΡΠ»ΠΎΠ²ΠΈΡΠΌ ΠΏΠΎΠΈΡΠΊΠ°
:param check_seen: ΠΡΠΌΠ΅ΡΠ°ΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅ ΠΏΠΈΡΡΠΌΠΎ ΠΊΠ°ΠΊ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΠΎΠ΅
:type check_seen: bool
:param box: ΠΠ°ΠΈΠΌΠ΅Π½ΠΎΠ²Π°Π½ΠΈΡ ΡΡΠΈΠΊΠ°
:type box: string
:param condition: Π£ΡΠ»ΠΎΠ²ΠΈΡ ΠΏΠΎΠΈΡΠΊΠ° ΠΏΠΈΡΠ΅ΠΌ
:type condition: string
"""
self.authenticate()
self.mail.select(mailbox=box)
response, data = self.mail.search(None, condition)
mail_ids = data[0].split()
logging.info("Π ΡΡΠΈΠΊΠ΅ Π½Π°ΠΉΠ΄Π΅Π½Ρ ΡΠ»Π΅Π΄ΡΡΡΠΈΠ΅ ΠΏΠΈΡΡΠΌΠ°: " + str(mail_ids))
if not mail_ids:
logging.info("ΠΠ΅ Π½Π°ΠΉΠ΄Π΅Π½ΠΎ Π½ΠΎΠ²ΡΡ
ΠΏΠΈΡΠ΅ΠΌ")
return None
mail_id = mail_ids[0]
# Π΅ΡΠ»ΠΈ ΡΠ°ΠΊΠΈΡ
ΠΏΠΈΡΠ΅ΠΌ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ
if len(mail_ids) > 1:
# ΠΎΡΠΌΠ΅ΡΠ°Π΅ΠΌ ΠΎΡΡΠ°Π»ΡΠ½ΡΠ΅ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΡΠΌΠΈ
for id in mail_ids:
self.mail.store(id, "+FLAGS", "\Seen")
# Π²ΠΎΠ·Π²ΡΠ°ΡΠ°Π΅ΠΌ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅
mail_id = mail_ids[-1]
# Π½ΡΠΆΠ½ΠΎ Π»ΠΈ ΠΎΡΠΌΠ΅ΡΠΈΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½Π΅Π΅ ΠΏΡΠΎΡΠΈΡΠ°Π½Π½ΡΠΌ
if not check_seen:
self.mail.store(mail_id, "-FLAGS", "\Seen")
return mail_id
Logica haec est: connectimus, extremam litteram maxime pertinentes reperimus, si quae sunt aliae, eas negligimus. Hoc munus adhibetur, quia litterae posteriores omnes notitias priorum continent. Quod si non ita est, tunc omnes literae ordinatae redire potes, seu primum processum, et reliqua in proximum transitum. Fere omnia, ut semper, ab officio pendent.
Duo munera auxiliaria ad hamum addimus: ad limam deprimendam et ad limam utendam ab inscriptionem nexum adhibendam. Viam facere possunt ad operantem, pendet ex frequentia huius functionis utendi. Quid aliud ad hamum adiciendum, rursus in negotio dependet: si statim recipiuntur fasciculi in epistula, tum attachiamenta ad litteram, si notitia recipitur in epistula, necesse est litteras dividere; etc. In casu meo, epistolae cum uno nexus ad archivum venit, quem certo loco ponere debeo et processus ulterioris processus committitur.
def download_from_url(self, url, path, chunk_size=128):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π°
:param url: ΠΠ΄ΡΠ΅Ρ Π·Π°Π³ΡΡΠ·ΠΊΠΈ
:type url: string
:param path: ΠΡΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ ΡΠ°ΠΉΠ»
:type path: string
:param chunk_size: ΠΠΎ ΡΠΊΠΎΠ»ΡΠΊΠΎ Π±Π°ΠΉΡΠΎΠ² ΠΏΠΈΡΠ°ΡΡ
:type chunk_size: int
"""
r = requests.get(url, stream=True)
with open(path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def download_mail_href_attachment(self, mail_id, path):
"""
ΠΠ΅ΡΠΎΠ΄ Π΄Π»Ρ ΡΠΊΠ°ΡΠΈΠ²Π°Π½ΠΈΡ ΡΠ°ΠΉΠ»Π° ΠΏΠΎ ΡΡΡΠ»ΠΊΠ΅ ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°
:param mail_id: ΠΠ΄Π΅Π½ΡΠΈΡΠΈΠΊΠ°ΡΠΎΡ ΠΏΠΈΡΡΠΌΠ°
:type mail_id: string
:param path: ΠΡΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ ΡΠ°ΠΉΠ»
:type path: string
"""
response, data = self.mail.fetch(mail_id, "(RFC822)")
raw_email = data[0][1]
raw_soup = raw_email.decode().replace("r", "").replace("n", "")
parse_soup = BeautifulSoup(raw_soup, "html.parser")
link_text = ""
for a in parse_soup.find_all("a", href=True, text=True):
link_text = a["href"]
self.download_from_url(link_text, path)
Codex simplex est, ut vix explicatione ampliori indigeat. Modo dicam tibi de linea magica imap_conn_id. Apache Airflow reponit nexum parametri (login, tesseram, inscriptionem, aliosque parametri) quae per filo identifier accedere possunt. Visum, connexio procuratio similis est
Sensorem exspectare notitia
Cum iam scimus quomodo notitias connectere et ex epistularum recipiunt, nunc sensorem scribere possumus ut eas exspectamus. In casu meo, non laboravit operantem statim scribere qui notitias procedet, si quis, quia alii processus laborant in notitia electronicarum receptarum, in iis quae notitias ex aliis fontibus relatas accipiunt (API, telephony. metri textus, etc.). Exemplum dabo. Novus usor in CRM systemate apparuit, et de eius UUID adhuc nescimus. Deinde cum notitias ex HAUSTUS telephony accipere conantes, vocatum UUID ligatum recipiemus, sed ea recte servare et uti non poterimus. His in rebus refert dependentiam notitiarum prae oculis habere, praesertim si ex diversis auctoribus sunt. Hae sunt quidem satis mensurae ad integritatem datam conservandam, sed in quibusdam necessaria sunt. Immo et otiosus opes occupandi irrationabilis est.
Sic sensor noster posteriores vertices graphi mittet si notitiae recentes in electronica epistularum sunt, et etiam priora indicia nullius momenti sunt.
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook
class MailSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.check_seen = check_seen
self.box = box
self.condition = condition
def poke(self, context):
conn = IMAPHook(self.conn_id)
mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
if mail_id is None:
return False
else:
return True
Nos accipimus et utimur notitia
Data ad recipiendum et processum, operatorem separatum scribere potes, paratis factis uti potes. Cum logica hactenus levis sit - ut ex littera notitias capias, exempli gratia signum PythonOperatoris admoneo.
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook
start_date = datetime(2020, 4, 4)
# Π‘ΡΠ°Π½Π΄Π°ΡΡΠ½ΠΎΠ΅ ΠΊΠΎΠ½ΡΠΈΠ³ΡΡΠΈΡΠΎΠ²Π°Π½ΠΈΠ΅ Π³ΡΠ°ΡΠ°
args = {
"owner": "example",
"start_date": start_date,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retry_delay": timedelta(minutes=15),
"provide_context": False,
}
dag = DAG(
dag_id="test_etl",
default_args=args,
schedule_interval="@hourly",
)
# ΠΠΏΡΠ΅Π΄Π΅Π»ΡΠ΅ΠΌ ΡΠ΅Π½ΡΠΎΡ
mail_check_sensor = MailSensor(
task_id="check_new_emails",
poke_interval=10,
conn_id="mail_conn_id",
timeout=10,
soft_fail=True,
box="my_box",
dag=dag,
mode="poke",
)
# Π€ΡΠ½ΠΊΡΠΈΡ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ
ΠΈΠ· ΠΏΠΈΡΡΠΌΠ°
def prepare_mail():
imap_hook = IMAPHook("mail_conn_id")
mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
if mail_id is None:
raise AirflowException("Empty mailbox")
conn.download_mail_href_attachment(mail_id, "./path.zip")
prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
# ΠΠΏΠΈΡΠ°Π½ΠΈΠ΅ ΠΎΡΡΠ°Π»ΡΠ½ΡΡ
Π²Π΅ΡΡΠΈΠ½ Π³ΡΠ°ΡΠ°
...
# ΠΠ°Π΄Π°Π΅ΠΌ ΡΠ²ΡΠ·Ρ Π½Π° Π³ΡΠ°ΡΠ΅
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ΠΠΏΠΈΡΠ°Π½ΠΈΠ΅ ΠΎΡΡΠ°Π»ΡΠ½ΡΡ
ΠΏΠΎΡΠΎΠΊΠΎΠ² ΡΠΏΡΠ°Π²Π»Π΅Π½ΠΈΡ
Obiter, si epistularum corpus tuum etiam in mail.ru est, tunc litteras a subiecto, mittente, etc. Retro in 2016, id se introducere promiserunt, sed animos eorum apparenter mutaverunt. Hoc problema solvi faciendo folder separatum pro litteris necessariis et colum ponens litteris necessariis in interface electronicis. Ita, solum necessariae litterae et condiciones quaerendi, in casu meo, simpliciter in hunc folder ingrediuntur.
Summatim, sequentia sequentia habemus: reprimimus si novae litterae sunt, quae condiciones occurrentes, si quae sint, archivum ex postrema littera nexu utentes eximimus.
Sub ultimis punctis omittitur ut hoc archivum sit solutum, notitia ex archivo patebit et discursum est, et per consequens res tota ulterius ad processum ETL pipelinum perveniet, sed hoc iam ultra. ambitum articuli. Si commodam et utilem evenerit, tum libenter ETL solutiones earumque partes Apache Airflui describere pergam.
Source: www.habr.com