ETL-proses foar it krijen fan gegevens fan e-post yn Apache Airflow

ETL-proses foar it krijen fan gegevens fan e-post yn Apache Airflow

Nettsjinsteande hoefolle technology ûntwikkelt, in string ferâldere oanpak folget altyd efter ûntwikkeling. Dit kin wêze fanwege in glêde oergong, minsklike faktoaren, technologyske behoeften, of wat oars. Op it mêd fan gegevensferwurking binne gegevensboarnen yn dit diel it meast iepenbierend. Nettsjinsteande hoefolle wy dreame fan dit kwyt te reitsjen, mar oant no ta wurdt in diel fan 'e gegevens ferstjoerd yn instant messengers en e-mails, om net te hawwen oer mear archayske formaten. Ik noegje jo út om ien fan 'e opsjes foar Apache Airflow te disassemble, yllustrearje hoe't jo gegevens fan e-post kinne nimme.

prehistoarje

In soad gegevens wurde noch oerdroegen fia e-post, fan ynterpersoanlike kommunikaasje oant noarmen fan ynteraksje tusken bedriuwen. It is goed as it mooglik is om in ynterface te skriuwen om gegevens te krijen of minsken yn it kantoar te pleatsen dy't dizze ynformaasje yn handiger boarnen ynfiere, mar faaks kin dit gewoan net mooglik wêze. De spesifike taak dy't ik tsjinkaam wie it ferbinen fan it beruchte CRM-systeem oan it datapakhús, en dan nei it OLAP-systeem. It barde sa histoarysk dat foar ús bedriuw it gebrûk fan dit systeem handich wie yn in bepaald gebiet fan bedriuw. Dêrom woe elkenien echt ek mei gegevens fan dit systeem fan tredden kinne operearje. Alderearst waard fansels de mooglikheid ûndersocht om gegevens te krijen fan in iepen API. Spitigernôch die de API net om alle nedige gegevens te krijen, en, yn ienfâldige termen, wie it op in protte manieren krom, en technyske stipe woe of koe net healwei foldwaan om mear wiidweidige funksjonaliteit te leverjen. Mar dit systeem joech de kâns om de ûntbrekkende gegevens periodyk te ûntfangen per post yn 'e foarm fan in keppeling foar it laden fan it argyf.

Dêrby moat opmurken wurde dat dit net it ienige gefal wie wêryn it bedriuw gegevens fan e-mails of instant messengers sammelje woe. Yn dit gefal koene wy ​​lykwols gjin ynfloed op in bedriuw fan tredden dat allinich op dizze manier in diel fan 'e gegevens leveret.

apache luchtstream

Om ETL-prosessen te bouwen, brûke wy meast Apache Airflow. Om in lêzer dy't net bekend is mei dizze technology better te begripen hoe't it derút sjocht yn 'e kontekst en yn' t algemien, sil ik in pear ynliedende beskriuwe.

Apache Airflow is in fergese platfoarm dat wurdt brûkt om ETL-prosessen (Extract-Transform-Loading) yn Python te bouwen, út te fieren en te kontrolearjen. It haadkonsept yn Airflow is in rjochte acyclyske grafyk, wêrby't de hoekpunten fan 'e grafyk spesifike prosessen binne, en de rânen fan' e grafyk binne de stream fan kontrôle of ynformaasje. In proses kin gewoan in Python-funksje neame, of it kin kompleksere logika hawwe fan it opfolgjen fan ferskate funksjes yn 'e kontekst fan in klasse. Foar de meast foarkommende operaasjes binne d'r al in protte klearebare ûntwikkelingen dy't brûkt wurde kinne as prosessen. Sokke ûntjouwings omfetsje:

  • operators - foar it oerdragen fan gegevens fan it iene plak nei it oare, bygelyks fan in databanktabel nei in datapakhús;
  • sensoren - foar it wachtsjen op it foarkommen fan in bepaald barren en it rjochtsjen fan de stream fan kontrôle nei de folgjende hoekpunten fan 'e grafyk;
  • haken - foar operaasjes op legere nivo's, bygelyks om gegevens te krijen fan in databanktabel (brûkt yn útspraken);
  • en sa fierder.

It soe net passend wêze om Apache Airflow yn detail te beskriuwen yn dit artikel. Koarte ynliedingen kinne wurde besjoen hjir of hjir.

Hook foar it krijen fan gegevens

Earst fan alles, om it probleem op te lossen, moatte wy in heak skriuwe wêrmei wy koenen:

  • ferbine mei e-post
  • fine it rjocht letter
  • ûntfange gegevens út de brief.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

