ETL process for getting data from email in Apache Airflow

No matter how far technology advances, a trail of outdated approaches always follows behind it. The cause may be gradual transitions, human factors, technological constraints, or something else. In data processing, data sources illustrate this best. However much we would like to be rid of it, part of the data is still sent through instant messengers and email, not to mention more archaic formats. In this article I walk through one option for Apache Airflow that shows how to pull data out of emails.

Background

A lot of data is still transferred by email, from interpersonal communication to formal exchange standards between companies. It is good when you can write an interface to obtain the data, or seat people in the office who will enter this information into more convenient sources, but often that is simply not possible. The specific task I faced was connecting a well-known CRM system to the data warehouse and then to the OLAP system. Historically, our company found this system convenient for one particular area of the business, so everyone really wanted to be able to work with data from this third-party system as well. The first thing we examined, of course, was getting the data through its open API. Unfortunately, the API did not cover all the necessary data and, to put it plainly, was clumsy in many respects, and technical support would not or could not meet us halfway and provide more complete functionality. The system did, however, offer the option of periodically receiving the missing data by email, in the form of a link to download an archive.

This was not the only case in which the business wanted to collect data from emails or instant messengers. In this case, however, we had no influence over the third-party company, which provides part of its data only this way.

Apache Airflow

To build ETL processes, we most often use Apache Airflow. For readers who are unfamiliar with this technology, here is a brief introduction to what it is and how it fits into the picture.

Apache Airflow is a free platform for building, executing, and monitoring ETL (Extract, Transform, Load) processes in Python. The main concept in Airflow is a directed acyclic graph, where the vertices are specific processes and the edges represent the flow of control or information. A process can simply call any Python function, or it can have more complex logic that calls several functions in sequence within a class. For the most frequent operations there are already many ready-made components that can be used as processes. These include:

  • operators - for transferring data from one place to another, for example, from a database table to a data warehouse;
  • sensors - for waiting for the occurrence of a certain event and directing the flow of control to the subsequent vertices of the graph;
  • hooks - for lower-level operations, for example, to get data from a database table (used inside operators);
  • etc.
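
To make the graph idea concrete without installing Airflow, here is a minimal sketch that models tasks and their dependencies with the standard library's graphlib module (Python 3.9+). This is not how Airflow is implemented; it only illustrates "vertices are processes, edges are control flow". The task names unpack_archive and load_to_warehouse are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter

# Each key is a task; the set lists the tasks it depends on.
# "unpack_archive" and "load_to_warehouse" are hypothetical names.
dependencies = {
    "check_new_emails": set(),
    "prepare_mail_data": {"check_new_emails"},
    "unpack_archive": {"prepare_mail_data"},
    "load_to_warehouse": {"unpack_archive"},
}


def execution_order(deps):
    """Return one valid order in which the tasks can run."""
    return list(TopologicalSorter(deps).static_order())


def is_acyclic(deps):
    """Return False if the dependency graph contains a cycle."""
    try:
        TopologicalSorter(deps).prepare()
    except CycleError:
        return False
    return True


order = execution_order(dependencies)
print(order)
print(is_acyclic(dependencies))
```

A graph with a cycle, such as {"a": {"b"}, "b": {"a"}}, has no valid execution order, which is why the graph must be acyclic.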

A detailed description of Apache Airflow is beyond the scope of this article; brief introductions are readily available elsewhere.

Hook for getting data

First of all, to solve the problem, we need to write a hook that lets us:

  • connect to the mailbox;
  • find the right message;
  • get the data out of the message.

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from bs4 import BeautifulSoup
import imaplib
import logging
import requests

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook for getting data from email

           :param imap_conn_id:       Identifier of the mail connection
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """
            Connect to the mail server and log in
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Get the identifier of the last message
            that matches the search condition

            :param check_seen:      Mark the last message as read
            :type check_seen:       bool
            :param box:             Mailbox (folder) name
            :type box:              string
            :param condition:       Message search condition
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("The following messages were found in the mailbox: " + str(mail_ids))

        if not mail_ids:
            logging.info("No new messages found")
            return None

        mail_id = mail_ids[0]

        # if there are several such messages
        if len(mail_ids) > 1:
            # mark all of them as read (the last one is unmarked below if needed)
            for other_id in mail_ids:
                self.mail.store(other_id, "+FLAGS", "\\Seen")

            # return the last one
            mail_id = mail_ids[-1]

        # leave the last message unread unless it should be marked as read
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\\Seen")

        return mail_id

The logic is as follows: we connect and find the most recent matching message; if there are others, we ignore them. This works because, in my case, later messages contain all the data of earlier ones. If that is not true for you, you can return a list of all messages, or process the first one and leave the rest for the next pass. As always, everything depends on the task.
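
The selection step can be isolated into a small pure function, which also makes the alternatives mentioned above easy to try. This is a minimal sketch, not part of the hook: split_search_result is a hypothetical helper that takes the data part of an imaplib search() response. It sorts the IDs numerically rather than relying on the order in which the server returns them.

```python
def split_search_result(data):
    """Split the `data` part of an imaplib search() response.

    imaplib returns a list whose first element is a bytes string of
    space-separated message numbers, e.g. [b"3 7 12"].
    Returns (latest_id, older_ids); latest_id is None if nothing matched.
    """
    raw_ids = data[0].split() if data and data[0] else []
    mail_ids = sorted(raw_ids, key=int)
    if not mail_ids:
        return None, []
    return mail_ids[-1], mail_ids[:-1]


latest, older = split_search_result([b"3 12 7"])
print(latest, older)
```

If every message has to be processed, iterate over older + [latest] instead of discarding the older IDs.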

We add two helper methods to the hook: one downloads a file, the other downloads a file using a link from an email. They could also be moved into the operator; that depends on how often the functionality is used. What else to add to the hook again depends on the task: if files arrive as attachments, you can download the attachments; if the data is in the message body, you need to parse the message; and so on. In my case the message contains a single link to an archive, which I need to put in a certain place before starting further processing.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Download a file

            :param url:              Download URL
            :type url:               string
            :param path:             Where to save the file
            :type path:              string
            :param chunk_size:       How many bytes to write at a time
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        # fail early on HTTP errors instead of saving an error page
        r.raise_for_status()
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Download a file using a link from an email

            :param mail_id:         Message identifier
            :type mail_id:          string
            :param path:            Where to save the file
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        # remove line breaks so that wrapped links are joined back together
        raw_soup = raw_email.decode().replace("\r", "").replace("\n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        # if the message contains several links, the last one wins
        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)
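
For messages whose body is encoded (quoted-printable or base64), or when the files arrive as attachments rather than as a link, the standard library's email package can decode the parts before they are searched. The following is a sketch under that assumption, not the hook's actual code: LinkCollector, extract_links and extract_attachments are hypothetical names, and the sample message is built locally instead of being fetched from a mailbox.

```python
import email
from email import policy
from email.message import EmailMessage
from html.parser import HTMLParser


class LinkCollector(HTMLParser):
    """Collect the href value of every <a> tag in an HTML document."""

    def __init__(self):
        super().__init__()
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            href = dict(attrs).get("href")
            if href:
                self.links.append(href)


def extract_links(raw_email):
    """Return the links found in the HTML body of a raw RFC 822 message."""
    message = email.message_from_bytes(raw_email, policy=policy.default)
    body = message.get_body(preferencelist=("html",))
    if body is None:
        return []
    collector = LinkCollector()
    collector.feed(body.get_content())  # already decoded to str
    return collector.links


def extract_attachments(raw_email):
    """Return {filename: content_bytes} for the attachments of a message."""
    message = email.message_from_bytes(raw_email, policy=policy.default)
    return {
        part.get_filename(): part.get_payload(decode=True)
        for part in message.iter_attachments()
    }


# Build a sample message instead of talking to a real mailbox.
sample = EmailMessage()
sample["Subject"] = "Export"
sample.set_content("Your export is ready.")
sample.add_alternative(
    '<html><body><a href="https://example.com/archive.zip">Download</a></body></html>',
    subtype="html",
)
sample.add_attachment(b"id,name\n1,test\n", maintype="application",
                      subtype="octet-stream", filename="data.csv")
raw = sample.as_bytes()

print(extract_links(raw))
print(sorted(extract_attachments(raw)))
```

In the hook, raw would be the data[0][1] value returned by fetch(mail_id, "(RFC822)").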

The code is simple, so it hardly needs further explanation. I will only comment on the magic string imap_conn_id. Apache Airflow stores connection parameters (login, password, host, and others) that can be looked up by a string identifier. Connections are managed in the Airflow web interface, under Admin > Connections.
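
Besides the web interface, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> that holds a URI. The sketch below only builds and parses such a URI with the standard library to show which parts end up as host, login and password. The imap scheme, the host and the credentials are placeholders, and how Airflow maps the scheme to a connection type may depend on the Airflow version.

```python
from urllib.parse import quote, unquote, urlsplit


def build_conn_uri(login, password, host, port=993, scheme="imap"):
    """Build a connection URI; login and password are percent-encoded."""
    return "{}://{}:{}@{}:{}".format(
        scheme, quote(login, safe=""), quote(password, safe=""), host, port
    )


def parse_conn_uri(uri):
    """Split a connection URI back into its components."""
    parts = urlsplit(uri)
    return {
        "host": parts.hostname,
        "port": parts.port,
        "login": unquote(parts.username or ""),
        "password": unquote(parts.password or ""),
    }


# Placeholder credentials; special characters must be percent-encoded.
uri = build_conn_uri("etl@example.com", "p@ss/word", "imap.example.com")
print(uri)
print(parse_conn_uri(uri))
```

The resulting string could then be exported as AIRFLOW_CONN_MAIL_CONN_ID so that the identifier "mail_conn_id" resolves to it.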

Sensor to wait for data

Now that we can connect to the mailbox and get data from it, we can write a sensor that waits for that data. In my case it was not an option to write an operator straight away that processes the data if there is any, because other processes depend on the data received by email, including those that pull related data from other sources (API, telephony, web metrics, etc.). Here is an example. A new user has appeared in the CRM system, and we do not yet know their UUID. When we then try to receive data from SIP telephony, we get calls tied to that UUID but cannot store and use them correctly. In such cases it is important to keep data dependencies in mind, especially when the data comes from different sources. These measures alone are, of course, not enough to preserve data integrity, but in some cases they are necessary. Besides, occupying resources with idle runs is wasteful.

Thus, our sensor triggers the downstream vertices of the graph when there is fresh information in the mailbox, and also marks older information as no longer relevant.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        # succeed as soon as at least one matching message is found
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        return mail_id is not None
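
To see how poke() interacts with the poke_interval, timeout and soft_fail settings, here is a simplified stand-alone model of a sensor running in "poke" mode. It is a sketch of the general behaviour, not Airflow's source code: run_poke_loop and SensorTimeout are hypothetical names, and the clock and sleep functions are injectable so the loop can be tried without waiting.

```python
import time


class SensorTimeout(Exception):
    """Raised when the condition is not met within `timeout` seconds."""


def run_poke_loop(poke, poke_interval, timeout, soft_fail=False,
                  sleep=time.sleep, clock=time.monotonic):
    """Simplified model of a sensor in "poke" mode.

    Returns "success" once poke() is truthy, "skipped" if the timeout
    expires with soft_fail=True, and raises SensorTimeout otherwise.
    """
    started = clock()
    while not poke():
        if clock() - started > timeout:
            if soft_fail:
                return "skipped"
            raise SensorTimeout("condition not met in time")
        sleep(poke_interval)
    return "success"


# Fake clock so the example finishes instantly.
now = {"t": 0.0}
answers = iter([False, False, True])  # mail appears on the third check
result = run_poke_loop(
    poke=lambda: next(answers),
    poke_interval=10,
    timeout=60,
    sleep=lambda seconds: now.update(t=now["t"] + seconds),
    clock=lambda: now["t"],
)
print(result, now["t"])
```

With soft_fail=True an empty mailbox ends the run as skipped rather than failed, so downstream tasks are simply not executed on that pass.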

Receiving and using the data

To receive and process the data you can write a separate operator or use a ready-made one. Since the logic so far is trivial (just take the data from the message), I suggest the standard PythonOperator.

from datetime import datetime, timedelta

from airflow.exceptions import AirflowException
from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Standard DAG configuration
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["user@example.com"],  # placeholder address
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Define the sensor
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    # keep the latest message unread so that prepare_mail can still find it
    check_seen=False,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Function that gets the data from the message
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    imap_hook.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable=prepare_mail)

# Definitions of the remaining vertices of the graph
...

# Define the dependencies in the graph
mail_check_sensor >> prepare_mail_data
prepare_mail_data >> ...
# Definitions of the remaining control flows

By the way, if your corporate mail is also hosted on mail.ru, you will not be able to search for messages by subject, sender, and so on. Back in 2016 they promised to introduce this, but apparently changed their minds. I solved the problem by creating a separate folder for the required messages and setting up a filter for them in the mail web interface. This way only the required messages end up in the folder, and the search condition in my case is simply (UNSEEN).
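
On servers that do support searching by sender or subject, the condition argument of the hook can carry a richer IMAP SEARCH expression. Below is a minimal sketch of a helper that assembles such a string; build_condition is a hypothetical name, the addresses are placeholders, and non-ASCII subjects would additionally need a charset, which is not handled here.

```python
def _quote(value):
    """Escape backslashes and double quotes for an IMAP quoted string."""
    return value.replace("\\", "\\\\").replace('"', '\\"')


def build_condition(unseen=True, sender=None, subject=None):
    """Build an IMAP SEARCH criteria string, e.g. '(UNSEEN FROM "a@b.c")'."""
    parts = []
    if unseen:
        parts.append("UNSEEN")
    if sender:
        parts.append('FROM "{}"'.format(_quote(sender)))
    if subject:
        parts.append('SUBJECT "{}"'.format(_quote(subject)))
    # With no criteria at all, fall back to matching every message.
    return "(" + " ".join(parts or ["ALL"]) + ")"


print(build_condition())
print(build_condition(sender="crm@example.com", subject="Daily export"))
```

The result can be passed as condition to get_last_mail or to MailSensor in place of the default "(UNSEEN)".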

To summarize, we have the following sequence: we check whether there are new messages that meet the conditions, and if there are, we download the archive using the link from the latest message.
The ellipses at the end of the code stand for what was left out: the archive is unpacked, the data from it is cleaned and processed, and the result goes further down the ETL pipeline, but that is beyond the scope of this article. If you found it interesting and useful, I will gladly continue describing ETL solutions and their parts for Apache Airflow.

Source: habr.com
