በApache Airflow ውስጥ ከኢሜይል መረጃ የማግኘት ETL ሂደት

በApache Airflow ውስጥ ከኢሜይል መረጃ የማግኘት ETL ሂደት

ምንም ያህል ቴክኖሎጂ ቢዳብር፣ ጊዜ ያለፈባቸው አካሄዶች ሁልጊዜም ከዕድገት ጀርባ ይከተላሉ። ይህ በተቀላጠፈ ሽግግር, በሰዎች ምክንያቶች, በቴክኖሎጂ ፍላጎቶች ወይም በሌላ ነገር ምክንያት ሊሆን ይችላል. በመረጃ ማቀናበሪያ መስክ, የውሂብ ምንጮች በዚህ ክፍል ውስጥ በጣም ገላጭ ናቸው. ይህንን ለማስወገድ ምንም ያህል ህልም ብናልም ፣ ግን እስካሁን ድረስ የመረጃው ክፍል በፈጣን መልእክቶች እና ኢሜል ይላካል ፣ የበለጠ ጥንታዊ ቅርጸቶችን ሳይጠቅስ። ከኢሜይሎች መረጃን እንዴት መውሰድ እንደምትችል በማሳየት ለ Apache Airflow ካሉት አማራጮች አንዱን እንድትበታተን እጋብዛለሁ።

prehistory

ብዙ መረጃዎች አሁንም በኢሜል ይተላለፋሉ፣ ከግለሰባዊ ግንኙነቶች ወደ ኩባንያዎች መስተጋብር ደረጃዎች። መረጃን ለማግኘት በይነገጽ መፃፍ ወይም ይህንን መረጃ ወደ ምቹ ምንጮች የሚገቡ ሰዎችን በቢሮ ውስጥ ማስገባት ቢቻል ጥሩ ነው ነገር ግን ብዙ ጊዜ ይህ በቀላሉ ላይሆን ይችላል። ያጋጠመኝ ልዩ ተግባር ዝነኛውን የ CRM ስርዓት ከመረጃ ማከማቻ ጋር እና ከዚያም ከ OLAP ስርዓት ጋር ማገናኘት ነበር። በታሪካዊ ሁኔታ ለድርጅታችን የዚህ ስርዓት አጠቃቀሙ በአንድ የተወሰነ የንግድ መስክ ውስጥ ምቹ ነበር ። ስለዚህ፣ ሁሉም ሰው ከዚህ የሶስተኛ ወገን ስርዓት መረጃ ጋር መስራት መቻልን በእውነት ፈልጎ ነበር። በመጀመሪያ ፣ በእርግጥ ፣ ከተከፈተ ኤፒአይ መረጃ የማግኘት እድሉ ተጠንቷል። እንደ አለመታደል ሆኖ ኤፒአይ ሁሉንም አስፈላጊ መረጃዎች ማግኘት አልሸፈነም ፣ እና በቀላል አነጋገር ፣ በብዙ መንገዶች ጠማማ ነበር ፣ እና ቴክኒካዊ ድጋፍ የበለጠ አጠቃላይ ተግባራትን ለማቅረብ በግማሽ መንገድ መገናኘት አልፈለገም ወይም አልቻለም። ነገር ግን ይህ ስርዓት ማህደሩን ለማራገፍ በአገናኝ መልክ የጎደለውን መረጃ በየጊዜው በፖስታ ለመቀበል እድሉን ሰጥቷል።

ንግዱ ከኢሜል ወይም ፈጣን መልእክተኞች መረጃን ለመሰብሰብ የፈለገበት ሁኔታ ይህ ብቻ እንዳልሆነ ልብ ሊባል ይገባል። ነገር ግን, በዚህ ሁኔታ, በዚህ መንገድ ብቻ የመረጃውን ክፍል በሚያቀርበው የሶስተኛ ወገን ኩባንያ ላይ ተጽዕኖ ማድረግ አልቻልንም.

Apache የአየር ፍሰት

የኢቲኤል ሂደቶችን ለመገንባት ብዙ ጊዜ የምንጠቀመው Apache Airflow ነው። ይህንን ቴክኖሎጂ የማያውቅ አንባቢ በዐውደ-ጽሑፉ እና በአጠቃላይ እንዴት እንደሚመስል በተሻለ ሁኔታ እንዲረዳ, ሁለት መግቢያዎችን እገልጻለሁ.

