Apache Airflow жүйесінде электрондық поштадан деректерді алудың ETL процесі

Apache Airflow жүйесінде электрондық поштадан деректерді алудың ETL процесі

Технология қаншалықты дамығанымен, даму әрқашан ескірген тәсілдер қатарымен жүреді. Бұл біркелкі өтуге, адам факторларына, технологиялық қажеттіліктерге немесе басқа нәрсеге байланысты болуы мүмкін. Мәліметтерді өңдеу саласында бұл бөлімде ең көп ашылатын деректер көздері болып табылады. Бұдан құтылуды қанша армандасақ та, әзірге деректердің бір бөлігі архаикалық пішімдерді айтпағанда, жедел хабаршылар мен электрондық пошталарға жіберіледі. Мен сізді электрондық пошталардан деректерді қалай жинауға болатынын көрсететін Apache Airflow опцияларының бірін қарауға шақырамын.

тарихын

Көптеген деректер әлі күнге дейін электрондық пошта арқылы, тұлғааралық байланыстардан компаниялар арасындағы өзара әрекеттесу стандарттарына дейін тасымалданады. Деректерді алу үшін интерфейсті жазу немесе кеңсеге осы ақпаратты ыңғайлырақ көздерге енгізетін адамдарды орналастыру мүмкін болса жақсы, бірақ көбінесе бұл мүмкін болмауы мүмкін. Менің алдында тұрған нақты тапсырма атышулы CRM жүйесін деректер қоймасына, содан кейін OLAP жүйесіне қосу болды. Біздің компания үшін бұл жүйені пайдалану бизнестің белгілі бір саласында ыңғайлы болғаны тарихи болды. Сондықтан барлығы осы үшінші тарап жүйесінің деректерімен де жұмыс істей алуды қалайды. Ең алдымен, әрине, ашық API-ден деректерді алу мүмкіндігі зерттелді. Өкінішке орай, API барлық қажетті деректерді алуды қамтымады және қарапайым тілмен айтқанда, ол негізінен қисық болды және техникалық қолдау толық функционалдылықты қамтамасыз ету үшін жарты жолды қаламады немесе қанағаттандыра алмады. Бірақ бұл жүйе мұрағатты жүктеуге сілтеме түрінде жетіспейтін деректерді электрондық пошта арқылы мерзімді түрде алуға мүмкіндік берді.

Айта кету керек, бұл бизнес электрондық пошталардан немесе жедел хабаршылардан деректерді жинағысы келетін жалғыз жағдай емес. Дегенмен, бұл жағдайда деректердің бір бөлігін тек осылайша қамтамасыз ететін үшінші тарап компаниясына әсер ете алмадық.

Apache ауа ағыны

ETL процестерін құру үшін біз көбінесе Apache Airflow қолданамыз. Бұл технологиямен таныс емес оқырман оның контексте және жалпы қалай көрінетінін жақсы түсінуі үшін мен бірнеше кіріспелерді сипаттаймын.

Apache Airflow - бұл Python жүйесінде ETL (Extract-Transform-Loading) процестерін құру, іске қосу және бақылау үшін пайдаланылатын тегін платформа. Ауа ағынындағы негізгі түсінік бағытталған ациклді график болып табылады, мұнда графиктің шыңдары нақты процестер болып табылады, ал графиктің шеттері басқару немесе ақпарат ағыны болып табылады. Процесс кез келген Python функциясын жай ғана шақыра алады немесе оның сынып контекстінде бірнеше функцияларды дәйекті түрде шақырудың күрделі логикасы болуы мүмкін. Ең көп таралған операциялар үшін процесс ретінде пайдалануға болатын көптеген дайын әзірлемелер бар. Мұндай дамуларға мыналар жатады:

  • операторлар – деректерді бір орыннан екінші орынға, мысалы, мәліметтер қоры кестесінен деректер қоймасына ауыстыруға арналған;
  • датчиктер – белгілі бір оқиғаның пайда болуын күту және басқару ағынын графиктің келесі шыңдарына бағыттау;
  • ілгектер – төменгі деңгейдегі операциялар үшін, мысалы, мәліметтер қоры кестесінен деректерді алу үшін (мәлімдемелерде қолданылады);
  • және т.б.

Осы мақалада Apache Airflow-ты егжей-тегжейлі сипаттау орынсыз болар еді. Қысқаша кіріспелерді көруге болады осында немесе осында.

Деректерді алуға арналған ілмек

Ең алдымен, мәселені шешу үшін бізге ілмек жазу керек, оның көмегімен біз:

  • электрондық поштаға қосылу;
  • дұрыс әріпті табыңыз
  • хаттан мәліметтер алу.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Логика мынада: біз қосыламыз, ең соңғы әріпті табамыз, егер басқалары болса, біз оларды елемейміз. Бұл функция пайдаланылады, себебі кейінгі хабарлар алдыңғылардың барлық деректерін қамтиды. Егер олай болмаса, онда сіз барлық әріптердің массивін қайтара аласыз немесе біріншісін және қалғанын келесі жолда өңдеуге болады. Жалпы, әдеттегідей, бәрі тапсырмаға байланысты.

