அப்பாச்சி ஏர்ஃப்ளோவில் மின்னஞ்சலில் இருந்து தரவைப் பெறுவதற்கான ETL செயல்முறை

அப்பாச்சி ஏர்ஃப்ளோவில் மின்னஞ்சலில் இருந்து தரவைப் பெறுவதற்கான ETL செயல்முறை

தொழில்நுட்பம் எவ்வளவு வளர்ச்சியடைந்தாலும், காலாவதியான அணுகுமுறைகளின் சரம் எப்போதும் வளர்ச்சியின் பின்னால் செல்கிறது. இது ஒரு மென்மையான மாற்றம், மனித காரணிகள், தொழில்நுட்ப தேவைகள் அல்லது வேறு ஏதாவது காரணமாக இருக்கலாம். தரவு செயலாக்கத் துறையில், தரவு மூலங்கள் இந்த பகுதியில் மிகவும் வெளிப்படுத்துகின்றன. இதிலிருந்து விடுபட வேண்டும் என்று நாம் எவ்வளவு கனவு கண்டாலும், இதுவரை தரவுகளின் ஒரு பகுதி உடனடி தூதர்கள் மற்றும் மின்னஞ்சல்களில் அனுப்பப்படுகிறது, மேலும் தொன்மையான வடிவங்களைக் குறிப்பிட தேவையில்லை. அப்பாச்சி ஏர்ஃப்ளோவிற்கான விருப்பங்களில் ஒன்றை பிரிப்பதற்கு உங்களை அழைக்கிறேன், மின்னஞ்சல்களில் இருந்து நீங்கள் எவ்வாறு தரவை எடுக்கலாம் என்பதை விளக்குகிறது.

முன்வரலாறு

தனிநபர் தொடர்புகளிலிருந்து நிறுவனங்களுக்கிடையேயான தொடர்புகளின் தரநிலைகள் வரை நிறைய தரவு இன்னும் மின்னஞ்சல் வழியாக மாற்றப்படுகிறது. தரவைப் பெற ஒரு இடைமுகத்தை எழுதுவது அல்லது இந்த தகவலை மிகவும் வசதியான ஆதாரங்களில் உள்ளிடும் நபர்களை அலுவலகத்தில் வைப்பது நல்லது, ஆனால் பெரும்பாலும் இது சாத்தியமில்லை. நான் எதிர்கொண்ட குறிப்பிட்ட பணியானது, மோசமான CRM அமைப்பை தரவுக் கிடங்கிற்கும், பின்னர் OLAP அமைப்பிற்கும் இணைப்பதாகும். இது வரலாற்று ரீதியாக நடந்தது, எங்கள் நிறுவனத்திற்கு இந்த அமைப்பைப் பயன்படுத்துவது ஒரு குறிப்பிட்ட வணிகப் பகுதியில் வசதியாக இருந்தது. எனவே, இந்த மூன்றாம் தரப்பு அமைப்பிலிருந்தும் தரவைக் கொண்டு செயல்பட வேண்டும் என்று அனைவரும் விரும்பினர். முதலாவதாக, நிச்சயமாக, திறந்த API இலிருந்து தரவைப் பெறுவதற்கான சாத்தியம் ஆய்வு செய்யப்பட்டது. துரதிர்ஷ்டவசமாக, API ஆனது தேவையான அனைத்து தரவையும் பெறவில்லை, மேலும் எளிமையான சொற்களில், இது பல வழிகளில் வளைந்திருந்தது, மேலும் தொழில்நுட்ப ஆதரவு இன்னும் விரிவான செயல்பாட்டை வழங்க விரும்பவில்லை அல்லது பாதியிலேயே சந்திக்க முடியவில்லை. ஆனால் இந்த அமைப்பு, காப்பகத்தை இறக்குவதற்கான இணைப்பு வடிவத்தில் மின்னஞ்சல் மூலம் காணாமல் போன தரவை அவ்வப்போது பெறுவதற்கான வாய்ப்பை வழங்கியது.

