అపాచీ ఎయిర్‌ఫ్లో ఇమెయిల్ నుండి డేటాను పొందడానికి ETL ప్రక్రియ

అపాచీ ఎయిర్‌ఫ్లో ఇమెయిల్ నుండి డేటాను పొందడానికి ETL ప్రక్రియ

సాంకేతికత ఎంత అభివృద్ధి చెందినా, కాలం చెల్లిన విధానాల శ్రేణి ఎల్లప్పుడూ అభివృద్ధి వెనుక ఉంటుంది. ఇది సాఫీగా మారడం, మానవ కారకాలు, సాంకేతిక అవసరాలు లేదా మరేదైనా కారణంగా కావచ్చు. డేటా ప్రాసెసింగ్ రంగంలో, డేటా సోర్స్‌లు ఈ భాగంలో ఎక్కువగా బహిర్గతం అవుతాయి. దీన్ని వదిలించుకోవాలని మనం ఎంత కలలు కంటున్నామో, కానీ ఇప్పటివరకు డేటాలో కొంత భాగం ఇన్‌స్టంట్ మెసెంజర్‌లు మరియు ఇమెయిల్‌లలో పంపబడుతుంది, ఎక్కువ పురాతన ఫార్మాట్‌ల గురించి చెప్పనవసరం లేదు. మీరు ఇమెయిల్‌ల నుండి డేటాను ఎలా తీసుకోవచ్చో వివరిస్తూ, Apache Airflow కోసం ఎంపికలలో ఒకదానిని విడదీయమని నేను మిమ్మల్ని ఆహ్వానిస్తున్నాను.

పూర్వచరిత్ర

వ్యక్తుల మధ్య కమ్యూనికేషన్‌ల నుండి కంపెనీల మధ్య పరస్పర చర్యల ప్రమాణాల వరకు చాలా డేటా ఇప్పటికీ ఇమెయిల్ ద్వారా ప్రసారం చేయబడుతుంది. మీరు డేటాను స్వీకరించడానికి ఇంటర్‌ఫేస్‌ను వ్రాయగలిగితే లేదా ఈ సమాచారాన్ని మరింత అనుకూలమైన మూలాల్లోకి నమోదు చేసే వ్యక్తులను కార్యాలయంలో ఉంచడం మంచిది, కానీ తరచుగా ఈ అవకాశం ఉండకపోవచ్చు. నేను ఎదుర్కొన్న నిర్దిష్ట పని ఏమిటంటే, ప్రసిద్ధ CRM సిస్టమ్‌ను డేటా వేర్‌హౌస్‌కి, ఆపై OLAP సిస్టమ్‌కి కనెక్ట్ చేయడం. చారిత్రాత్మకంగా, మా కంపెనీకి, ఈ వ్యవస్థ యొక్క ఉపయోగం ఒక నిర్దిష్ట వ్యాపారంలో సౌకర్యవంతంగా ఉంటుంది. అందువల్ల, ప్రతి ఒక్కరూ నిజంగా ఈ మూడవ పక్ష సిస్టమ్ నుండి డేటాతో పనిచేయాలని కోరుకున్నారు. అన్నింటిలో మొదటిది, ఓపెన్ API నుండి డేటాను పొందే అవకాశం అన్వేషించబడింది. దురదృష్టవశాత్తూ, API అవసరమైన మొత్తం డేటాను పొందడం లేదు, మరియు సాధారణ పరంగా, ఇది చాలా వంకరగా ఉంది మరియు మరింత సమగ్రమైన కార్యాచరణను అందించడానికి సాంకేతిక మద్దతు కోరుకోలేదు లేదా సగం చేరుకోలేకపోయింది. కానీ ఈ వ్యవస్థ ఆర్కైవ్‌ను డౌన్‌లోడ్ చేయడానికి లింక్ రూపంలో ఇమెయిల్ ద్వారా తప్పిపోయిన డేటాను కాలానుగుణంగా స్వీకరించడానికి అవకాశాన్ని అందించింది.

వ్యాపారం ఇమెయిల్‌లు లేదా తక్షణ మెసెంజర్‌ల నుండి డేటాను సేకరించాలనుకునే ఏకైక సందర్భం ఇది కాదని గమనించాలి. అయితే, ఈ సందర్భంలో, ఈ విధంగా మాత్రమే డేటాలో కొంత భాగాన్ని అందించే మూడవ పక్ష కంపెనీని మేము ప్రభావితం చేయలేకపోయాము.