Apache Airflow ETL (Extract-Transform-Loading) ሂደቶችን በፓይዘን ውስጥ ለመገንባት፣ ለማስፈጸም እና ለመቆጣጠር የሚያገለግል ነፃ መድረክ ነው። በአየር ፍሰት ውስጥ ያለው ዋናው ፅንሰ-ሀሳብ በቀጥታ የሚመራ አሲሊክ ግራፍ ሲሆን የግራፉ ጫፎች የተወሰኑ ሂደቶች ሲሆኑ የግራፉ ጠርዞች ደግሞ የቁጥጥር ወይም የመረጃ ፍሰት ናቸው። አንድ ሂደት በቀላሉ ማንኛውንም የፓይዘን ተግባር ሊጠራ ይችላል ወይም በክፍሉ አውድ ውስጥ ብዙ ተግባራትን በቅደም ተከተል ከመጥራት የበለጠ ውስብስብ አመክንዮ ሊኖረው ይችላል። በጣም በተደጋጋሚ ለሚደረጉ ክዋኔዎች፣ እንደ ሂደቶች ሆነው ሊያገለግሉ የሚችሉ ብዙ የተዘጋጁ እድገቶች አሉ። እንደነዚህ ያሉ እድገቶች የሚከተሉትን ያካትታሉ:

  • ኦፕሬተሮች - መረጃን ከአንድ ቦታ ወደ ሌላ ለማስተላለፍ, ለምሳሌ ከመረጃ ቋት ሰንጠረዥ ወደ የውሂብ ጎተራ;
  • ዳሳሾች - የአንድ የተወሰነ ክስተት ክስተትን ለመጠበቅ እና የመቆጣጠሪያውን ፍሰት ወደ ግራፉ ተከታይ ጫፎች ለመምራት;
  • መንጠቆዎች - ለዝቅተኛ ደረጃ ስራዎች, ለምሳሌ, ከዳታቤዝ ሰንጠረዥ መረጃ ለማግኘት (በመግለጫዎች ውስጥ ጥቅም ላይ ይውላል);
  • እና የመሳሰሉት.

በዚህ ጽሑፍ ውስጥ Apache Airflowን በዝርዝር መግለጹ ተገቢ አይሆንም። አጭር መግቢያዎችን ማየት ይቻላል እዚህ ወይም እዚህ.

መረጃ ለማግኘት መንጠቆ

በመጀመሪያ ደረጃ, ችግሩን ለመፍታት, እኛ የምንችለውን መንጠቆ መጻፍ ያስፈልገናል:

  • ከኢሜል ጋር ይገናኙ
  • ትክክለኛውን ደብዳቤ ያግኙ
  • ከደብዳቤው መረጃ ይቀበሉ.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

አመክንዮው ይህ ነው: እንገናኛለን, የመጨረሻውን በጣም አስፈላጊ የሆነውን ደብዳቤ እናገኛለን, ሌሎች ካሉ, ችላ እንላለን. ይህ ተግባር ጥቅም ላይ ይውላል, ምክንያቱም በኋላ ላይ ያሉ ፊደሎች የቀደሙትን ሁሉንም መረጃዎች ይይዛሉ. ጉዳዩ ይህ ካልሆነ የሁሉም ፊደሎች ድርድር መመለስ ወይም የመጀመሪያውን ማስኬድ እና የቀረውን በሚቀጥለው ማለፊያ ላይ ማድረግ ይችላሉ። በአጠቃላይ, ሁሉም ነገር, እንደ ሁልጊዜ, በስራው ላይ የተመሰረተ ነው.

ወደ መንጠቆው ሁለት ረዳት ተግባራትን እንጨምራለን-ፋይል ለማውረድ እና ከኢሜል አገናኝ በመጠቀም ፋይልን ለማውረድ። በነገራችን ላይ ወደ ኦፕሬተሩ ሊዘዋወሩ ይችላሉ, ይህንን ተግባር በሚጠቀሙበት ድግግሞሽ ላይ ይወሰናል. ወደ መንጠቆው ሌላ ምን መጨመር እንዳለበት, እንደገና, እንደ ሥራው ይወሰናል: ፋይሎች በደብዳቤው ውስጥ ወዲያውኑ ከተቀበሉ, በደብዳቤው ላይ አባሪዎችን ማውረድ ይችላሉ, ውሂቡ በደብዳቤው ውስጥ ከተቀበለ, ደብዳቤውን መተንተን ያስፈልግዎታል. ወዘተ. በእኔ ሁኔታ, ደብዳቤው ወደ ማህደሩ ከአንድ አገናኝ ጋር ይመጣል, ይህም በተወሰነ ቦታ ላይ ማስቀመጥ እና ተጨማሪ ሂደቱን መጀመር አለብኝ.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

ኮዱ ቀላል ነው, ስለዚህ ተጨማሪ ማብራሪያ አያስፈልገውም. ስለ አስማት መስመር imap_conn_id ብቻ እነግራችኋለሁ። Apache Airflow በሕብረቁምፊ ለዪ ሊደረስባቸው የሚችሉ የግንኙነት መለኪያዎችን (መግቢያ፣ የይለፍ ቃል፣ አድራሻ እና ሌሎች መለኪያዎች) ያከማቻል። በእይታ ፣ የግንኙነት አስተዳደር ይህንን ይመስላል

በApache Airflow ውስጥ ከኢሜይል መረጃ የማግኘት ETL ሂደት

ውሂብን ለመጠበቅ ዳሳሽ