மின்னஞ்சல்கள் அல்லது உடனடி தூதர்களிடமிருந்து தரவை சேகரிக்க வணிகம் விரும்பிய ஒரே வழக்கு இதுவல்ல என்பதை கவனத்தில் கொள்ள வேண்டும். இருப்பினும், இந்த விஷயத்தில், தரவின் ஒரு பகுதியை மட்டும் இந்த வழியில் வழங்கும் மூன்றாம் தரப்பு நிறுவனத்தை எங்களால் பாதிக்க முடியவில்லை.

அப்பாச்சி காற்றோட்டம்

ETL செயல்முறைகளை உருவாக்க, நாங்கள் பெரும்பாலும் Apache Airflow ஐப் பயன்படுத்துகிறோம். இந்த தொழில்நுட்பத்தைப் பற்றி அறிமுகமில்லாத ஒரு வாசகருக்கு இது சூழலிலும் பொதுவாகவும் எப்படி இருக்கிறது என்பதை நன்கு புரிந்துகொள்ள, நான் இரண்டு அறிமுகங்களை விவரிக்கிறேன்.

அப்பாச்சி ஏர்ஃப்ளோ என்பது ஒரு இலவச தளமாகும், இது பைத்தானில் ETL (எக்ஸ்ட்ராக்ட்-டிரான்ஸ்ஃபார்ம்-லோடிங்) செயல்முறைகளை உருவாக்க, செயல்படுத்த மற்றும் கண்காணிக்க பயன்படுகிறது. காற்றோட்டத்தின் முக்கிய கருத்து ஒரு இயக்கப்பட்ட அசைக்ளிக் வரைபடமாகும், அங்கு வரைபடத்தின் செங்குத்துகள் குறிப்பிட்ட செயல்முறைகள் மற்றும் வரைபடத்தின் விளிம்புகள் கட்டுப்பாடு அல்லது தகவலின் ஓட்டம் ஆகும். ஒரு செயல்முறையானது எந்தவொரு பைதான் செயல்பாட்டையும் அழைக்கலாம் அல்லது ஒரு வகுப்பின் சூழலில் பல செயல்பாடுகளை வரிசையாக அழைப்பதில் இருந்து மிகவும் சிக்கலான தர்க்கத்தைக் கொண்டிருக்கலாம். மிகவும் பொதுவான செயல்பாடுகளுக்கு, ஏற்கனவே பல ஆயத்த மேம்பாடுகள் உள்ளன, அவை செயல்முறைகளாகப் பயன்படுத்தப்படலாம். இத்தகைய வளர்ச்சிகளில் பின்வருவன அடங்கும்:

  • ஆபரேட்டர்கள் - ஒரு இடத்திலிருந்து மற்றொரு இடத்திற்கு தரவை மாற்றுவதற்கு, எடுத்துக்காட்டாக, தரவுத்தள அட்டவணையில் இருந்து தரவுக் கிடங்கிற்கு;
  • சென்சார்கள் - ஒரு குறிப்பிட்ட நிகழ்வு நிகழும் வரை காத்திருக்கவும் மற்றும் வரைபடத்தின் அடுத்தடுத்த செங்குத்துகளுக்கு கட்டுப்பாட்டு ஓட்டத்தை இயக்கவும்;
  • கொக்கிகள் - கீழ்-நிலை செயல்பாடுகளுக்கு, எடுத்துக்காட்டாக, தரவுத்தள அட்டவணையில் இருந்து தரவைப் பெற (அறிக்கைகளில் பயன்படுத்தப்படுகிறது);
  • மற்றும் பல.

இந்தக் கட்டுரையில் அப்பாச்சி காற்றோட்டத்தை விரிவாக விவரிப்பது பொருத்தமற்றது. சுருக்கமான அறிமுகங்களைப் பார்க்கலாம் இங்கே அல்லது இங்கே.

தரவு பெறுவதற்கான ஹூக்

முதலில், சிக்கலைத் தீர்க்க, நாம் ஒரு கொக்கி எழுத வேண்டும்:

  • மின்னஞ்சலுடன் இணைக்கவும்
  • சரியான கடிதத்தைக் கண்டுபிடி
  • கடிதத்திலிருந்து தரவைப் பெறுங்கள்.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