అపాచీ ఎయిర్‌ఫ్లో

ETL ప్రక్రియలను రూపొందించడానికి, మేము తరచుగా Apache Airflowని ఉపయోగిస్తాము. ఈ సాంకేతికత గురించి తెలియని పాఠకుడికి సందర్భంలో మరియు సాధారణంగా ఎలా కనిపిస్తుందో బాగా అర్థం చేసుకోవడానికి, నేను కొన్ని పరిచయాలను వివరిస్తాను.

Apache Airflow అనేది పైథాన్‌లో ETL (ఎక్స్‌ట్రాక్ట్-ట్రాన్స్‌ఫార్మ్-లోడింగ్) ప్రక్రియలను నిర్మించడానికి, అమలు చేయడానికి మరియు పర్యవేక్షించడానికి ఉపయోగించే ఒక ఉచిత ప్లాట్‌ఫారమ్. వాయుప్రవాహంలో ప్రధాన భావన నిర్దేశిత అసైక్లిక్ గ్రాఫ్, ఇక్కడ గ్రాఫ్ యొక్క శీర్షాలు నిర్దిష్ట ప్రక్రియలు మరియు గ్రాఫ్ యొక్క అంచులు నియంత్రణ లేదా సమాచారం యొక్క ప్రవాహం. ఒక ప్రక్రియ ఏదైనా పైథాన్ ఫంక్షన్‌ని పిలుస్తుంది లేదా క్లాస్ సందర్భంలో అనేక ఫంక్షన్‌లను వరుసగా కాల్ చేయడం నుండి మరింత సంక్లిష్టమైన తర్కాన్ని కలిగి ఉంటుంది. చాలా తరచుగా జరిగే కార్యకలాపాల కోసం, ప్రక్రియలుగా ఉపయోగించబడే అనేక రెడీమేడ్ డెవలప్‌మెంట్‌లు ఇప్పటికే ఉన్నాయి. అటువంటి అభివృద్ధిలో ఇవి ఉన్నాయి:

  • ఆపరేటర్లు - డేటాను ఒక ప్రదేశం నుండి మరొక ప్రదేశానికి బదిలీ చేయడానికి, ఉదాహరణకు, డేటాబేస్ పట్టిక నుండి డేటా గిడ్డంగికి;
  • సెన్సార్లు - ఒక నిర్దిష్ట సంఘటన సంభవించే వరకు వేచి ఉండండి మరియు గ్రాఫ్ యొక్క తదుపరి శీర్షాలకు నియంత్రణ ప్రవాహాన్ని నిర్దేశించడానికి;
  • hooks - దిగువ-స్థాయి కార్యకలాపాల కోసం, ఉదాహరణకు, డేటాబేస్ పట్టిక నుండి డేటాను పొందడానికి (స్టేట్‌మెంట్‌లలో ఉపయోగించబడుతుంది);
  • మరియు అందువలన న.

ఈ కథనంలో అపాచీ ఎయిర్‌ఫ్లో వివరంగా వివరించడం సరికాదు. సంక్షిప్త పరిచయాలను చూడవచ్చు ఇక్కడ లేదా ఇక్కడ.

డేటా పొందడానికి హుక్

అన్నింటిలో మొదటిది, సమస్యను పరిష్కరించడానికి, మనం చేయగలిగిన హుక్‌ను వ్రాయాలి:

  • ఇమెయిల్‌కి కనెక్ట్ చేయండి
  • సరైన అక్షరాన్ని కనుగొనండి
  • లేఖ నుండి డేటాను స్వీకరించండి.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

తర్కం ఇది: మేము కనెక్ట్ చేస్తాము, చివరి అత్యంత సంబంధిత లేఖను కనుగొంటాము, ఇతరులు ఉంటే, మేము వాటిని విస్మరిస్తాము. ఈ ఫంక్షన్ ఉపయోగించబడుతుంది ఎందుకంటే తదుపరి సందేశాలు మునుపటి వాటి నుండి మొత్తం డేటాను కలిగి ఉంటాయి. ఇది కాకపోతే, మీరు అన్ని అక్షరాల శ్రేణిని తిరిగి ఇవ్వవచ్చు లేదా మొదటిది మరియు మిగిలిన వాటిని తదుపరి పాస్‌లో ప్రాసెస్ చేయవచ్చు. సాధారణంగా, ప్రతిదీ, ఎప్పటిలాగే, పని మీద ఆధారపడి ఉంటుంది.

