Διαδικασία ETL για λήψη δεδομένων από email στο Apache Airflow

Διαδικασία ETL για λήψη δεδομένων από email στο Apache Airflow

Ανεξάρτητα από το πόσο αναπτύσσεται η τεχνολογία, μια σειρά απαρχαιωμένων προσεγγίσεων πάντα ακολουθεί την ανάπτυξη. Αυτό μπορεί να οφείλεται σε ομαλή μετάβαση, ανθρώπινους παράγοντες, τεχνολογικές ανάγκες ή κάτι άλλο. Στον τομέα της επεξεργασίας δεδομένων, οι πηγές δεδομένων είναι οι πιο αποκαλυπτικές σε αυτό το κομμάτι. Ανεξάρτητα από το πόσο ονειρευόμαστε να απαλλαγούμε από αυτό, αλλά μέχρι στιγμής μέρος των δεδομένων αποστέλλεται σε instant messenger και email, για να μην αναφέρουμε πιο αρχαϊκές μορφές. Σας προσκαλώ να αποσυναρμολογήσετε μια από τις επιλογές για το Apache Airflow, που δείχνει πώς μπορείτε να λαμβάνετε δεδομένα από μηνύματα ηλεκτρονικού ταχυδρομείου.

Ιστορικό

Πολλά δεδομένα εξακολουθούν να μεταφέρονται μέσω e-mail, από τις διαπροσωπικές επικοινωνίες έως τα πρότυπα αλληλεπίδρασης μεταξύ εταιρειών. Είναι καλό εάν μπορείτε να γράψετε μια διεπαφή για τη λήψη δεδομένων ή να τοποθετήσετε άτομα στο γραφείο που θα εισάγουν αυτές τις πληροφορίες σε πιο βολικές πηγές, αλλά συχνά αυτό μπορεί απλώς να μην είναι δυνατό. Η συγκεκριμένη εργασία που αντιμετώπισα ήταν η σύνδεση του διαβόητου συστήματος CRM με την αποθήκη δεδομένων και, στη συνέχεια, με το σύστημα OLAP. Ιστορικά συνέβη ότι για την εταιρεία μας η χρήση αυτού του συστήματος ήταν βολική σε έναν συγκεκριμένο τομέα των επιχειρήσεων. Επομένως, όλοι ήθελαν πραγματικά να μπορούν να λειτουργούν με δεδομένα και από αυτό το σύστημα τρίτων. Πρώτα από όλα, βέβαια, μελετήθηκε η δυνατότητα λήψης δεδομένων από ανοιχτό API. Δυστυχώς, το API δεν κάλυπτε τη λήψη όλων των απαραίτητων δεδομένων και, με απλά λόγια, ήταν από πολλές απόψεις στραβά και η τεχνική υποστήριξη δεν ήθελε ή δεν μπορούσε να ανταποκριθεί στα μισά του δρόμου για να παρέχει πιο ολοκληρωμένη λειτουργικότητα. Αλλά αυτό το σύστημα παρείχε την ευκαιρία να λαμβάνει περιοδικά τα δεδομένα που λείπουν μέσω ταχυδρομείου με τη μορφή συνδέσμου για την εκφόρτωση του αρχείου.

Θα πρέπει να σημειωθεί ότι αυτή δεν ήταν η μοναδική περίπτωση που η επιχείρηση ήθελε να συλλέξει δεδομένα από email ή instant messenger. Ωστόσο, σε αυτήν την περίπτωση, δεν θα μπορούσαμε να επηρεάσουμε μια τρίτη εταιρεία που παρέχει μέρος των δεδομένων μόνο με αυτόν τον τρόπο.

Ροή αέρα Apache

Για τη δημιουργία διαδικασιών ETL, χρησιμοποιούμε συχνότερα το Apache Airflow. Προκειμένου ένας αναγνώστης που δεν είναι εξοικειωμένος με αυτήν την τεχνολογία να κατανοήσει καλύτερα πώς φαίνεται στο πλαίσιο και γενικά, θα περιγράψω μερικές εισαγωγικές.

