Apache Airflow'da e-postadan veri almak için ETL işlemi

Apache Airflow'da e-postadan veri almak için ETL işlemi

Teknoloji ne kadar gelişirse gelişsin, bir dizi modası geçmiş yaklaşım her zaman gelişimin gerisinde kalır. Bunun nedeni yumuşak bir geçiş, insan faktörü, teknolojik ihtiyaçlar veya başka bir şey olabilir. Veri işleme alanında, veri kaynakları bu kısımda en açıklayıcıdır. Bundan ne kadar kurtulmayı hayal etsek de, şimdiye kadar verilerin bir kısmı, daha eski biçimlerden bahsetmeye gerek yok, anlık mesajlaşma programlarında ve e-postalarda gönderiliyor. Sizi, e-postalardan nasıl veri alabileceğinizi gösteren Apache Airflow seçeneklerinden birini parçalara ayırmaya davet ediyorum.

tarih öncesi

Kişiler arası iletişimden şirketler arasındaki etkileşim standartlarına kadar pek çok veri hala e-posta yoluyla aktarılıyor. Veri elde etmek için bir arayüz yazmak veya ofiste bu bilgileri daha uygun kaynaklara girecek kişileri koymak mümkünse iyidir, ancak çoğu zaman bu mümkün olmayabilir. Karşılaştığım özel görev, kötü şöhretli CRM sistemini veri ambarına ve ardından OLAP sistemine bağlamaktı. Tarihsel olarak öyle oldu ki, şirketimiz için bu sistemin kullanımı belirli bir iş alanında uygun oldu. Bu nedenle, herkes gerçekten bu üçüncü taraf sistemden gelen verilerle de çalışabilmeyi istedi. Her şeyden önce, elbette, açık bir API'den veri elde etme olasılığı araştırıldı. Ne yazık ki, API gerekli tüm verileri almayı kapsamıyordu ve basit bir ifadeyle, birçok yönden çarpıktı ve teknik destek daha kapsamlı işlevsellik sağlamak için yarı yolda buluşmak istemedi veya arayamadı. Ancak bu sistem, eksik verileri periyodik olarak arşivi boşaltmak için bir bağlantı şeklinde posta yoluyla alma fırsatı sağladı.

İşletmenin e-postalardan veya anlık mesajlaşma programlarından veri toplamak istediği tek durumun bu olmadığına dikkat edilmelidir. Ancak bu durumda, verilerin yalnızca bir kısmını bu şekilde sağlayan üçüncü taraf bir şirketi etkileyemedik.

Apache Hava Akışı

ETL süreçleri oluşturmak için genellikle Apache Airflow kullanırız. Bu teknolojiye aşina olmayan bir okuyucunun bağlamda ve genel olarak nasıl göründüğünü daha iyi anlaması için birkaç giriş niteliğinde anlatacağım.

Apache Airflow, Python'da ETL (Extract-Transform-Loading) süreçleri oluşturmak, yürütmek ve izlemek için kullanılan ücretsiz bir platformdur. Airflow'daki ana konsept, grafiğin köşelerinin belirli süreçler olduğu ve grafiğin kenarlarının kontrol veya bilgi akışı olduğu yönlendirilmiş bir asiklik grafiktir. Bir işlem, herhangi bir Python işlevini basitçe çağırabilir veya bir sınıf bağlamında birkaç işlevi sırayla çağırmaktan daha karmaşık bir mantığa sahip olabilir. En sık yapılan işlemler için, süreç olarak kullanılabilecek pek çok hazır geliştirme hali hazırda mevcuttur. Bu tür gelişmeler şunları içerir:

  • operatörler - verileri bir yerden diğerine, örneğin bir veritabanı tablosundan bir veri ambarına aktarmak için;
  • sensörler - belirli bir olayın gerçekleşmesini beklemek ve kontrol akışını grafiğin sonraki köşelerine yönlendirmek için;
  • kancalar - alt düzey işlemler için, örneğin bir veritabanı tablosundan veri almak için (ifadelerde kullanılır);
  • vb

Bu makalede Apache Airflow'u ayrıntılı olarak anlatmak uygun olmayacaktır. Kısa tanıtımlar görüntülenebilir burada veya burada.

Veri almak için kanca