మేము హుక్‌కు రెండు సహాయక విధులను జోడిస్తాము: ఫైల్‌ను డౌన్‌లోడ్ చేయడానికి మరియు ఇమెయిల్ నుండి లింక్‌ని ఉపయోగించి ఫైల్‌ను డౌన్‌లోడ్ చేయడానికి. మార్గం ద్వారా, వాటిని ఆపరేటర్‌కు తరలించవచ్చు, ఇది ఈ కార్యాచరణను ఉపయోగించే ఫ్రీక్వెన్సీపై ఆధారపడి ఉంటుంది. హుక్‌కి ఇంకా ఏమి జోడించాలి, మళ్ళీ, పనిపై ఆధారపడి ఉంటుంది: లేఖలో ఫైల్‌లు వెంటనే స్వీకరించబడితే, మీరు లేఖకు జోడింపులను డౌన్‌లోడ్ చేసుకోవచ్చు, లేఖలో డేటా అందినట్లయితే, మీరు లేఖను అన్వయించాలి, మొదలైనవి నా విషయంలో, లేఖ ఆర్కైవ్‌కి ఒక లింక్‌తో వస్తుంది, దానిని నేను నిర్దిష్ట ప్రదేశంలో ఉంచి తదుపరి ప్రాసెసింగ్ ప్రక్రియను ప్రారంభించాలి.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

కోడ్ చాలా సులభం, కాబట్టి దీనికి మరింత వివరణ అవసరం లేదు. imap_conn_id అనే మ్యాజిక్ లైన్ గురించి నేను మీకు చెప్తాను. అపాచీ ఎయిర్‌ఫ్లో స్ట్రింగ్ ఐడెంటిఫైయర్ ద్వారా యాక్సెస్ చేయగల కనెక్షన్ పారామితులను (లాగిన్, పాస్‌వర్డ్, చిరునామా మరియు ఇతర పారామీటర్‌లు) నిల్వ చేస్తుంది. దృశ్యమానంగా, కనెక్షన్ నిర్వహణ ఇలా కనిపిస్తుంది

అపాచీ ఎయిర్‌ఫ్లో ఇమెయిల్ నుండి డేటాను పొందడానికి ETL ప్రక్రియ

డేటా కోసం వేచి ఉండటానికి సెన్సార్

మెయిల్ నుండి డేటాను ఎలా కనెక్ట్ చేయాలో మరియు స్వీకరించాలో మాకు ఇప్పటికే తెలుసు కాబట్టి, దాని కోసం వేచి ఉండటానికి ఇప్పుడు సెన్సార్‌ను వ్రాయవచ్చు. నా విషయంలో, డేటాను ప్రాసెస్ చేసే ఆపరేటర్‌ని వెంటనే వ్రాయడం, ఏదైనా ఉంటే, పని చేయలేదు, ఎందుకంటే ఇతర ప్రక్రియలు ఇతర మూలాల (API, టెలిఫోనీ, వెబ్ మెట్రిక్‌లు,) నుండి సంబంధిత డేటాను తీసుకునే వాటితో సహా మెయిల్ నుండి అందుకున్న డేటా ఆధారంగా పని చేస్తాయి etc.) etc.). నేను మీకు ఒక ఉదాహరణ ఇస్తాను. CRM సిస్టమ్‌లో కొత్త వినియోగదారు కనిపించారు మరియు అతని UUID గురించి మాకు ఇంకా తెలియదు. అప్పుడు, మేము SIP టెలిఫోనీ నుండి డేటాను స్వీకరించడానికి ప్రయత్నించినప్పుడు, మేము దాని UUIDకి అనుసంధానించబడిన కాల్‌లను స్వీకరిస్తాము, కానీ మేము వాటిని సరిగ్గా సేవ్ చేయలేము మరియు ఉపయోగించలేము. అటువంటి ప్రశ్నలలో, డేటా యొక్క ఆధారపడటాన్ని గుర్తుంచుకోవడం ముఖ్యం, ప్రత్యేకించి అవి వేర్వేరు మూలాల నుండి వచ్చినట్లయితే. వాస్తవానికి, ఇవి డేటా సమగ్రతను కాపాడటానికి సరిపోని చర్యలు, కానీ కొన్ని సందర్భాల్లో అవి అవసరం. అవును, మరియు వనరులను ఆక్రమించడానికి పనిలేకుండా ఉండటం కూడా అహేతుకం.

