Proses ETL untuk mendapatkan data dari email di Apache Airflow

Proses ETL untuk mendapatkan data dari email di Apache Airflow

Tidak peduli seberapa besar perkembangan teknologi, serangkaian pendekatan yang sudah ketinggalan zaman selalu tertinggal di belakang perkembangannya. Hal ini mungkin disebabkan oleh kelancaran transisi, faktor manusia, kebutuhan teknologi, atau hal lainnya. Dalam bidang pengolahan data, sumber data merupakan hal yang paling terbuka pada bagian ini. Tidak peduli seberapa besar impian kita untuk menghilangkannya, namun sejauh ini sebagian data dikirim dalam pesan instan dan email, belum lagi format yang lebih kuno. Saya mengundang Anda untuk membongkar salah satu opsi untuk Apache Airflow, yang mengilustrasikan bagaimana Anda dapat mengambil data dari email.

prasejarah

Banyak data yang masih ditransfer melalui email, mulai dari komunikasi interpersonal hingga standar interaksi antar perusahaan. Ada baiknya jika memungkinkan untuk menulis antarmuka untuk memperoleh data atau menempatkan orang di kantor yang akan memasukkan informasi ini ke sumber yang lebih nyaman, tetapi seringkali hal ini tidak dapat dilakukan. Tugas khusus yang saya hadapi adalah menghubungkan sistem CRM yang terkenal buruk ke gudang data, dan kemudian ke sistem OLAP. Secara historis, bagi perusahaan kami, penggunaan sistem ini nyaman dalam bidang bisnis tertentu. Oleh karena itu, semua orang sangat ingin dapat beroperasi dengan data dari sistem pihak ketiga ini juga. Pertama-tama, tentu saja, kemungkinan memperoleh data dari API terbuka telah dipelajari. Sayangnya, API tidak mencakup perolehan semua data yang diperlukan, dan, secara sederhana, API tersebut dalam banyak hal tidak benar, dan dukungan teknis tidak ingin atau tidak dapat memenuhi setengah jalan untuk menyediakan fungsionalitas yang lebih komprehensif. Namun sistem ini memberikan kesempatan untuk menerima data yang hilang secara berkala melalui surat dalam bentuk link untuk membongkar arsip.

Perlu dicatat bahwa ini bukan satu-satunya kasus di mana bisnis ingin mengumpulkan data dari email atau pesan instan. Namun, dalam kasus ini, kami tidak dapat mempengaruhi perusahaan pihak ketiga yang menyediakan sebagian data hanya dengan cara ini.

Aliran Udara Apache

Untuk membangun proses ETL, kami paling sering menggunakan Apache Airflow. Agar pembaca yang belum familiar dengan teknologi ini dapat lebih memahami tampilannya dalam konteks dan secara umum, saya akan menjelaskan beberapa pengantar.

Apache Airflow adalah platform gratis yang digunakan untuk membangun, mengeksekusi, dan memantau proses ETL (Extract-Transform-Loading) dengan Python. Konsep utama dalam Aliran Udara adalah grafik asiklik berarah, di mana simpul-simpul grafik adalah proses tertentu, dan tepi-tepi grafik adalah aliran kontrol atau informasi. Suatu proses dapat dengan mudah memanggil fungsi Python apa pun, atau dapat memiliki logika yang lebih kompleks dengan memanggil beberapa fungsi secara berurutan dalam konteks suatu kelas. Untuk pengoperasian yang paling sering, sudah banyak pengembangan siap pakai yang dapat digunakan sebagai proses. Perkembangan tersebut meliputi:

  • operator - untuk mentransfer data dari satu tempat ke tempat lain, misalnya, dari tabel database ke gudang data;
  • sensor - untuk menunggu terjadinya peristiwa tertentu dan mengarahkan aliran kontrol ke simpul berikutnya pada grafik;
  • kait - untuk operasi tingkat rendah, misalnya, untuk mendapatkan data dari tabel database (digunakan dalam pernyataan);
  • dan lain-lain

Tidak tepat untuk menjelaskan Apache Airflow secara detail di artikel ini. Perkenalan singkat dapat dilihat di sini ΠΈΠ»ΠΈ di sini.

Kait untuk mendapatkan data