Біз ілмекке екі көмекші функцияны қосамыз: файлды жүктеп алу және электрондық поштадан сілтеме арқылы файлды жүктеу үшін. Айтпақшы, оларды операторға ауыстыруға болады, бұл осы функционалдылықты пайдалану жиілігіне байланысты. Ілмекке тағы не қосу керек, тағы да тапсырмаға байланысты: егер файлдар хатта бірден алынса, онда хатқа қосымшаларды жүктеп алуға болады, егер хатта деректер алынған болса, онда хатты талдау керек, т.б. Менің жағдайда, хат мұрағатқа бір сілтемемен бірге келеді, оны белгілі бір жерге қойып, одан әрі өңдеу процесін бастау керек.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Код қарапайым, сондықтан оған қосымша түсініктеме қажет емес. Мен сізге imap_conn_id сиқырлы желісі туралы айтып беремін. Apache Airflow қосылым параметрлерін (логин, құпия сөз, мекенжай және басқа параметрлер) сақтайды, оларға жол идентификаторы арқылы қол жеткізуге болады. Көрнекі түрде қосылымды басқару осылай көрінеді

Apache Airflow жүйесінде электрондық поштадан деректерді алудың ETL процесі

Деректерді күтуге арналған сенсор

Біз поштадан деректерді қосу және алу жолын бұрыннан білетіндіктен, енді оны күту үшін сенсорды жаза аламыз. Менің жағдайда деректерді өңдейтін операторды дереу жазу, егер бар болса, жұмыс істемеді, өйткені басқа процестер поштадан алынған деректер негізінде жұмыс істейді, соның ішінде басқа көздерден (API, телефония, веб-метрика, т.б.) т.б.). Мен сізге мысал келтіремін. CRM жүйесінде жаңа пайдаланушы пайда болды және біз оның UUID туралы әлі білмейміз. Содан кейін, SIP телефониясынан деректерді алуға тырысқанда, біз оның UUID идентификаторына байланысты қоңырауларды қабылдаймыз, бірақ біз оларды дұрыс сақтай және пайдалана алмаймыз. Мұндай сұрақтарда деректердің тәуелділігін, әсіресе олар әртүрлі көздерден алынған болса, есте сақтау маңызды. Бұл, әрине, деректер тұтастығын сақтау үшін жеткіліксіз шаралар, бірақ кейбір жағдайларда олар қажет. Ал бос тұрып жатқан ресурстарды қарызға алу да қисынсыз.

Осылайша, біздің сенсорымыз поштада жаңа ақпарат болса, графиктің келесі шыңдарын іске қосады, сонымен қатар алдыңғы ақпаратты маңызды емес деп белгілейді.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Мәліметтерді қабылдау және пайдалану

Мәліметтерді қабылдау және өңдеу үшін бөлек оператор жазуға болады, дайындарын пайдалануға болады. Әзірге логика тривиальды болғандықтан - хаттан деректерді алу үшін, мысалы, стандартты PythonOperator ұсынамын.

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Айтпақшы, егер сіздің корпоративтік электрондық поштаңыз mail.ru сайтында болса, онда сіз хаттарды тақырып, жіберуші және т.б. бойынша іздей алмайсыз. Олар оны қайтадан 2016 жылы енгізуге уәде берді, бірақ ойларын өзгерткен сияқты. Мен бұл мәселені қажетті әріптер үшін бөлек қалта жасау және қажетті әріптер үшін пошта веб-интерфейсіндегі сүзгіні орнату арқылы шештім. Осылайша, тек қажетті әріптер мен іздеу шарттары, менің жағдайда, бұл қалтаға жай ғана (UNSEEN) кіреді.

Қорытындылай келе, бізде келесі реттілік бар: шарттарға сәйкес келетін жаңа әріптердің бар-жоғын тексереміз, егер бар болса, соңғы әріптегі сілтеме арқылы мұрағатты жүктеп аламыз.
Соңғы эллипстердің астында бұл мұрағаттың орамынан шығарылатыны, мұрағаттағы деректердің тазаланатыны және өңделетіні ескерілмейді, ақыр соңында бұл барлық нәрсе ETL процесс құбырына өтеді, бірақ бұл қазірдің өзінде мүмкін емес. Мақала. Егер бұл қызықты және пайдалы болса, мен Apache Airflow үшін ETL шешімдерін және олардың бөліктерін сипаттауды жалғастыруға қуаныштымын.

Ақпарат көзі: www.habr.com

пікір қалдыру