ఈ విధంగా, మెయిల్‌లో తాజా సమాచారం ఉన్నట్లయితే మా సెన్సార్ గ్రాఫ్ యొక్క తదుపరి శీర్షాలను ప్రారంభిస్తుంది మరియు మునుపటి సమాచారాన్ని అసంబద్ధం అని గుర్తు చేస్తుంది.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

మేము డేటాను స్వీకరిస్తాము మరియు ఉపయోగిస్తాము

డేటాను స్వీకరించడానికి మరియు ప్రాసెస్ చేయడానికి, మీరు ప్రత్యేక ఆపరేటర్‌ను వ్రాయవచ్చు లేదా మీరు సిద్ధంగా ఉన్న వాటిని ఉపయోగించవచ్చు. తర్కం ఇప్పటికీ అల్పమైనది కాబట్టి - లేఖ నుండి డేటాను తీసుకోవడానికి, ఒక ఉదాహరణగా నేను ప్రామాణిక పైథాన్ ఆపరేటర్‌ని సూచిస్తున్నాను

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

మార్గం ద్వారా, మీ కార్పొరేట్ మెయిల్ కూడా mail.ruలో ఉంటే, మీరు సబ్జెక్ట్, పంపినవారు మొదలైనవాటి ద్వారా అక్షరాల కోసం శోధించలేరు. తిరిగి 2016 లో, వారు దానిని ప్రవేశపెడతామని వాగ్దానం చేసారు, కానీ స్పష్టంగా వారి మనసు మార్చుకున్నారు. నేను అవసరమైన అక్షరాల కోసం ప్రత్యేక ఫోల్డర్‌ను సృష్టించడం ద్వారా మరియు మెయిల్ వెబ్ ఇంటర్‌ఫేస్‌లో అవసరమైన అక్షరాల కోసం ఫిల్టర్‌ను సెటప్ చేయడం ద్వారా ఈ సమస్యను పరిష్కరించాను. అందువల్ల, శోధన కోసం అవసరమైన అక్షరాలు మరియు షరతులు మాత్రమే, నా విషయంలో, కేవలం (UNSEEN) ఈ ఫోల్డర్‌లోకి ప్రవేశించండి.

సంగ్రహంగా, మేము క్రింది క్రమాన్ని కలిగి ఉన్నాము: షరతులకు అనుగుణంగా కొత్త అక్షరాలు ఉన్నాయా అని మేము తనిఖీ చేస్తాము, ఉంటే, చివరి అక్షరం నుండి లింక్ని ఉపయోగించి మేము ఆర్కైవ్ను డౌన్లోడ్ చేస్తాము.
చివరి ఎలిప్సెస్ కింద, ఈ ఆర్కైవ్ అన్‌ప్యాక్ చేయబడుతుందని విస్మరించబడింది, ఆర్కైవ్ నుండి డేటా శుభ్రం చేయబడుతుంది మరియు ప్రాసెస్ చేయబడుతుంది మరియు ఫలితంగా, ఈ మొత్తం ETL ప్రాసెస్ పైప్‌లైన్‌కు మరింత ముందుకు వెళుతుంది, అయితే ఇది ఇప్పటికే పరిధికి మించినది వ్యాసం యొక్క. ఇది ఆసక్తికరంగా మరియు ఉపయోగకరంగా మారినట్లయితే, అపాచీ ఎయిర్‌ఫ్లో కోసం ETL సొల్యూషన్‌లు మరియు వాటి భాగాలను వివరించడం కొనసాగించడానికి నేను సంతోషిస్తాను.

మూలం: www.habr.com

ఒక వ్యాఖ్యను జోడించండి