Το Apache Airflow είναι μια δωρεάν πλατφόρμα που χρησιμοποιείται για τη δημιουργία, εκτέλεση και παρακολούθηση διαδικασιών ETL (Extract-Transform-Loading) στην Python. Η κύρια ιδέα στη ροή αέρα είναι ένα κατευθυνόμενο άκυκλο γράφημα, όπου οι κορυφές του γραφήματος είναι συγκεκριμένες διαδικασίες και οι ακμές του γραφήματος είναι η ροή ελέγχου ή πληροφοριών. Μια διεργασία μπορεί απλά να καλέσει οποιαδήποτε συνάρτηση Python ή μπορεί να έχει πιο πολύπλοκη λογική από την διαδοχική κλήση πολλών συναρτήσεων στο πλαίσιο μιας κλάσης. Για τις πιο συχνές λειτουργίες, υπάρχουν ήδη πολλές έτοιμες εξελίξεις που μπορούν να χρησιμοποιηθούν ως διαδικασίες. Τέτοιες εξελίξεις περιλαμβάνουν:

  • χειριστές - για τη μεταφορά δεδομένων από το ένα μέρος στο άλλο, για παράδειγμα, από έναν πίνακα βάσης δεδομένων σε μια αποθήκη δεδομένων.
  • αισθητήρες - για αναμονή για την εμφάνιση ενός συγκεκριμένου γεγονότος και κατεύθυνση της ροής ελέγχου στις επόμενες κορυφές του γραφήματος.
  • hooks - για λειτουργίες χαμηλότερου επιπέδου, για παράδειγμα, για λήψη δεδομένων από πίνακα βάσης δεδομένων (χρησιμοποιείται σε δηλώσεις).
  • κλπ.

Θα ήταν ακατάλληλο να περιγράψουμε λεπτομερώς το Apache Airflow σε αυτό το άρθρο. Μπορείτε να δείτε σύντομες εισαγωγές εδώ ή εδώ.

Άγκιστρο για λήψη δεδομένων

Πρώτα απ 'όλα, για να λύσουμε το πρόβλημα, πρέπει να γράψουμε ένα άγκιστρο με το οποίο θα μπορούσαμε:

  • συνδεθείτε στο email
  • βρείτε το σωστό γράμμα
  • λάβετε δεδομένα από την επιστολή.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Η λογική είναι η εξής: συνδέουμε, βρίσκουμε το τελευταίο πιο σχετικό γράμμα, αν υπάρχουν άλλα, τα αγνοούμε. Αυτή η συνάρτηση χρησιμοποιείται, επειδή τα μεταγενέστερα γράμματα περιέχουν όλα τα δεδομένα των προηγούμενων. Εάν αυτό δεν συμβαίνει, τότε μπορείτε να επιστρέψετε μια σειρά από όλα τα γράμματα ή να επεξεργαστείτε το πρώτο και τα υπόλοιπα στο επόμενο πάσο. Γενικά, όλα, όπως πάντα, εξαρτώνται από την εργασία.

Προσθέτουμε δύο βοηθητικές λειτουργίες στο άγκιστρο: για τη λήψη ενός αρχείου και για τη λήψη ενός αρχείου χρησιμοποιώντας έναν σύνδεσμο από ένα email. Παρεμπιπτόντως, μπορούν να μετακινηθούν στον χειριστή, εξαρτάται από τη συχνότητα χρήσης αυτής της λειτουργίας. Τι άλλο να προσθέσετε στο άγκιστρο, πάλι, εξαρτάται από την εργασία: εάν τα αρχεία ληφθούν αμέσως στην επιστολή, τότε μπορείτε να κάνετε λήψη συνημμένων στο γράμμα, εάν τα δεδομένα ληφθούν στο γράμμα, τότε πρέπει να αναλύσετε το γράμμα, και τα λοιπά. Στην περίπτωσή μου, η επιστολή συνοδεύεται από έναν σύνδεσμο προς το αρχείο, τον οποίο πρέπει να τοποθετήσω σε ένα συγκεκριμένο μέρος και να ξεκινήσω την περαιτέρω διαδικασία επεξεργασίας.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Ο κώδικας είναι απλός, επομένως δεν χρειάζεται περαιτέρω εξήγηση. Θα σας πω μόνο για τη μαγική γραμμή imap_conn_id. Το Apache Airflow αποθηκεύει παραμέτρους σύνδεσης (σύνδεση, κωδικός πρόσβασης, διεύθυνση και άλλες παραμέτρους) στις οποίες μπορείτε να έχετε πρόσβαση από ένα αναγνωριστικό συμβολοσειράς. Οπτικά, η διαχείριση σύνδεσης μοιάζει με αυτό

Διαδικασία ETL για λήψη δεδομένων από email στο Apache Airflow

Αισθητήρας για αναμονή δεδομένων