Pertama-tama, untuk menyelesaikan masalah ini, kita perlu menulis sebuah pengait yang dapat kita gunakan:

  • terhubung ke email
  • menemukan huruf yang tepat
  • menerima data dari surat itu.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получСния Π΄Π°Π½Π½Ρ‹Ρ… с элСктронной ΠΏΠΎΡ‡Ρ‚Ρ‹

           :param imap_conn_id:       Π˜Π΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€ ΠΏΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΡ ΠΊ ΠΏΠΎΡ‡Ρ‚Π΅
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            ΠŸΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π°Π΅ΠΌΡΡ ΠΊ ΠΏΠΎΡ‡Ρ‚Π΅
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            ΠœΠ΅Ρ‚ΠΎΠ΄ для получСния ΠΈΠ΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€Π° послСднСго письма, 
            ΡƒΠ΄ΠΎΠ²Π»Π΅Ρ‚Π²ΠΎΡ€Π°ΡΡŽΡ‰Π΅Π³ΠΎ условиям поиска

            :param check_seen:      ΠžΡ‚ΠΌΠ΅Ρ‡Π°Ρ‚ΡŒ послСднСС письмо ΠΊΠ°ΠΊ ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Π½Π½ΠΎΠ΅
            :type check_seen:       bool
            :param box:             НаимСнования ящика
            :type box:              string
            :param condition:       Условия поиска писСм
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("Π’ ящикС Π½Π°ΠΉΠ΄Π΅Π½Ρ‹ ΡΠ»Π΅Π΄ΡƒΡŽΡ‰ΠΈΠ΅ письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("НС Π½Π°ΠΉΠ΄Π΅Π½ΠΎ Π½ΠΎΠ²Ρ‹Ρ… писСм")
            return None

        mail_id = mail_ids[0]

        # Ссли Ρ‚Π°ΠΊΠΈΡ… писСм нСсколько
        if len(mail_ids) > 1:
            # ΠΎΡ‚ΠΌΠ΅Ρ‡Π°Π΅ΠΌ ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Π΅ ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Π½Π½Ρ‹ΠΌΠΈ
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°Π΅ΠΌ послСднСС
            mail_id = mail_ids[-1]

        # Π½ΡƒΠΆΠ½ΠΎ Π»ΠΈ ΠΎΡ‚ΠΌΠ΅Ρ‚ΠΈΡ‚ΡŒ послСднСС ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Π½Π½Ρ‹ΠΌ
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Logikanya begini: kita sambung, cari huruf terakhir yang paling relevan, kalau ada yang lain kita abaikan. Fungsi ini digunakan karena huruf-huruf selanjutnya berisi semua data huruf-huruf sebelumnya. Jika hal ini tidak terjadi, maka Anda dapat mengembalikan array yang berisi semua huruf, atau memproses huruf pertama, dan sisanya pada lintasan berikutnya. Secara umum, semuanya, seperti biasa, bergantung pada tugasnya.

Kami menambahkan dua fungsi tambahan ke hook: untuk mengunduh file dan untuk mengunduh file menggunakan tautan dari email. Omong-omong, mereka dapat dipindahkan ke operator, itu tergantung pada frekuensi penggunaan fungsi ini. Apa lagi yang harus ditambahkan ke hook, sekali lagi, tergantung pada tugasnya: jika file langsung diterima dalam surat, maka Anda dapat mengunduh lampiran pada surat itu, jika data diterima dalam surat itu, maka Anda perlu menguraikan surat itu, dll. Dalam kasus saya, surat itu dilengkapi dengan satu tautan ke arsip, yang harus saya letakkan di tempat tertentu dan memulai proses pemrosesan lebih lanjut.

    def download_from_url(self, url, path, chunk_size=128):
        """
            ΠœΠ΅Ρ‚ΠΎΠ΄ для скачивания Ρ„Π°ΠΉΠ»Π°

            :param url:              АдрСс Π·Π°Π³Ρ€ΡƒΠ·ΠΊΠΈ
            :type url:               string
            :param path:             ΠšΡƒΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡ‚ΡŒ Ρ„Π°ΠΉΠ»
            :type path:              string
            :param chunk_size:       По сколько Π±Π°ΠΉΡ‚ΠΎΠ² ΠΏΠΈΡΠ°Ρ‚ΡŒ
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            ΠœΠ΅Ρ‚ΠΎΠ΄ для скачивания Ρ„Π°ΠΉΠ»Π° ΠΏΠΎ ссылкС ΠΈΠ· письма

            :param mail_id:         Π˜Π΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€ письма
            :type mail_id:          string
            :param path:            ΠšΡƒΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡ‚ΡŒ Ρ„Π°ΠΉΠ»
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Kodenya sederhana, sehingga tidak memerlukan penjelasan lebih lanjut. Langsung saja saya ceritakan tentang garis ajaib imap_conn_id. Apache Airflow menyimpan parameter koneksi (login, kata sandi, alamat, dan parameter lainnya) yang dapat diakses oleh pengidentifikasi string. Secara visual, manajemen koneksi terlihat seperti ini

Proses ETL untuk mendapatkan data dari email di Apache Airflow

Sensor untuk menunggu data

Karena kita sudah mengetahui cara menyambung dan menerima data dari email, sekarang kita dapat menulis sensor untuk menunggunya. Dalam kasus saya, tidak mungkin untuk langsung menulis operator yang akan memproses data, jika ada, karena proses lain bekerja berdasarkan data yang diterima dari surat, termasuk proses yang mengambil data terkait dari sumber lain (API, telepon , metrik web, dll.).dll). Saya akan memberi Anda sebuah contoh. Pengguna baru telah muncul di sistem CRM, dan kami masih belum mengetahui tentang UUID-nya. Kemudian, ketika mencoba menerima data dari telepon SIP, kami akan menerima panggilan yang terkait dengan UUID-nya, tetapi kami tidak dapat menyimpan dan menggunakannya dengan benar. Dalam hal ini, penting untuk mengingat ketergantungan data, terutama jika data tersebut berasal dari sumber yang berbeda. Tentu saja langkah-langkah ini tidak cukup untuk menjaga integritas data, namun dalam beberapa kasus hal ini diperlukan. Ya, dan bermalas-malasan untuk menduduki sumber daya juga tidak rasional.

Dengan demikian, sensor kami akan meluncurkan simpul grafik berikutnya jika ada informasi baru dalam email, dan juga menandai informasi sebelumnya sebagai tidak relevan.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Kami menerima dan menggunakan data

Untuk menerima dan mengolah data, Anda dapat menulis operator terpisah, Anda dapat menggunakan yang sudah jadi. Karena sejauh ini logikanya sepele - untuk mengambil data dari surat itu, misalnya, saya menyarankan PythonOperator standar

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Π‘Ρ‚Π°Π½Π΄Π°Ρ€Ρ‚Π½ΠΎΠ΅ ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€ΠΈΡ€ΠΎΠ²Π°Π½ΠΈΠ΅ Π³Ρ€Π°Ρ„Π°
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# ΠžΠΏΡ€Π΅Π΄Π΅Π»ΡΠ΅ΠΌ сСнсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Ѐункция для получСния Π΄Π°Π½Π½Ρ‹Ρ… ΠΈΠ· письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# ОписаниС ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Ρ… Π²Π΅Ρ€ΡˆΠΈΠ½ Π³Ρ€Π°Ρ„Π°
...

# Π—Π°Π΄Π°Π΅ΠΌ связь Π½Π° Π³Ρ€Π°Ρ„Π΅
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ОписаниС ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Ρ… ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ² управлСния

Omong-omong, jika surat perusahaan Anda juga ada di mail.ru, Anda tidak akan bisa mencari surat berdasarkan subjek, pengirim, dll. Pada tahun 2016 lalu, mereka berjanji untuk memperkenalkannya, namun ternyata berubah pikiran. Saya memecahkan masalah ini dengan membuat folder terpisah untuk surat-surat yang diperlukan dan menyiapkan filter untuk surat-surat yang diperlukan di antarmuka web surat. Jadi, hanya huruf-huruf yang diperlukan dan ketentuan pencarian, dalam kasus saya, cukup (TIDAK DILIHAT) yang masuk ke folder ini.

Ringkasnya, kita punya urutannya sebagai berikut: kita cek apakah ada surat baru yang memenuhi syarat, jika ada maka kita download arsipnya menggunakan link dari surat terakhir.
Di bawah titik terakhir, dihilangkan bahwa arsip ini akan dibongkar, data dari arsip akan dibersihkan dan diproses, dan sebagai hasilnya, semuanya akan berlanjut ke jalur proses ETL, tetapi ini sudah melampaui ruang lingkup artikel. Jika ternyata menarik dan bermanfaat, saya dengan senang hati akan terus menjelaskan solusi ETL dan bagian-bagiannya untuk Apache Airflow.

Sumber: www.habr.com

Tambah komentar