Proses ETL kanggo njupuk data saka email ing Apache Airflow

Proses ETL kanggo njupuk data saka email ing Apache Airflow

Ora ketompo carane akeh teknologi berkembang, senar saka pendekatan outdated tansah tilase konco pembangunan. Iki bisa uga amarga transisi sing lancar, faktor manungsa, kabutuhan teknologi, utawa liya-liyane. Wonten ing babagan pangolah data, sumber data ingkang paling dipunandharaken wonten ing perangan menika. Ora ketompo carane akeh kita ngimpi njaluk nyisihaken saka iki, nanging supaya adoh bagéan saka data dikirim ing utusan cepet lan email, ora kanggo sebutno format liyane kuna. Aku ngajak sampeyan mbongkar salah siji opsi kanggo Apache Airflow, nggambarake carane sampeyan bisa njupuk data saka email.

prasejarah

Akeh data sing isih ditransfer liwat e-mail, saka komunikasi interpersonal menyang standar interaksi antarane perusahaan. Iku apik yen bisa nulis antarmuka kanggo njupuk data utawa sijine wong ing kantor sing bakal ngetik informasi iki menyang sumber luwih trep, nanging asring iki mung ora bisa. Tugas tartamtu sing aku diadhepi yaiku nyambungake sistem CRM sing kondhang menyang gudang data, banjur menyang sistem OLAP. Dadi kedaden historis sing kanggo perusahaan kita nggunakake sistem iki trep ing wilayah tartamtu saka bisnis. Mulane, kabeh wong pancene pengin bisa operate karo data saka sistem pihak katelu iki uga. Kaping pisanan, mesthine, kemungkinan entuk data saka API sing mbukak ditliti. Sayange, API ora nyakup kabeh data sing dibutuhake, lan, kanthi prasaja, ana akeh cara sing bengkok, lan dhukungan teknis ora pengin utawa ora bisa ketemu setengah kanggo nyedhiyakake fungsi sing luwih lengkap. Nanging sistem iki menehi kesempatan kanggo periodik nampa data ilang dening mail ing wangun link kanggo unloading arsip.

Perlu dicathet yen iki ora mung kasus sing bisnis pengin ngumpulake data saka email utawa utusan cepet. Nanging, ing kasus iki, kita ora bisa mengaruhi perusahaan pihak katelu sing nyedhiyakake bagean data mung kanthi cara iki.

aliran udara apache

Kanggo mbangun proses ETL, kita paling kerep nggunakake Apache Airflow. Supaya maca sing ora kenal karo teknologi iki luwih ngerti carane katon ing konteks lan ing umum, aku bakal njlèntrèhaké saperangan saka pambuko.

Apache Airflow minangka platform gratis sing digunakake kanggo mbangun, nglakokake lan ngawasi proses ETL (Extract-Transform-Loading) ing Python. Konsep utama ing Airflow yaiku grafik asiklik sing diarahake, ing ngendi titik-titik ing grafik minangka proses tartamtu, lan pinggiran grafik minangka aliran kontrol utawa informasi. A proses mung bisa nelpon sembarang fungsi Python, utawa bisa duwe logika liyane Komplek saka sequentially nelpon sawetara fungsi ing konteks saka kelas. Kanggo operasi sing paling kerep, wis ana akeh pembangunan siap sing bisa digunakake minangka proses. Perkembangan kasebut kalebu:

  • operator - kanggo nransfer data saka panggonan siji menyang panggonan liyane, contone, saka tabel database menyang gudang data;
  • sensor - kanggo nunggu kedadeyan saka acara tartamtu lan ngarahake aliran kontrol menyang verteks sakteruse saka grafik;
  • pancingan - kanggo operasi tingkat ngisor, contone, kanggo njupuk data saka tabel database (digunakake ing statements);
  • lan liya-liyane.

Ora cocog kanggo njlèntrèhaké Apache Airflow kanthi rinci ing artikel iki. Pambuka ringkes bisa dideleng kene utawa kene.

Pancing kanggo njupuk data

Kaping pisanan, kanggo ngatasi masalah, kita kudu nulis pancing sing bisa:

  • nyambung menyang email
  • golek layang sing bener
  • nampa data saka layang.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Logika iki: kita nyambung, golek huruf pungkasan sing paling relevan, yen ana liyane, kita nglirwakake. Fungsi iki digunakake, amarga aksara mengko ngemot kabeh data saka sadurungé. Yen ora, sampeyan bisa ngasilake kabeh aksara, utawa ngolah sing pisanan, lan liyane ing pass sabanjure. Umumé, kabeh, kaya biasane, gumantung saka tugas.