Her şeyden önce, sorunu çözmek için, yapabileceğimiz bir kanca yazmamız gerekiyor:

  • e-postaya bağlan
  • doğru harfi bul
  • mektuptan veri almak.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Mantık şudur: bağlanırız, en alakalı son harfi buluruz, başkaları varsa onları yok sayarız. Bu işlev kullanılır, çünkü sonraki harfler öncekilerin tüm verilerini içerir. Durum böyle değilse, tüm harflerin bir dizisini döndürebilir veya ilkini ve geri kalanını bir sonraki geçişte işleyebilirsiniz. Genel olarak, her şey her zaman olduğu gibi göreve bağlıdır.

Kancaya iki yardımcı işlev ekliyoruz: bir dosya indirmek için ve bir e-postadan bir bağlantı kullanarak bir dosya indirmek için. Bu arada, operatöre taşınabilirler, bu işlevi kullanma sıklığına bağlıdır. Kancaya başka ne ekleneceği yine göreve bağlıdır: dosyalar mektupta hemen alınırsa, mektuba ekleri indirebilirsiniz, eğer veriler mektupta alınırsa, o zaman mektubu ayrıştırmanız gerekir. vesaire. Benim durumumda, mektup, belirli bir yere koymam ve sonraki işleme sürecini başlatmam gereken arşive tek bir bağlantıyla geliyor.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Kod basittir, bu nedenle daha fazla açıklamaya ihtiyaç duymaz. Size imap_conn_id sihirli satırından bahsedeceğim. Apache Airflow, bir dizi tanımlayıcı tarafından erişilebilen bağlantı parametrelerini (oturum açma adı, parola, adres ve diğer parametreler) depolar. Görsel olarak, bağlantı yönetimi şöyle görünür

Apache Airflow'da e-postadan veri almak için ETL işlemi

Veri beklemek için sensör

Nasıl bağlanacağımızı ve postadan veri alacağımızı zaten bildiğimiz için, artık onları beklemek için bir sensör yazabiliriz. Benim durumumda, varsa verileri işleyecek bir operatörü hemen yazmak işe yaramadı çünkü diğer işlemler, diğer kaynaklardan (API, telefon) ilgili verileri alanlar da dahil olmak üzere postadan alınan verilere göre çalışır. , web metrikleri vb.) vb.). Sana bir örnek vereceğim. CRM sisteminde yeni bir kullanıcı belirdi ve onun UUID'sini hala bilmiyoruz. Ardından, SIP telefonundan veri almaya çalışırken, onun UUID'sine bağlı aramalar alacağız, ancak bunları doğru bir şekilde kaydedip kullanamayacağız. Bu tür konularda, özellikle farklı kaynaklardan geliyorsa, verilerin bağımlılığını akılda tutmak önemlidir. Bunlar elbette veri bütünlüğünü korumak için yetersiz önlemlerdir, ancak bazı durumlarda gereklidirler. Evet ve kaynakları işgal etmek için rölantide çalışmak da mantıksız.

Böylece, postada yeni bilgiler varsa sensörümüz grafiğin sonraki köşelerini başlatacak ve ayrıca önceki bilgileri alakasız olarak işaretleyecektir.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Verileri alıyoruz ve kullanıyoruz

Veri almak ve işlemek için ayrı bir operatör yazabilir, hazır olanları kullanabilirsiniz. Şimdiye kadar mantık önemsiz olduğu için - örneğin mektuptan veri almak için standart PythonOperator'ı öneririm

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Bu arada, kurumsal postanız da mail.ru'daysa, mektupları konuya, gönderene vb. Göre arayamazsınız. 2016'da, onu tanıtmaya söz verdiler, ancak görünüşe göre fikirlerini değiştirdiler. Gerekli harfler için ayrı bir klasör oluşturarak ve mail web arayüzünde gerekli harfler için bir filtre ayarlayarak bu sorunu çözdüm. Bu nedenle, benim durumumda, yalnızca arama için gerekli harfler ve koşullar bu klasöre (GÖRÜNMEYEN) girer.

Özetle, aşağıdaki diziye sahibiz: Koşulları karşılayan yeni harflerin olup olmadığını kontrol ediyoruz, varsa, son harfin bağlantısını kullanarak arşivi indiriyoruz.
Son noktaların altında, bu arşivin paketten çıkarılacağı, arşivdeki verilerin temizlenip işleneceği ve sonuç olarak her şeyin ETL sürecinin boru hattına daha da ileri gideceği atlanmıştır, ancak bu zaten ötesindedir. makalenin kapsamı. İlginç ve yararlı olduysa, Apache Airflow için ETL çözümlerini ve bunların parçalarını açıklamaya memnuniyetle devam edeceğim.

Kaynak: habr.com

Yorum ekle