Proses ETL pikeun meunangkeun data tina email dina Apache Airflow

Proses ETL pikeun meunangkeun data tina email dina Apache Airflow

Henteu paduli sabaraha téknologi ngembang, pamekaran sok dituturkeun ku senar pendekatan anu luntur. Ieu tiasa disababkeun ku transisi anu lancar, faktor manusa, kabutuhan téknologi, atanapi anu sanés. Dina widang ngolah data, anu paling ngungkabkeun dina ieu bagian nyaéta sumber data. Perkara teu sabaraha urang ngimpi meunang leupas tina ieu, pikeun ayeuna sababaraha data dikirim dina utusan instan jeung surelek, teu nyebut format leuwih kolot. Kuring ngajak anjeun ningali salah sahiji pilihan pikeun Apache Airflow, ngagambarkeun kumaha anjeun tiasa ngumpulkeun data tina email.

prasajarah

Seueur data anu masih dikirimkeun ngalangkungan email, ti komunikasi interpersonal dugi ka standar interaksi antara perusahaan. Éta saé upami anjeun tiasa nyerat antarmuka pikeun nampi data atanapi nempatkeun jalma di kantor anu bakal ngalebetkeun inpormasi ieu kana sumber anu langkung merenah, tapi sering kamungkinan ieu ngan saukur teu aya. Tugas khusus anu kuring hadapi nyaéta nyambungkeun sistem CRM anu terkenal ka gudang data, teras ka sistem OLAP. Dina sajarahna, pikeun perusahaan urang, pamakéan sistem ieu merenah dina wewengkon nu tangtu bisnis. Ku alatan éta, dulur hayang pisan bisa beroperasi kalawan data tina sistem pihak katilu ieu ogé. Anu mimiti, tangtosna, kamungkinan kéngingkeun data tina API kabuka digali. Hanjakal, API teu nutupan meunangkeun sakabéh data diperlukeun, sarta, dina istilah basajan, éta sakitu legana bengkung, sarta rojongan teknis teu hayang atawa teu bisa papanggih satengahna nyadiakeun fungsionalitas leuwih komprehensif. Tapi sistem ieu nyadiakeun kasempetan pikeun périodik narima data leungit ku email dina bentuk link pikeun ngundeur arsip.

Perlu dicatet yén ieu sanés hiji-hijina kasus dimana usaha hoyong ngumpulkeun data tina email atanapi utusan instan. Nanging, dina hal ieu, urang henteu tiasa mangaruhan perusahaan pihak katilu anu nyayogikeun bagian tina data ngan ku cara ieu.

aliran hawa Apache

Pikeun ngawangun prosés ETL, urang paling sering nganggo Apache Airflow. Supados pamaca teu wawuh sareng téknologi ieu langkung ngartos kumaha rupana dina kontéks sareng sacara umum, kuring bakal ngajelaskeun sababaraha bubuka.

Apache Airflow mangrupikeun platform gratis anu dianggo pikeun ngawangun, ngajalankeun, sareng ngawas prosés ETL (Extract-Transform-Loading) dina Python. Konsep utama dina Aliran Udara nyaéta grafik asiklik diarahkeun, dimana titik-titik dina grafik mangrupa prosés husus, sarta ujung-ujung grafik mangrupa aliran kontrol atawa informasi. A prosés saukur bisa nelepon sagala fungsi Python, atawa bisa mibanda logika leuwih kompleks sequentially nelepon sababaraha fungsi dina konteks kelas hiji. Pikeun operasi anu paling umum, parantos aya seueur pamekaran anu siap-siap anu tiasa dianggo salaku prosés. Kamajuan sapertos kieu kalebet:

  • operator - pikeun mindahkeun data ti hiji tempat ka nu sejen, contona, ti tabel database ka gudang data;
  • sensor - ngadagoan kajadian nu tangtu sarta ngarahkeun aliran kontrol ka vertice saterusna grafik;
  • hook - pikeun operasi tingkat handap, contona, pikeun retrieving data tina tabel database (dipaké dina pernyataan);
  • jeung sajabana

Teu pantes pikeun ngajelaskeun Apache Airflow sacara rinci dina tulisan ieu. Perkenalan singket tiasa ditingali di dieu atawa di dieu.

Hook pikeun meunangkeun data

Anu mimiti, pikeun ngabéréskeun masalah urang kedah nyerat pancing anu urang tiasa:

  • nyambung ka email;
  • manggihan surat nu peryogi;
  • nampi data tina surat.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Logika ieu: urang nyambungkeun, manggihan hurup paling relevan panungtungan, lamun aya batur, urang malire aranjeunna. Pungsi ieu dipaké sabab pesen engké ngandung sakabéh data ti nu saméméhna. Upami ieu sanés masalahna, anjeun tiasa ngabalikeun sakumpulan sadaya hurup atanapi ngolah anu kahiji sareng anu sanés dina pas salajengna. Sacara umum, sakumaha salawasna, sagalana gumantung kana tugas.