Kita nambahake rong fungsi tambahan kanggo pancing: kanggo ngundhuh file lan kanggo ngundhuh file nggunakake link saka email. Miturut cara, bisa dipindhah menyang operator, gumantung saka frekuensi nggunakake fungsi iki. Apa maneh kanggo nambah pancing, maneh, gumantung ing tugas: yen file ditampa langsung ing layang, sampeyan bisa ngundhuh lampiran menyang layang, yen data ditampa ing layang, sampeyan kudu ngurai layang, lsp. Ing kasusku, surat kasebut dilengkapi karo siji link menyang arsip, sing kudu dilebokake ing papan tartamtu lan miwiti proses pangolahan luwih lanjut.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Kode iku prasaja, dadi meh ora perlu panjelasan luwih. Aku mung bakal pitutur marang kowe bab baris gaib imap_conn_id. Apache Airflow nyimpen paramèter sambungan (login, sandhi, alamat, lan paramèter liyane) sing bisa diakses nganggo pengenal senar. Secara visual, manajemen sambungan katon kaya iki

Proses ETL kanggo njupuk data saka email ing Apache Airflow

Sensor kanggo ngenteni data

Awit kita wis ngerti carane nyambungake lan nampa data saka mail, kita saiki bisa nulis sensor kanggo ngenteni wong. Ing kasusku, ora bisa langsung nulis operator sing bakal ngolah data, yen ana, amarga proses liyane bisa digunakake adhedhasar data sing ditampa saka surat, kalebu sing njupuk data sing gegandhengan saka sumber liyane (API, telephony). , metrik web, lsp.). lsp). Aku bakal menehi conto. Pangguna anyar wis muncul ing sistem CRM, lan kita isih ora ngerti babagan UUID. Banjur, nalika nyoba nampa data saka SIP telephony, kita bakal nampa telpon disambungake menyang UUID sawijining, nanging kita ora bakal bisa kanggo nyimpen lan nggunakake bener. Ing prakara kasebut, penting kanggo ngelingi katergantungan data, utamane yen saka sumber sing beda. Iki, mesthi, langkah-langkah sing ora cukup kanggo njaga integritas data, nanging ing sawetara kasus perlu. Ya, lan idling kanggo manggoni sumber daya uga ora rasional.

Mangkono, sensor kita bakal miwiti vertices sakteruse saka graph yen ana informasi anyar ing mail, lan uga menehi tandha informasi sadurungé minangka ora salaras.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Kita nampa lan nggunakake data

Kanggo nampa lan ngolah data, sampeyan bisa nulis operator sing kapisah, sampeyan bisa nggunakake sing wis siap. Wiwit saiki logika iku ora pati penting - kanggo njupuk data saka huruf, contone, aku suggest PythonOperator standar

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Miturut cara, yen surat perusahaan sampeyan uga ana ing mail.ru, mula sampeyan ora bakal bisa nelusuri layang miturut subyek, pangirim, lsp. Mbalik ing 2016, dheweke janji bakal ngenalake, nanging ketoke owah-owahan. Aku ngrampungake masalah iki kanthi nggawe folder sing kapisah kanggo huruf sing dibutuhake lan nyetel filter kanggo huruf sing dibutuhake ing antarmuka web mail. Mangkono, mung huruf lan kahanan sing dibutuhake kanggo telusuran, ing kasusku, mung (UNSEEN) mlebu ing folder iki.

Ringkesan, kita duwe urutan ing ngisor iki: kita mriksa yen ana huruf anyar sing nyukupi syarat kasebut, yen ana, banjur didownload arsip nggunakake link saka huruf pungkasan.
Ing titik pungkasan, ora ana arsip iki bakal dibongkar, data saka arsip bakal dibusak lan diproses, lan minangka asil, kabeh bakal luwih maju menyang pipa proses ETL, nanging iki wis ngluwihi. ruang lingkup artikel. Yen ternyata menarik lan migunani, mula aku bakal terus njlentrehake solusi ETL lan bagean kanggo Apache Airflow.

Source: www.habr.com

Add a comment