தர்க்கம் இதுதான்: நாங்கள் இணைக்கிறோம், கடைசியாக மிகவும் பொருத்தமான கடிதத்தைக் கண்டறிகிறோம், மற்றவர்கள் இருந்தால், அவற்றைப் புறக்கணிப்போம். இந்த செயல்பாடு பயன்படுத்தப்படுகிறது, ஏனெனில் பிந்தைய எழுத்துக்கள் முந்தையவற்றின் அனைத்து தரவையும் கொண்டிருக்கும். இது அவ்வாறு இல்லையென்றால், நீங்கள் எல்லா எழுத்துக்களின் வரிசையையும் திரும்பப் பெறலாம் அல்லது முதல் ஒன்றைச் செயலாக்கலாம், மீதமுள்ளவை அடுத்த பாஸில். பொதுவாக, எல்லாம், எப்போதும் போல், பணி சார்ந்தது.

ஹூக்கில் இரண்டு துணை செயல்பாடுகளைச் சேர்க்கிறோம்: ஒரு கோப்பைப் பதிவிறக்குவதற்கும், மின்னஞ்சலில் இருந்து இணைப்பைப் பயன்படுத்தி கோப்பைப் பதிவிறக்குவதற்கும். மூலம், அவை ஆபரேட்டருக்கு நகர்த்தப்படலாம், இது இந்த செயல்பாட்டைப் பயன்படுத்துவதற்கான அதிர்வெண்ணைப் பொறுத்தது. ஹூக்கில் வேறு என்ன சேர்க்க வேண்டும், மீண்டும், பணியைப் பொறுத்தது: கடிதத்தில் கோப்புகள் உடனடியாகப் பெறப்பட்டால், கடிதத்திற்கான இணைப்புகளை நீங்கள் பதிவிறக்கலாம், கடிதத்தில் தரவு பெறப்பட்டால், நீங்கள் கடிதத்தை அலச வேண்டும், முதலியன என் விஷயத்தில், கடிதம் காப்பகத்திற்கான ஒரு இணைப்புடன் வருகிறது, அதை நான் ஒரு குறிப்பிட்ட இடத்தில் வைத்து மேலும் செயலாக்க செயல்முறையைத் தொடங்க வேண்டும்.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

குறியீடு எளிமையானது, எனவே இதற்கு கூடுதல் விளக்கம் தேவையில்லை. imap_conn_id என்ற மேஜிக் லைனைப் பற்றி நான் உங்களுக்குச் சொல்கிறேன். Apache Airflow ஆனது சரம் அடையாளங்காட்டி மூலம் அணுகக்கூடிய இணைப்பு அளவுருக்களை (உள்நுழைவு, கடவுச்சொல், முகவரி மற்றும் பிற அளவுருக்கள்) சேமிக்கிறது. பார்வைக்கு, இணைப்பு மேலாண்மை இது போல் தெரிகிறது

அப்பாச்சி ஏர்ஃப்ளோவில் மின்னஞ்சலில் இருந்து தரவைப் பெறுவதற்கான ETL செயல்முறை

தரவுக்காக காத்திருக்க சென்சார்

மின்னஞ்சலில் இருந்து தரவை எவ்வாறு இணைப்பது மற்றும் பெறுவது என்பது எங்களுக்கு ஏற்கனவே தெரிந்திருப்பதால், அவர்களுக்காக காத்திருக்க இப்போது சென்சார் எழுதலாம். என் விஷயத்தில், ஒரு ஆபரேட்டரை உடனடியாக எழுதுவது வேலை செய்யவில்லை, அது ஏதேனும் இருந்தால், அது தரவைச் செயலாக்கும், ஏனென்றால் மற்ற மூலங்களிலிருந்து (API, டெலிபோனி) தொடர்புடைய தரவை எடுப்பது உட்பட, மின்னஞ்சலில் இருந்து பெறப்பட்ட தரவின் அடிப்படையில் பிற செயல்முறைகள் செயல்படுகின்றன. , இணைய அளவீடுகள், முதலியன) போன்றவை). நான் ஒரு உதாரணம் தருகிறேன். CRM அமைப்பில் ஒரு புதிய பயனர் தோன்றியுள்ளார், அவருடைய UUID பற்றி எங்களுக்கு இன்னும் தெரியவில்லை. பின்னர், SIP தொலைபேசியிலிருந்து தரவைப் பெற முயற்சிக்கும்போது, ​​அதன் UUID உடன் இணைக்கப்பட்ட அழைப்புகளைப் பெறுவோம், ஆனால் அவற்றைச் சேமித்து சரியாகப் பயன்படுத்த முடியாது. இதுபோன்ற விஷயங்களில், தரவுகளின் சார்புநிலையை மனதில் வைத்திருப்பது முக்கியம், குறிப்பாக அவை வெவ்வேறு ஆதாரங்களில் இருந்து இருந்தால். இவை நிச்சயமாக, தரவு ஒருமைப்பாட்டைப் பாதுகாக்க போதுமான நடவடிக்கைகள் இல்லை, ஆனால் சில சந்தர்ப்பங்களில் அவை அவசியம். ஆம், வளங்களை ஆக்கிரமிக்க சும்மா இருப்பதும் பகுத்தறிவற்றது.