ከደብዳቤ መረጃን እንዴት ማገናኘት እና መቀበል እንዳለብን ስለምናውቅ አሁን እነሱን ለመጠበቅ ዳሳሽ መፃፍ እንችላለን። በእኔ ሁኔታ መረጃውን የሚያከናውን ኦፕሬተርን ወዲያውኑ መፃፍ አልሰራም ፣ ካለ ፣ ምክንያቱም ሌሎች ሂደቶች ከደብዳቤ በተቀበሉት መረጃ ላይ ተመስርተው ይሰራሉ ​​\uXNUMXb\uXNUMXbከሌሎች ምንጮች (ኤፒአይ ፣ ቴሌፎን) ተዛማጅ መረጃዎችን የሚወስዱትን ጨምሮ ። ፣ የድር መለኪያዎች ፣ ወዘተ.) ወዘተ) ። አንድ ምሳሌ እሰጥሃለሁ። አዲስ ተጠቃሚ በCRM ስርዓት ውስጥ ታይቷል፣ እና አሁንም ስለሱ UUID አናውቅም። ከዚያ፣ ከSIP ቴሌፎን መረጃ ለመቀበል ስንሞክር፣ ከ UUID ጋር የተሳሰሩ ጥሪዎች ይደርሱናል፣ ነገር ግን በትክክል ማስቀመጥ እና መጠቀም አንችልም። በእንደዚህ አይነት ጉዳዮች ላይ በተለይም ከተለያዩ ምንጮች የመጡ ከሆነ የመረጃውን ጥገኛነት ግምት ውስጥ ማስገባት አስፈላጊ ነው. እነዚህ በእርግጥ የውሂብ ታማኝነትን ለመጠበቅ በቂ ያልሆኑ እርምጃዎች ናቸው, ግን በአንዳንድ ሁኔታዎች አስፈላጊ ናቸው. አዎን፣ እና ሀብትን ለመያዝ ስራ ፈት ማለት እንዲሁ ምክንያታዊነት የጎደለው ነው።

ስለዚህ የእኛ ዳሳሽ በፖስታ ውስጥ ትኩስ መረጃ ካለ የግራፉን ተከታይ ጫፎች ያስነሳል እና እንዲሁም ያለፈውን መረጃ እንደ አስፈላጊነቱ ምልክት ያደርጋል።

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

መረጃ ተቀብለን እንጠቀማለን።

መረጃን ለመቀበል እና ለማስኬድ የተለየ ኦፕሬተር መጻፍ ይችላሉ, ዝግጁ የሆኑትን መጠቀም ይችላሉ. እስካሁን ድረስ አመክንዮው ትንሽ ስለሆነ - ከደብዳቤው ላይ መረጃን ለመውሰድ ፣ ለምሳሌ ፣ መደበኛውን PythonOperator እጠቁማለሁ።

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

በነገራችን ላይ የድርጅትዎ ደብዳቤ በ mail.ru ላይ ከሆነ ደብዳቤዎችን በርዕስ ፣ ላኪ ፣ ወዘተ መፈለግ አይችሉም ። እ.ኤ.አ. በ 2016 ፣ እሱን ለማስተዋወቅ ቃል ገብተዋል ፣ ግን በግልጽ ሀሳባቸውን ቀይረዋል። ለአስፈላጊ ፊደሎች የተለየ አቃፊ በመፍጠር እና በደብዳቤ ድር በይነገጽ ውስጥ አስፈላጊ ለሆኑ ፊደሎች ማጣሪያ በማዘጋጀት ይህንን ችግር ፈታሁት። ስለዚህ, ለፍለጋ አስፈላጊ የሆኑ ፊደሎች እና ሁኔታዎች ብቻ, በእኔ ሁኔታ, በቀላሉ (ያልታዩ) ወደዚህ አቃፊ ይግቡ.

ማጠቃለያ, የሚከተለው ቅደም ተከተል አለን: ሁኔታዎችን የሚያሟሉ አዲስ ፊደሎች መኖራቸውን እናረጋግጣለን, ካሉ, ከዚያም ከመጨረሻው ፊደል አገናኙን በመጠቀም ማህደሩን እናወርዳለን.
በመጨረሻዎቹ ነጥቦች ስር ይህ መዝገብ እንደማይታሸግ ፣ ከማህደሩ ውስጥ ያለው መረጃ ይጸዳል እና ይከናወናል ፣ እና በዚህ ምክንያት ሁሉም ነገር ወደ ኢቲኤል ሂደት ቧንቧ መስመር ይሄዳል ፣ ግን ይህ ቀድሞውኑ ያለፈ ነው ። የጽሑፉ ወሰን. አስደሳች እና ጠቃሚ ሆኖ ከተገኘ፣ የ ETL መፍትሄዎችን እና ክፍሎቻቸውን ለ Apache Airflow መግለጻቸውን በደስታ እቀጥላለሁ።

ምንጭ: hab.com

አስተያየት ያክሉ