Proses ETL untuk mendapatkan data daripada e-mel dalam Apache Airflow

Proses ETL untuk mendapatkan data daripada e-mel dalam Apache Airflow

Tidak kira berapa banyak teknologi berkembang, pembangunan sentiasa diikuti oleh rentetan pendekatan yang ketinggalan zaman. Ini mungkin disebabkan oleh peralihan yang lancar, faktor manusia, keperluan teknologi, atau sesuatu yang lain. Dalam bidang pemprosesan data, yang paling mendedahkan dalam bahagian ini ialah sumber data. Tidak kira betapa kita bermimpi untuk menyingkirkan ini, buat masa ini beberapa data dihantar dalam pemesej segera dan e-mel, apatah lagi format yang lebih kuno. Saya menjemput anda untuk melihat salah satu pilihan untuk Apache Airflow, menggambarkan cara anda boleh mengumpul data daripada e-mel.

prasejarah

Banyak data masih dihantar melalui e-mel, daripada komunikasi interpersonal kepada standard interaksi antara syarikat. Adalah baik jika anda boleh menulis antara muka untuk menerima data atau meletakkan orang di pejabat yang akan memasukkan maklumat ini ke sumber yang lebih mudah, tetapi selalunya kemungkinan ini mungkin tidak ada. Tugas khusus yang saya hadapi ialah menyambungkan sistem CRM yang terkenal ke gudang data, dan kemudian ke sistem OLAP. Dari segi sejarah, untuk syarikat kami, penggunaan sistem ini adalah mudah dalam bidang perniagaan tertentu. Oleh itu, semua orang benar-benar mahu dapat beroperasi dengan data daripada sistem pihak ketiga ini juga. Pertama sekali, sudah tentu, kemungkinan mendapatkan data daripada API terbuka telah diterokai. Malangnya, API tidak meliputi mendapatkan semua data yang diperlukan, dan, secara ringkas, ia sebahagian besarnya bengkok, dan sokongan teknikal tidak mahu atau tidak dapat memenuhi separuh jalan untuk menyediakan fungsi yang lebih komprehensif. Tetapi sistem ini memberi peluang untuk menerima data yang hilang secara berkala melalui e-mel dalam bentuk pautan untuk memuat turun arkib.

Perlu diingatkan bahawa ini bukan satu-satunya kes di mana perniagaan ingin mengumpul data daripada e-mel atau utusan segera. Walau bagaimanapun, dalam kes ini, kami tidak boleh mempengaruhi syarikat pihak ketiga yang menyediakan sebahagian daripada data hanya dengan cara ini.

Aliran Udara Apache

Untuk membina proses ETL, kami paling kerap menggunakan Apache Airflow. Agar pembaca yang tidak biasa dengan teknologi ini untuk lebih memahami bagaimana ia kelihatan dalam konteks dan secara umum, saya akan menerangkan beberapa pengenalan.

Apache Airflow ialah platform percuma yang digunakan untuk membina, menjalankan dan memantau proses ETL (Extract-Transform-Loading) dalam Python. Konsep utama dalam Aliran Udara ialah graf akiklik terarah, di mana bucu graf adalah proses tertentu, dan tepi graf ialah aliran kawalan atau maklumat. Proses boleh memanggil mana-mana fungsi Python, atau ia boleh mempunyai logik yang lebih kompleks untuk memanggil beberapa fungsi secara berurutan dalam konteks kelas. Untuk operasi yang paling biasa, sudah terdapat banyak pembangunan siap sedia yang boleh digunakan sebagai proses. Perkembangan tersebut termasuk:

  • pengendali - untuk memindahkan data dari satu tempat ke tempat lain, sebagai contoh, dari jadual pangkalan data ke gudang data;
  • penderia - untuk menunggu kejadian tertentu dan mengarahkan aliran kawalan ke bucu graf berikutnya;
  • cangkuk - untuk operasi peringkat rendah, contohnya, untuk mendapatkan semula data daripada jadual pangkalan data (digunakan dalam penyata);
  • dan lain-lain

Adalah tidak sesuai untuk menerangkan Apache Airflow secara terperinci dalam artikel ini. Pengenalan ringkas boleh dilihat di sini atau di sini.

Cangkuk untuk mendapatkan data