எனவே, மின்னஞ்சலில் புதிய தகவல்கள் இருந்தால், எங்கள் சென்சார் வரைபடத்தின் அடுத்தடுத்த முனைகளைத் தொடங்கும், மேலும் முந்தைய தகவலை பொருத்தமற்றதாகக் குறிக்கும்.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

நாங்கள் தரவைப் பெற்று பயன்படுத்துகிறோம்

தரவைப் பெறவும் செயலாக்கவும், நீங்கள் ஒரு தனி ஆபரேட்டரை எழுதலாம், நீங்கள் தயாராக உள்ளவற்றைப் பயன்படுத்தலாம். தர்க்கம் இன்னும் அற்பமானதாக இருப்பதால் - கடிதத்திலிருந்து தரவை எடுக்க, எடுத்துக்காட்டாக, நிலையான பைதான் ஆபரேட்டரை நான் பரிந்துரைக்கிறேன்

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

மூலம், உங்கள் கார்ப்பரேட் அஞ்சல் mail.ru இல் இருந்தால், நீங்கள் பொருள், அனுப்புநர் போன்றவற்றின் அடிப்படையில் கடிதங்களைத் தேட முடியாது. மீண்டும் 2016 இல், அவர்கள் அதை அறிமுகப்படுத்துவதாக உறுதியளித்தனர், ஆனால் வெளிப்படையாக தங்கள் மனதை மாற்றிக்கொண்டனர். தேவையான கடிதங்களுக்கு தனி கோப்புறையை உருவாக்கி, அஞ்சல் வலை இடைமுகத்தில் தேவையான கடிதங்களுக்கான வடிப்பானை அமைத்து இந்த சிக்கலை தீர்த்தேன். எனவே, தேடலுக்கு தேவையான கடிதங்கள் மற்றும் நிபந்தனைகள் மட்டுமே, என் விஷயத்தில், (UNSEEN) இந்த கோப்புறையில் கிடைக்கும்.

சுருக்கமாக, எங்களிடம் பின்வரும் வரிசை உள்ளது: நிபந்தனைகளை பூர்த்தி செய்யும் புதிய கடிதங்கள் உள்ளனவா என்பதை நாங்கள் சரிபார்க்கிறோம், இருந்தால், கடைசி கடிதத்திலிருந்து இணைப்பைப் பயன்படுத்தி காப்பகத்தைப் பதிவிறக்குகிறோம்.
கடைசி புள்ளிகளின் கீழ், இந்த காப்பகம் திறக்கப்படும், காப்பகத்திலிருந்து தரவு அழிக்கப்பட்டு செயலாக்கப்படும், இதன் விளைவாக, முழு விஷயமும் ETL செயல்முறையின் குழாய்க்கு மேலும் செல்லும், ஆனால் இது ஏற்கனவே அப்பால் உள்ளது கட்டுரையின் நோக்கம். இது சுவாரஸ்யமாகவும் பயனுள்ளதாகவும் இருந்தால், அப்பாச்சி ஏர்ஃப்ளோவுக்கான ETL தீர்வுகள் மற்றும் அவற்றின் பகுதிகளை மகிழ்ச்சியுடன் தொடர்ந்து விவரிக்கிறேன்.

ஆதாரம்: www.habr.com

கருத்தைச் சேர்