De logika is dit: wy ferbine, fine de lêste meast relevante brief, as der oaren binne, negearje wy se. Dizze funksje wurdt brûkt, om't lettere letters alle gegevens fan eardere befetsje. As dit net it gefal is, dan kinne jo in array fan alle letters weromjaan, of de earste ferwurkje, en de rest op 'e folgjende pass. Yn it algemien, alles, lykas altyd, hinget ôf fan de taak.

Wy foegje twa helpfunksjes ta oan 'e heak: foar it downloaden fan in bestân en foar it downloaden fan in bestân mei in keppeling fan in e-post. Trouwens, se kinne wurde ferpleatst nei de operator, it hinget ôf fan 'e frekwinsje fan it brûken fan dizze funksjonaliteit. Wat oars te foegjen oan 'e heak, wer, hinget ôf fan' e taak: as triemmen wurde ûntfongen fuortendaliks yn 'e brief, dan kinne jo downloade taheaksels oan' e brief, as de gegevens wurde ûntfongen yn 'e brief, dan moatte parse de brief, ensfh. Yn myn gefal komt de brief mei ien keppeling nei it argyf, dy't ik op in bepaald plak pleatse moat en it fierdere ferwurkingsproses begjinne.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

De koade is ienfâldich, dus it hoecht amper fierdere útlis. Ik sil gewoan fertelle oer de magyske line imap_conn_id. Apache Airflow bewarret ferbiningsparameters (oanmelding, wachtwurd, adres en oare parameters) dy't tagonklik wurde kinne troch in tekenrige identifier. Visueel sjocht ferbiningsbehear der sa út

ETL-proses foar it krijen fan gegevens fan e-post yn Apache Airflow

Sensor om te wachtsjen op gegevens

Om't wy al witte hoe't jo gegevens ferbine en ûntfange fan post, kinne wy ​​no in sensor skriuwe om op har te wachtsjen. Yn myn gefal wurke it net om direkt in operator te skriuwen dy't de gegevens ferwurkje, as der binne, om't oare prosessen wurkje basearre op de gegevens ûntfongen fan 'e post, ynklusyf dyjingen dy't relatearre gegevens fan oare boarnen nimme (API, telefoanysk) , webmetriken, ensfh.) ensfh.). Ik sil jo in foarbyld jaan. In nije brûker is ferskynd yn it CRM-systeem, en wy witte noch net oer syn UUID. Dan, as wy besykje gegevens te ûntfangen fan SIP-tillefoany, sille wy petearen ûntfange dy't ferbûn binne oan syn UUID, mar wy kinne se net opslaan en korrekt brûke. Yn sokke saken is it wichtich om de ôfhinklikens fan 'e gegevens yn' e rekken te hâlden, benammen as se út ferskate boarnen binne. Dit binne fansels net genôch maatregels om de yntegriteit fan gegevens te behâlden, mar yn guon gefallen binne se nedich. Ja, en idling om middels te besetten is ek irrasjoneel.

Sa sil ús sensor de folgjende hoekpunten fan 'e grafyk lansearje as d'r frisse ynformaasje yn' e post is, en ek de foarige ynformaasje as irrelevant markearje.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Wy ûntfange en brûke gegevens

Om gegevens te ûntfangen en te ferwurkjen, kinne jo in aparte operator skriuwe, jo kinne klear makke brûke. Om't de logika noch triviaal is - om bygelyks gegevens út 'e brief te nimmen, stel ik de standert PythonOperator foar

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Trouwens, as jo bedriuwspost ek op mail.ru is, dan kinne jo net sykje nei brieven op ûnderwerp, stjoerder, ensfh. Werom yn 2016 beloofden se it yn te fieren, mar feroare blykber fan gedachten. Ik haw dit probleem oplost troch in aparte map te meitsjen foar de nedige letters en in filter foar de nedige letters yn te stellen yn 'e mailwebynterface. Sa komme allinich de nedige letters en betingsten foar it sykjen, yn myn gefal, gewoan (UNSEEN) yn dizze map.

Gearfetsjend hawwe wy de folgjende folchoarder: wy kontrolearje oft der nije brieven binne dy't foldogge oan de betingsten, as der binne, dan downloade wy it argyf mei de keppeling fan 'e lêste brief.
Under de lêste stippen wurdt weilitten dat dit argyf útpakt wurdt, de gegevens út it argyf wiske en ferwurke wurde, en dêrtroch giet it gehiel fierder nei de pipeline fan it ETL-proses, mar dit is al foarby de omfang fan it artikel. As it nijsgjirrich en nuttich wie, dan sil ik graach trochgean mei it beskriuwen fan ETL-oplossingen en har dielen foar Apache Airflow.

Boarne: www.habr.com

Add a comment