Procés ETL per obtenir dades del correu electrònic a Apache Airflow

Procés ETL per obtenir dades del correu electrònic a Apache Airflow

Per molt que es desenvolupi la tecnologia, una sèrie d'enfocaments obsolets sempre van darrere del desenvolupament. Això pot ser degut a una transició suau, factors humans, necessitats tecnològiques o alguna altra cosa. En l'àmbit del processament de dades, les fonts de dades són les més reveladores d'aquesta part. Per molt que somiem amb desfer-se'n, però fins ara part de les dades s'envien en missatgeria instantània i correus electrònics, per no parlar de formats més arcaics. Us convido a desmuntar una de les opcions d'Apache Airflow, il·lustrant com podeu agafar dades dels correus electrònics.

prehistòria

Encara es transfereixen moltes dades per correu electrònic, des de comunicacions interpersonals fins a estàndards d'interacció entre empreses. És bo si és possible escriure una interfície per obtenir dades o posar persones a l'oficina que introduiran aquesta informació en fonts més convenients, però sovint això no és possible. La tasca específica que em vaig enfrontar va ser connectar el famós sistema CRM al magatzem de dades i després al sistema OLAP. Va succeir històricament que per a la nostra empresa l'ús d'aquest sistema era convenient en una àrea determinada del negoci. Per tant, tothom volia poder operar també amb dades d'aquest sistema de tercers. En primer lloc, és clar, es va estudiar la possibilitat d'obtenir dades d'una API oberta. Malauradament, l'API no va cobrir l'obtenció de totes les dades necessàries i, en termes senzills, estava en molts aspectes tort, i el suport tècnic no volia o no podia reunir-se a mig camí per oferir una funcionalitat més completa. Però aquest sistema donava l'oportunitat de rebre periòdicament les dades que falten per correu electrònic en forma d'enllaç per descarregar l'arxiu.

Cal destacar que aquest no va ser l'únic cas en què l'empresa volia recollir dades de correus electrònics o missatgeria instantània. No obstant això, en aquest cas, no podríem influir en una empresa aliena que només proporcioni part de les dades d'aquesta manera.

flux d'aire apache

Per crear processos ETL, sovint utilitzem Apache Airflow. Perquè un lector que no estigui familiaritzat amb aquesta tecnologia entengui millor com es veu en el context i en general, en descriuré un parell d'introduccions.

Apache Airflow és una plataforma gratuïta que s'utilitza per crear, executar i supervisar processos ETL (Extract-Transform-Loading) a Python. El concepte principal a Airflow és un gràfic acíclic dirigit, on els vèrtexs del gràfic són processos específics, i les vores del gràfic són el flux de control o informació. Un procés simplement pot cridar a qualsevol funció de Python, o pot tenir una lògica més complexa de cridar seqüencialment diverses funcions en el context d'una classe. Per a les operacions més freqüents, ja hi ha molts desenvolupaments ja fets que es poden utilitzar com a processos. Aquests desenvolupaments inclouen:

  • operadors: per transferir dades d'un lloc a un altre, per exemple, d'una taula de base de dades a un magatzem de dades;
  • sensors: per esperar l'ocurrència d'un determinat esdeveniment i dirigir el flux de control als vèrtexs posteriors del gràfic;
  • ganxos: per a operacions de nivell inferior, per exemple, per obtenir dades d'una taula de base de dades (utilitzada en declaracions);
  • etcètera

Seria inadequat descriure Apache Airflow en detall en aquest article. Es poden veure breus presentacions aquí o aquí.

Ganxo per obtenir dades

En primer lloc, per resoldre el problema, hem d'escriure un ganxo amb el qual podríem:

  • connectar-se al correu electrònic
  • trobar la lletra correcta
  • rebre dades de la carta.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

La lògica és aquesta: connectem, trobem l'última lletra més rellevant, si n'hi ha d'altres, les ignorem. S'utilitza aquesta funció, perquè les lletres posteriors contenen totes les dades de les anteriors. Si no és així, podeu tornar una matriu de totes les lletres o processar la primera i la resta a la passada següent. En general, tot, com sempre, depèn de la tasca.

Afegim dues funcions auxiliars al ganxo: per descarregar un fitxer i per descarregar un fitxer mitjançant un enllaç d'un correu electrònic. Per cert, es poden traslladar a l'operador, depèn de la freqüència d'ús d'aquesta funcionalitat. Què més afegir al ganxo, de nou, depèn de la tasca: si els fitxers es reben immediatament a la carta, podeu descarregar fitxers adjunts a la carta, si les dades es reben a la carta, haureu d'analitzar la carta, etc. En el meu cas, la carta ve amb un enllaç a l'arxiu, que he de posar en un lloc determinat i començar el procés de processament posterior.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

El codi és senzill, per la qual cosa no necessita més explicacions. Només us parlaré de la línia màgica imap_conn_id. Apache Airflow emmagatzema paràmetres de connexió (inici de sessió, contrasenya, adreça i altres paràmetres) als quals es pot accedir mitjançant un identificador de cadena. Visualment, la gestió de la connexió té aquest aspecte

Procés ETL per obtenir dades del correu electrònic a Apache Airflow

Sensor per esperar dades

Com que ja sabem connectar-nos i rebre dades del correu, ara podem escriure un sensor per esperar-los. En el meu cas, no va funcionar immediatament escriure un operador que processés les dades, si n'hi ha, perquè altres processos funcionen en funció de les dades rebudes del correu, inclosos els que prenen dades relacionades d'altres fonts (API, telefonia). , mètriques web, etc.) etc.). Et posaré un exemple. Un nou usuari ha aparegut al sistema CRM i encara no sabem sobre el seu UUID. Aleshores, quan intentem rebre dades de la telefonia SIP, rebrem trucades vinculades al seu UUID, però no les podrem desar i utilitzar correctament. En aquests temes, és important tenir en compte la dependència de les dades, sobretot si són de fonts diferents. Aquestes són, per descomptat, mesures insuficients per preservar la integritat de les dades, però en alguns casos són necessàries. Sí, i el ralentí per ocupar recursos també és irracional.

Així, el nostre sensor llançarà vèrtexs posteriors del gràfic si hi ha informació nova al correu, i també marcarà la informació anterior com a irrellevant.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Rebem i fem servir dades

Per rebre i processar dades, podeu escriure un operador independent, podeu utilitzar-ne de ja fets. Com que fins ara la lògica és trivial: per prendre dades de la carta, per exemple, suggereixo el PythonOperator estàndard

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Per cert, si el vostre correu corporatiu també es troba a mail.ru, no podreu cercar cartes per tema, remitent, etc. L'any 2016, van prometre presentar-lo, però pel que sembla van canviar d'opinió. Vaig resoldre aquest problema creant una carpeta separada per a les lletres necessàries i configurant un filtre per a les lletres necessàries a la interfície web de correu. Així, només les lletres i condicions necessàries per a la cerca, en el meu cas, simplement (NO VISITES) entren en aquesta carpeta.

En resum, tenim la següent seqüència: comprovem si hi ha noves lletres que compleixin les condicions, si n'hi ha, després descarreguem l'arxiu mitjançant l'enllaç de l'última lletra.
Sota els darrers punts, s'omet que aquest arxiu es desempaquetarà, les dades de l'arxiu s'esborraran i es processaran i, com a resultat, tot anirà més enllà de la canalització del procés ETL, però això ja està més enllà. l'abast de l'article. Si va resultar interessant i útil, continuaré amb molt de gust descrivint les solucions ETL i les seves parts per a Apache Airflow.

Font: www.habr.com

Afegeix comentari