Δεδομένου ότι γνωρίζουμε ήδη πώς να συνδέουμε και να λαμβάνουμε δεδομένα από την αλληλογραφία, μπορούμε τώρα να γράψουμε έναν αισθητήρα για να τα περιμένουμε. Στην περίπτωσή μου, δεν λειτούργησε να γράψω αμέσως έναν χειριστή που θα επεξεργάζεται τα δεδομένα, εάν υπάρχουν, επειδή άλλες διεργασίες λειτουργούν με βάση τα δεδομένα που λαμβάνονται από την αλληλογραφία, συμπεριλαμβανομένων εκείνων που λαμβάνουν σχετικά δεδομένα από άλλες πηγές (API, τηλεφωνία , μετρήσεις ιστού, κ.λπ.). κ.λπ.). Θα σας δώσω ένα παράδειγμα. Ένας νέος χρήστης εμφανίστηκε στο σύστημα CRM και ακόμα δεν γνωρίζουμε για το UUID του. Στη συνέχεια, όταν προσπαθούμε να λάβουμε δεδομένα από την τηλεφωνία SIP, θα λαμβάνουμε κλήσεις συνδεδεμένες με το UUID της, αλλά δεν θα μπορούμε να τις αποθηκεύσουμε και να τις χρησιμοποιήσουμε σωστά. Σε τέτοια θέματα, είναι σημαντικό να έχετε κατά νου την εξάρτηση των δεδομένων, ειδικά εάν προέρχονται από διαφορετικές πηγές. Αυτά είναι, φυσικά, ανεπαρκή μέτρα για τη διατήρηση της ακεραιότητας των δεδομένων, αλλά σε ορισμένες περιπτώσεις είναι απαραίτητα. Ναι, και το ρελαντί για την κατάληψη πόρων είναι επίσης παράλογο.

Έτσι, ο αισθητήρας μας θα εκκινήσει τις επόμενες κορυφές του γραφήματος εάν υπάρχουν νέες πληροφορίες στο ταχυδρομείο και θα επισημάνει επίσης τις προηγούμενες πληροφορίες ως άσχετες.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Λαμβάνουμε και χρησιμοποιούμε δεδομένα

Για να λάβετε και να επεξεργαστείτε δεδομένα, μπορείτε να γράψετε έναν ξεχωριστό τελεστή, μπορείτε να χρησιμοποιήσετε έτοιμα. Δεδομένου ότι μέχρι στιγμής η λογική είναι ασήμαντη - για να ληφθούν δεδομένα από το γράμμα, για παράδειγμα, προτείνω τον τυπικό PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Παρεμπιπτόντως, εάν η εταιρική σας αλληλογραφία βρίσκεται επίσης στο mail.ru, τότε δεν θα μπορείτε να αναζητήσετε γράμματα ανά θέμα, αποστολέα κ.λπ. Το 2016, υποσχέθηκαν να το παρουσιάσουν, αλλά προφανώς άλλαξαν γνώμη. Έλυσα αυτό το πρόβλημα δημιουργώντας έναν ξεχωριστό φάκελο για τα απαραίτητα γράμματα και ρυθμίζοντας ένα φίλτρο για τα απαραίτητα γράμματα στη διεπαφή ιστού αλληλογραφίας. Έτσι, μόνο τα απαραίτητα γράμματα και προϋποθέσεις για την αναζήτηση, στην περίπτωσή μου, απλά (UNSEEN) μπαίνουν σε αυτόν τον φάκελο.

Συνοψίζοντας, έχουμε την ακόλουθη σειρά: ελέγχουμε αν υπάρχουν νέα γράμματα που πληρούν τις προϋποθέσεις, αν υπάρχουν, κατόπιν κατεβάζουμε το αρχείο χρησιμοποιώντας τον σύνδεσμο από το τελευταίο γράμμα.
Κάτω από τις τελευταίες κουκκίδες, παραλείπεται ότι αυτό το αρχείο θα αποσυσκευαστεί, τα δεδομένα από το αρχείο θα εκκαθαριστούν και θα υποβληθούν σε επεξεργασία, και ως αποτέλεσμα, το όλο θέμα θα προχωρήσει περαιτέρω στον αγωγό της διαδικασίας ETL, αλλά αυτό είναι ήδη πέρα ​​από το πεδίο εφαρμογής του άρθρου. Εάν αποδείχθηκε ενδιαφέρον και χρήσιμο, τότε θα συνεχίσω με χαρά να περιγράφω τις λύσεις ETL και τα μέρη τους για το Apache Airflow.

Πηγή: www.habr.com

Προσθέστε ένα σχόλιο