Pertama sekali, untuk menyelesaikan masalah kita perlu menulis cangkuk yang kita boleh:

  • sambung ke e-mel;
  • cari surat yang anda perlukan;
  • menerima data daripada surat.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получСния Π΄Π°Π½Π½Ρ‹Ρ… с элСктронной ΠΏΠΎΡ‡Ρ‚Ρ‹

           :param imap_conn_id:       Π˜Π΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€ ΠΏΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΡ ΠΊ ΠΏΠΎΡ‡Ρ‚Π΅
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            ΠŸΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π°Π΅ΠΌΡΡ ΠΊ ΠΏΠΎΡ‡Ρ‚Π΅
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            ΠœΠ΅Ρ‚ΠΎΠ΄ для получСния ΠΈΠ΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€Π° послСднСго письма, 
            ΡƒΠ΄ΠΎΠ²Π»Π΅Ρ‚Π²ΠΎΡ€Π°ΡΡŽΡ‰Π΅Π³ΠΎ условиям поиска

            :param check_seen:      ΠžΡ‚ΠΌΠ΅Ρ‡Π°Ρ‚ΡŒ послСднСС письмо ΠΊΠ°ΠΊ ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Π½Π½ΠΎΠ΅
            :type check_seen:       bool
            :param box:             НаимСнования ящика
            :type box:              string
            :param condition:       Условия поиска писСм
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("Π’ ящикС Π½Π°ΠΉΠ΄Π΅Π½Ρ‹ ΡΠ»Π΅Π΄ΡƒΡŽΡ‰ΠΈΠ΅ письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("НС Π½Π°ΠΉΠ΄Π΅Π½ΠΎ Π½ΠΎΠ²Ρ‹Ρ… писСм")
            return None

        mail_id = mail_ids[0]

        # Ссли Ρ‚Π°ΠΊΠΈΡ… писСм нСсколько
        if len(mail_ids) > 1:
            # ΠΎΡ‚ΠΌΠ΅Ρ‡Π°Π΅ΠΌ ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Π΅ ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Π½Π½Ρ‹ΠΌΠΈ
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°Π΅ΠΌ послСднСС
            mail_id = mail_ids[-1]

        # Π½ΡƒΠΆΠ½ΠΎ Π»ΠΈ ΠΎΡ‚ΠΌΠ΅Ρ‚ΠΈΡ‚ΡŒ послСднСС ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Π½Π½Ρ‹ΠΌ
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Logiknya begini: kita sambung, cari huruf terakhir yang paling relevan, jika ada yang lain, kita abaikan mereka. Fungsi ini digunakan kerana mesej kemudian mengandungi semua data daripada yang terdahulu. Jika ini tidak berlaku, maka anda boleh mengembalikan tatasusunan semua huruf atau memproses yang pertama dan yang lain pada pas seterusnya. Secara umum, seperti biasa, semuanya bergantung pada tugas.

Kami menambah dua fungsi tambahan pada cangkuk: untuk memuat turun fail dan untuk memuat turun fail menggunakan pautan dari surat. Dengan cara ini, mereka boleh dimasukkan ke dalam pengendali, ia bergantung pada kekerapan penggunaan fungsi ini. Apa lagi yang perlu ditambahkan pada cangkuk, sekali lagi, bergantung pada tugas: jika fail diterima serta-merta dalam surat, maka anda boleh memuat turun lampiran pada surat itu, jika data datang dalam surat, maka anda perlu menghuraikan surat itu, dsb. Dalam kes saya, surat itu tiba dengan satu pautan ke arkib, yang perlu saya letakkan di tempat tertentu dan mula memproses selanjutnya.

    def download_from_url(self, url, path, chunk_size=128):
        """
            ΠœΠ΅Ρ‚ΠΎΠ΄ для скачивания Ρ„Π°ΠΉΠ»Π°

            :param url:              АдрСс Π·Π°Π³Ρ€ΡƒΠ·ΠΊΠΈ
            :type url:               string
            :param path:             ΠšΡƒΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡ‚ΡŒ Ρ„Π°ΠΉΠ»
            :type path:              string
            :param chunk_size:       По сколько Π±Π°ΠΉΡ‚ΠΎΠ² ΠΏΠΈΡΠ°Ρ‚ΡŒ
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            ΠœΠ΅Ρ‚ΠΎΠ΄ для скачивания Ρ„Π°ΠΉΠ»Π° ΠΏΠΎ ссылкС ΠΈΠ· письма

            :param mail_id:         Π˜Π΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€ письма
            :type mail_id:          string
            :param path:            ΠšΡƒΠ΄Π° ΠΏΠΎΠ»ΠΎΠΆΠΈΡ‚ΡŒ Ρ„Π°ΠΉΠ»
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Kod ini mudah, jadi ia tidak memerlukan sebarang penjelasan tambahan. Saya hanya akan memberitahu anda tentang garis ajaib imap_conn_id. Apache Airflow menyimpan parameter sambungan (log masuk, kata laluan, alamat dan parameter lain), yang boleh diakses oleh pengecam rentetan. Secara visual, pengurusan sambungan kelihatan seperti ini

Proses ETL untuk mendapatkan data daripada e-mel dalam Apache Airflow

Sensor untuk menunggu data

Memandangkan kami sudah tahu cara menyambung dan menerima data daripada mel, kami kini boleh menulis penderia untuk menunggunya. Dalam kes saya, menulis dengan segera pengendali yang akan memproses data, jika ada, tidak berfungsi, kerana proses lain berfungsi berdasarkan data yang diterima daripada mel, termasuk yang mengambil data berkaitan daripada sumber lain (API, telefon, metrik web, dll.) dll.). Biar saya berikan satu contoh. Pengguna baharu telah muncul dalam sistem CRM dan kami masih belum mengetahui tentang UUIDnya. Kemudian, apabila kami cuba menerima data daripada telefon SIP, kami akan menerima panggilan yang terikat pada UUIDnya, tetapi kami tidak akan dapat menyimpan dan menggunakannya dengan betul. Dalam soalan sedemikian, adalah penting untuk mengingati pergantungan data, terutamanya jika ia adalah daripada sumber yang berbeza. Ini, sudah tentu, langkah-langkah yang tidak mencukupi untuk mengekalkan integriti data, tetapi dalam beberapa kes ia adalah perlu. Dan meminjam sumber semasa terbiar juga tidak rasional.

Oleh itu, penderia kami akan melancarkan bucu graf berikutnya jika terdapat maklumat baharu dalam mel, dan juga menandakan maklumat sebelumnya sebagai tidak berkaitan.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Menerima dan menggunakan data

Untuk menerima dan memproses data, anda boleh menulis operator yang berasingan, atau anda boleh menggunakan yang sudah siap. Oleh kerana logiknya masih remeh - untuk mengambil data dari surat, maka sebagai contoh saya mencadangkan PythonOperator standard

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Π‘Ρ‚Π°Π½Π΄Π°Ρ€Ρ‚Π½ΠΎΠ΅ ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€ΠΈΡ€ΠΎΠ²Π°Π½ΠΈΠ΅ Π³Ρ€Π°Ρ„Π°
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# ΠžΠΏΡ€Π΅Π΄Π΅Π»ΡΠ΅ΠΌ сСнсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Ѐункция для получСния Π΄Π°Π½Π½Ρ‹Ρ… ΠΈΠ· письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# ОписаниС ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Ρ… Π²Π΅Ρ€ΡˆΠΈΠ½ Π³Ρ€Π°Ρ„Π°
...

# Π—Π°Π΄Π°Π΅ΠΌ связь Π½Π° Π³Ρ€Π°Ρ„Π΅
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# ОписаниС ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Ρ… ΠΏΠΎΡ‚ΠΎΠΊΠΎΠ² управлСния

Dengan cara ini, jika e-mel korporat anda juga berada di mail.ru, maka anda tidak akan dapat mencari surat mengikut subjek, pengirim, dsb. Mereka berjanji untuk memperkenalkannya kembali pada 2016, tetapi nampaknya mengubah fikiran mereka. Saya menyelesaikan masalah ini dengan mencipta folder berasingan untuk huruf yang diperlukan dan menyediakan penapis dalam antara muka web mel untuk huruf yang diperlukan. Oleh itu, hanya huruf yang diperlukan dan syarat carian, dalam kes saya, hanya (GHAIB) masuk ke dalam folder ini.

Untuk meringkaskan, kami mempunyai urutan berikut: kami menyemak sama ada terdapat surat baru yang memenuhi syarat; jika ada, maka kami memuat turun arkib menggunakan pautan dari huruf terakhir.
Di bawah elips terakhir, arkib ini ditinggalkan akan dibongkar, data daripada arkib akan dibersihkan dan diproses, dan akibatnya, semuanya akan pergi lebih jauh ke saluran paip proses ETL, tetapi ini sudah di luar skop daripada artikel itu. Jika ia ternyata menarik dan berguna, maka saya dengan senang hati akan terus menerangkan penyelesaian ETL dan bahagiannya untuk Apache Airflow.

Sumber: www.habr.com

Tambah komen