Kami nambihan dua fungsi bantu pikeun pancing: pikeun ngaunduh file sareng pikeun ngaunduh file nganggo tautan tina surat. Ku jalan kitu, aranjeunna bisa kaasup kana operator, éta gumantung kana frékuénsi pamakéan fungsi ieu. Naon deui pikeun nambahkeun kana hook, deui, gumantung kana tugas: lamun file narima langsung dina surat, Anjeun bisa ngundeur kantétan kana surat, lamun data datang dina surat, mangka anjeun kudu parse hurup, jsb. Dina hal kuring, suratna sumping sareng hiji tautan kana arsip, anu kuring kedah nempatkeun dina tempat anu tangtu sareng ngamimitian ngolah salajengna.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Kodena saderhana, janten henteu peryogi katerangan tambahan. Kuring ngan bakal ngabejaan Anjeun tentang garis magic imap_conn_id. Apache Airflow nyimpen parameter sambungan (login, kecap akses, alamat jeung parameter sejenna), nu bisa diasupan ku identifier string. Sacara visual, manajemén sambungan sapertos kieu

Proses ETL pikeun meunangkeun data tina email dina Apache Airflow

Sensor pikeun ngantosan data

Kusabab urang parantos terang kumaha nyambungkeun sareng nampi data tina surat, urang ayeuna tiasa nyerat sensor pikeun ngantosan éta. Dina kasus kuring, langsung nyerat operator anu bakal ngolah data, upami aya, henteu jalan, sabab prosés sanésna jalan dumasar kana data anu ditampi tina surat, kalebet anu nyandak data anu aya hubunganana tina sumber anu sanés (API, teleponi, métrik wéb, jsb.) jsb.). Hayu atuh masihan anjeun conto. Pamaké anyar parantos muncul dina sistem CRM, sareng urang henteu acan terang ngeunaan UUID na. Teras, nalika urang nyobian nampi data tina telepon SIP, urang bakal nampi telepon anu aya hubunganana sareng UUID na, tapi urang moal tiasa leres-leres nyimpen sareng ngagunakeunana. Dina patarosan sapertos kitu, hal anu penting pikeun tetep dina pikiran gumantungna data, utamana lamun aranjeunna ti sumber béda. Ieu, tangtosna, ukuran anu teu cekap pikeun ngajaga integritas data, tapi dina sababaraha kasus aranjeunna diperyogikeun. Sareng nginjeum sumber nalika dianggurkeun ogé teu rasional.

Ku kituna, sensor kami bakal ngajalankeun vertice saterusna tina grafik lamun aya informasi seger dina surat, sarta ogé cirian informasi saméméhna teu relevan.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Narima jeung ngagunakeun data

Pikeun nampa sareng ngolah data, anjeun tiasa nyerat operator anu misah, atanapi anjeun tiasa nganggo anu siap-siap. Kusabab logika masih trivial - nyandak data tina surat, teras salaku conto kuring nyarankeun PythonOperator standar

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Ngomong-ngomong, upami email perusahaan anjeun ogé aya dina mail.ru, maka anjeun moal tiasa milarian hurup dumasar kana subjék, pangirim, jsb. Aranjeunna jangji baris ngenalkeun deui dina 2016, tapi tétéla robah pikiran maranéhanana. Kuring direngsekeun masalah ieu ku nyieun hiji folder misah pikeun hurup diperlukeun tur nyetel hiji filter dina panganteur web mail pikeun hurup perlu. Ku kituna, ngan hurup perlu jeung kaayaan pilarian, bisi kuring, saukur (Ghaib) lebet kana folder ieu.

Pikeun nyimpulkeun, urang gaduh sekuen ieu: urang pariksa naha aya hurup anyar anu nyumponan sarat; upami aya, teras urang unduh arsip nganggo tautan tina hurup anu terakhir.
Dina elips anu terakhir, dileungitkeun yén arsip ieu bakal dibongkar, data tina arsip bakal dibersihkeun sareng diolah, sareng tungtungna sadayana ieu bakal langkung jauh kana jalur pipa prosés ETL, tapi ieu parantos saluareun ruang lingkup artikel. Upami tétéla pikaresepeun sareng mangpaat, maka kuring bakal resep neraskeun ngajelaskeun solusi ETL sareng bagian-bagianna pikeun Apache Airflow.

sumber: www.habr.com

Tambahkeun komentar