जतिसुकै प्रविधिको विकास भएतापनि पुरानो दृष्टिकोणको एक स्ट्रिङ सधैं विकासको पछि लाग्छ। यो एक सहज संक्रमण, मानव कारक, प्राविधिक आवश्यकता, वा अन्य केहि कारण हुन सक्छ। डाटा प्रोसेसिङको क्षेत्रमा, डाटा स्रोतहरू यस भागमा सबैभन्दा खुलासा हुन्छन्। हामी यसबाट छुटकारा पाउने जतिसुकै सपना देख्छौं, तर अहिलेसम्म डाटाको केही अंश तत्काल मेसेन्जर र इमेलहरूमा पठाइन्छ, थप पुरातन ढाँचाहरू उल्लेख नगर्न। म तपाईंलाई Apache Airflow को लागि विकल्पहरू मध्ये एकलाई छुट्याउन आमन्त्रित गर्दछु, तपाईं कसरी इमेलहरूबाट डाटा लिन सक्नुहुन्छ भन्ने चित्रण गर्दै।
प्रागितिहास
धेरै डाटा अझै पनि इ-मेल मार्फत हस्तान्तरण गरिएको छ, पारस्परिक संचार देखि कम्पनीहरु बीच अन्तरक्रिया को स्तर मा। यो राम्रो छ यदि डेटा प्राप्त गर्नको लागि एक इन्टरफेस लेख्न वा कार्यालयमा व्यक्तिहरू राख्न सम्भव छ जसले यो जानकारी थप सुविधाजनक स्रोतहरूमा प्रविष्ट गर्नेछ, तर अक्सर यो सम्भव नहुन सक्छ। मैले सामना गरेको विशिष्ट कार्य कुख्यात CRM प्रणालीलाई डाटा गोदाममा र त्यसपछि OLAP प्रणालीमा जडान गर्ने थियो। यो ऐतिहासिक रूपमा भयो कि हाम्रो कम्पनीको लागि यो प्रणालीको प्रयोग व्यापारको एक विशेष क्षेत्रमा सुविधाजनक थियो। तसर्थ, सबैजना साँच्चै यो तेस्रो-पक्ष प्रणालीबाट डाटा संग काम गर्न सक्षम हुन चाहन्थे। सबै भन्दा पहिले, निस्सन्देह, खुला API बाट डाटा प्राप्त गर्ने सम्भावना अध्ययन गरिएको थियो। दुर्भाग्यवश, एपीआईले सबै आवश्यक डेटा प्राप्त गर्न कभर गरेन, र, सरल शब्दहरूमा, यो धेरै तरिकामा बाङ्गो थियो, र प्राविधिक समर्थन चाहँदैन थियो वा अधिक व्यापक कार्यक्षमता प्रदान गर्न आधा बाटो पूरा गर्न सकेन। तर यो प्रणालीले आवधिक रूपमा संग्रह अनलोड गर्न लिङ्कको रूपमा मेल मार्फत हराएको डाटा प्राप्त गर्ने अवसर प्रदान गर्यो।
यो ध्यान दिनुपर्छ कि यो एक मात्र मामला थिएन जसमा व्यवसायले इमेल वा तत्काल सन्देशवाहकहरूबाट डाटा सङ्कलन गर्न चाहन्थे। यद्यपि, यस अवस्थामा, हामीले तेस्रो-पक्ष कम्पनीलाई प्रभाव पार्न सकेनौं जसले डेटाको अंश यस तरिकाले मात्र प्रदान गर्दछ।
अपाचे एयरफ्लो
ETL प्रक्रियाहरू निर्माण गर्न, हामी प्रायः Apache Airflow प्रयोग गर्छौं। यस प्रविधिसँग अपरिचित पाठकलाई यो सन्दर्भमा र सामान्य रूपमा कस्तो देखिन्छ भनेर राम्रोसँग बुझ्नको लागि, म केही परिचयात्मकहरू वर्णन गर्नेछु।
Apache Airflow एउटा नि:शुल्क प्लेटफर्म हो जुन पाइथनमा ETL (Extract-Transform-Loading) प्रक्रियाहरू निर्माण, कार्यान्वयन र निगरानी गर्न प्रयोग गरिन्छ। एयरफ्लोमा मुख्य अवधारणा एक निर्देशित एसाइक्लिक ग्राफ हो, जहाँ ग्राफको ठाडोहरू विशिष्ट प्रक्रियाहरू हुन्, र ग्राफको किनारहरू नियन्त्रण वा जानकारीको प्रवाह हुन्। एउटा प्रक्रियाले कुनै पनि पाइथन प्रकार्यलाई मात्र कल गर्न सक्छ, वा क्लासको सन्दर्भमा धेरै प्रकार्यहरूलाई क्रमिक रूपमा कल गरेर थप जटिल तर्क हुन सक्छ। धेरै पटक सञ्चालनका लागि, त्यहाँ पहिले नै धेरै तयार-बनाइएका विकासहरू छन् जुन प्रक्रियाहरूको रूपमा प्रयोग गर्न सकिन्छ। त्यस्ता विकासहरू समावेश छन्:
- अपरेटरहरू - एक ठाउँबाट अर्को ठाउँमा डाटा स्थानान्तरण गर्नका लागि, उदाहरणका लागि, डाटाबेस तालिकाबाट डाटा गोदाममा;
- सेन्सरहरू - निश्चित घटनाको घटनाको लागि पर्खन र ग्राफको पछिल्ला ठाडोहरूमा नियन्त्रणको प्रवाहलाई निर्देशित गर्नको लागि;
- हुकहरू - तल्लो-स्तर सञ्चालनहरूका लागि, उदाहरणका लागि, डाटाबेस तालिकाबाट डाटा प्राप्त गर्न (कथनहरूमा प्रयोग गरिएको);
- र यति।
यो लेखमा Apache Airflow को विस्तृत रूपमा वर्णन गर्न अनुपयुक्त हुनेछ। संक्षिप्त परिचय हेर्न सकिन्छ
डाटा प्राप्त गर्न हुक
सबै भन्दा पहिले, समस्या समाधान गर्न, हामीले एउटा हुक लेख्न आवश्यक छ जसको साथ हामी गर्न सक्छौं:
- इमेलमा जडान गर्नुहोस्
- सही अक्षर फेला पार्नुहोस्
- पत्रबाट डाटा प्राप्त गर्नुहोस्।
from airflow.hooks.base_hook import BaseHook
import imaplib
import logging
class IMAPHook(BaseHook):
def __init__(self, imap_conn_id):
"""
IMAP hook для получения данных с электронной почты
:param imap_conn_id: Идентификатор подключения к почте
:type imap_conn_id: string
"""
self.connection = self.get_connection(imap_conn_id)
self.mail = None
def authenticate(self):
"""
Подключаемся к почте
"""
mail = imaplib.IMAP4_SSL(self.connection.host)
response, detail = mail.login(user=self.connection.login, password=self.connection.password)
if response != "OK":
raise AirflowException("Sign in failed")
else:
self.mail = mail
def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
"""
Метод для получения идентификатора последнего письма,
удовлетвораяющего условиям поиска
:param check_seen: Отмечать последнее письмо как прочитанное
:type check_seen: bool
:param box: Наименования ящика
:type box: string
:param condition: Условия поиска писем
:type condition: string
"""
self.authenticate()
self.mail.select(mailbox=box)
response, data = self.mail.search(None, condition)
mail_ids = data[0].split()
logging.info("В ящике найдены следующие письма: " + str(mail_ids))
if not mail_ids:
logging.info("Не найдено новых писем")
return None
mail_id = mail_ids[0]
# если таких писем несколько
if len(mail_ids) > 1:
# отмечаем остальные прочитанными
for id in mail_ids:
self.mail.store(id, "+FLAGS", "\Seen")
# возвращаем последнее
mail_id = mail_ids[-1]
# нужно ли отметить последнее прочитанным
if not check_seen:
self.mail.store(mail_id, "-FLAGS", "\Seen")
return mail_id
तर्क यो हो: हामी जडान गर्छौं, अन्तिम सबैभन्दा सान्दर्भिक पत्र फेला पार्छौं, यदि त्यहाँ अरू छन् भने, हामी तिनीहरूलाई बेवास्ता गर्छौं। यो प्रकार्य प्रयोग गरिन्छ, किनभने पछिका अक्षरहरूमा पहिलेका सबै डाटा समावेश हुन्छन्। यदि यो मामला होइन भने, त्यसपछि तपाइँ सबै अक्षरहरूको एरे फर्काउन सक्नुहुन्छ, वा पहिलोलाई प्रशोधन गर्न सक्नुहुन्छ, र बाँकी अर्को पासमा। सामान्यतया, सबै कुरा, सधैं रूपमा, कार्य मा निर्भर गर्दछ।
हामी हुकमा दुई सहायक प्रकार्यहरू थप्छौं: फाइल डाउनलोड गर्न र इमेलबाट लिङ्क प्रयोग गरेर फाइल डाउनलोड गर्नका लागि। वैसे, तिनीहरू अपरेटरमा सार्न सकिन्छ, यो यो कार्यक्षमता प्रयोग गर्ने आवृत्तिमा निर्भर गर्दछ। हुकमा थप्न के गर्न, फेरि, कार्यमा निर्भर गर्दछ: यदि फाइलहरू पत्रमा तुरुन्तै प्राप्त भएमा, त्यसपछि तपाइँ पत्रमा संलग्नकहरू डाउनलोड गर्न सक्नुहुन्छ, यदि पत्रमा डाटा प्राप्त भयो भने, तपाइँलाई पत्र पार्स गर्न आवश्यक छ, आदि मेरो मामलामा, पत्र संग्रहमा एउटा लिङ्कको साथ आउँछ, जुन मैले निश्चित ठाउँमा राख्नु पर्छ र थप प्रशोधन प्रक्रिया सुरु गर्न आवश्यक छ।
def download_from_url(self, url, path, chunk_size=128):
"""
Метод для скачивания файла
:param url: Адрес загрузки
:type url: string
:param path: Куда положить файл
:type path: string
:param chunk_size: По сколько байтов писать
:type chunk_size: int
"""
r = requests.get(url, stream=True)
with open(path, "wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def download_mail_href_attachment(self, mail_id, path):
"""
Метод для скачивания файла по ссылке из письма
:param mail_id: Идентификатор письма
:type mail_id: string
:param path: Куда положить файл
:type path: string
"""
response, data = self.mail.fetch(mail_id, "(RFC822)")
raw_email = data[0][1]
raw_soup = raw_email.decode().replace("r", "").replace("n", "")
parse_soup = BeautifulSoup(raw_soup, "html.parser")
link_text = ""
for a in parse_soup.find_all("a", href=True, text=True):
link_text = a["href"]
self.download_from_url(link_text, path)
कोड सरल छ, त्यसैले यसलाई शायदै थप व्याख्या आवश्यक छ। म तपाईंलाई जादुई रेखा imap_conn_id बारे बताउनेछु। Apache Airflow ले जडान प्यारामिटरहरू (लगइन, पासवर्ड, ठेगाना, र अन्य प्यारामिटरहरू) भण्डार गर्दछ जुन स्ट्रिङ पहिचानकर्ताद्वारा पहुँच गर्न सकिन्छ। दृश्यात्मक रूपमा, जडान व्यवस्थापन यस्तो देखिन्छ
डेटाको लागि पर्खन सेन्सर
हामीलाई पहिले नै थाहा छ कि कसरी मेलबाट डेटा जडान गर्ने र प्राप्त गर्ने, अब हामी तिनीहरूलाई पर्खन सेन्सर लेख्न सक्छौं। मेरो मामलामा, यो कुनै अपरेटरलाई तुरुन्तै लेख्न काम गर्दैन जसले डाटालाई प्रशोधन गर्दछ, यदि कुनै हो, किनभने अन्य प्रक्रियाहरूले मेलबाट प्राप्त डाटामा आधारित काम गर्दछ, अन्य स्रोतहरू (एपीआई, टेलिफोनी) बाट सम्बन्धित डाटा लिनेहरू सहित। , वेब मेट्रिक्स, आदि।) आदि)। म तपाईंलाई एउटा उदाहरण दिनेछु। CRM प्रणालीमा नयाँ प्रयोगकर्ता देखा परेको छ, र हामीलाई अझै पनि उसको UUID बारे थाहा छैन। त्यसपछि, SIP टेलिफोनीबाट डाटा प्राप्त गर्ने प्रयास गर्दा, हामी यसको UUID सँग जोडिएको कलहरू प्राप्त गर्नेछौं, तर हामीले तिनीहरूलाई सुरक्षित गर्न र प्रयोग गर्न सक्षम हुने छैनौं। त्यस्ता मामिलाहरूमा, डाटाको निर्भरतालाई ध्यानमा राख्नु महत्त्वपूर्ण छ, विशेष गरी यदि तिनीहरू विभिन्न स्रोतहरूबाट छन्। यी निस्सन्देह, डेटा अखण्डता जोगाउन अपर्याप्त उपायहरू छन्, तर केही अवस्थामा तिनीहरू आवश्यक छन्। हो, र स्रोतहरू कब्जा गर्न अल्छी पनि तर्कहीन छ।
यसैले, हाम्रो सेन्सरले पत्रमा ताजा जानकारी छ भने ग्राफको पछिल्ला ठाडोहरू सुरु गर्नेछ, र अघिल्लो जानकारीलाई अप्रासंगिक भनी चिन्ह लगाउनेछ।
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook
class MailSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.check_seen = check_seen
self.box = box
self.condition = condition
def poke(self, context):
conn = IMAPHook(self.conn_id)
mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
if mail_id is None:
return False
else:
return True
हामी डेटा प्राप्त र प्रयोग गर्छौं
डाटा प्राप्त गर्न र प्रशोधन गर्न, तपाइँ एक अलग अपरेटर लेख्न सक्नुहुन्छ, तपाइँ तयार-बनाएकाहरू प्रयोग गर्न सक्नुहुन्छ। तर्क अझै मामूली छ - अक्षरबाट डाटा लिनको लागि, उदाहरणका लागि, म मानक पाइथन अपरेटर सुझाव दिन्छु।
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook
start_date = datetime(2020, 4, 4)
# Стандартное конфигурирование графа
args = {
"owner": "example",
"start_date": start_date,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retry_delay": timedelta(minutes=15),
"provide_context": False,
}
dag = DAG(
dag_id="test_etl",
default_args=args,
schedule_interval="@hourly",
)
# Определяем сенсор
mail_check_sensor = MailSensor(
task_id="check_new_emails",
poke_interval=10,
conn_id="mail_conn_id",
timeout=10,
soft_fail=True,
box="my_box",
dag=dag,
mode="poke",
)
# Функция для получения данных из письма
def prepare_mail():
imap_hook = IMAPHook("mail_conn_id")
mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
if mail_id is None:
raise AirflowException("Empty mailbox")
conn.download_mail_href_attachment(mail_id, "./path.zip")
prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
# Описание остальных вершин графа
...
# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления
खैर, यदि तपाइँको कर्पोरेट मेल mail.ru मा पनि छ भने, तपाइँ विषय, प्रेषक, आदि द्वारा अक्षरहरू खोज्न सक्षम हुनुहुने छैन। 2016 मा फिर्ता, तिनीहरूले यसलाई पेश गर्ने वाचा गरे, तर स्पष्ट रूपमा तिनीहरूको दिमाग परिवर्तन भयो। मैले आवश्यक अक्षरहरूको लागि छुट्टै फोल्डर सिर्जना गरेर र मेल वेब इन्टरफेसमा आवश्यक अक्षरहरूको लागि फिल्टर सेटअप गरेर यो समस्या समाधान गरें। यसैले, केवल आवश्यक अक्षरहरू र खोजका लागि सर्तहरू, मेरो मामलामा, यो फोल्डरमा बस (अनदेखी) पाउनुहोस्।
संक्षेपमा, हामीसँग निम्न अनुक्रम छ: हामी सर्तहरू पूरा गर्ने नयाँ अक्षरहरू छन् कि छैनन् भनेर जाँच गर्छौं, यदि त्यहाँ छन् भने, हामी अन्तिम अक्षरको लिङ्क प्रयोग गरेर संग्रह डाउनलोड गर्छौं।
अन्तिम बिन्दुहरू अन्तर्गत, यो हटाइएको छ कि यो अभिलेख अनप्याक गरिनेछ, अभिलेखबाट डाटा खाली गरिनेछ र प्रशोधन गरिनेछ, र नतिजाको रूपमा, सम्पूर्ण कुरा ETL प्रक्रियाको पाइपलाइनमा जान्छ, तर यो पहिले नै बाहिर छ। लेखको दायरा। यदि यो रोचक र उपयोगी भयो भने, म खुशीसाथ ETL समाधानहरू र Apache Airflow को लागि तिनीहरूका भागहरू वर्णन गर्न जारी राख्नेछु।
स्रोत: www.habr.com