Proses ETL ar gyfer cael data o e-bost yn Apache Airflow

Proses ETL ar gyfer cael data o e-bost yn Apache Airflow

Ni waeth faint o dechnoleg sy'n datblygu, mae cyfres o ddulliau hen ffasiwn bob amser yn dilyn datblygiad. Gall hyn fod oherwydd trosglwyddiad esmwyth, ffactorau dynol, anghenion technolegol, neu rywbeth arall. Ym maes prosesu data, ffynonellau data yw'r rhai mwyaf dadlennol yn y rhan hon. Ni waeth faint yr ydym yn breuddwydio am gael gwared ar hyn, ond hyd yn hyn mae rhan o'r data yn cael ei anfon mewn negeswyr sydyn ac e-byst, heb sôn am fformatau mwy hynafol. Rwy'n eich gwahodd i ddadosod un o'r opsiynau ar gyfer Apache Airflow, gan ddangos sut y gallwch chi gymryd data o e-byst.

cynhanes

Mae llawer o ddata yn dal i gael ei drosglwyddo trwy e-bost, o gyfathrebu rhyngbersonol i safonau rhyngweithio rhwng cwmnïau. Mae'n dda os yw'n bosibl ysgrifennu rhyngwyneb i gael data neu roi pobl yn y swyddfa a fydd yn mewnbynnu'r wybodaeth hon i ffynonellau mwy cyfleus, ond yn aml efallai na fydd hyn yn bosibl. Y dasg benodol a wynebais oedd cysylltu’r system CRM drwg-enwog â’r warws data, ac yna â’r system OLAP. Digwyddodd felly yn hanesyddol bod defnyddio'r system hon yn gyfleus i'n cwmni mewn maes busnes penodol. Felly, roedd pawb wir eisiau gallu gweithredu gyda data o'r system trydydd parti hon hefyd. Yn gyntaf oll, wrth gwrs, astudiwyd y posibilrwydd o gael data o API agored. Yn anffodus, nid oedd yr API yn cwmpasu cael yr holl ddata angenrheidiol, ac, yn syml, roedd yn gam mewn sawl ffordd, ac nid oedd cymorth technegol eisiau neu ni allai gwrdd hanner ffordd i ddarparu ymarferoldeb mwy cynhwysfawr. Ond roedd y system hon yn rhoi'r cyfle i dderbyn y data coll o bryd i'w gilydd trwy'r post ar ffurf dolen ar gyfer dadlwytho'r archif.

Dylid nodi nad dyma'r unig achos lle'r oedd y busnes eisiau casglu data o e-byst neu negeswyr gwib. Fodd bynnag, yn yr achos hwn, ni allem ddylanwadu ar gwmni trydydd parti sy'n darparu rhan o'r data yn y modd hwn yn unig.

Llif Awyr Apache

I adeiladu prosesau ETL, rydym yn aml yn defnyddio Apache Airflow. Er mwyn i ddarllenydd sy'n anghyfarwydd â'r dechnoleg hon ddeall yn well sut mae'n edrych yn y cyd-destun ac yn gyffredinol, byddaf yn disgrifio cwpl o rai rhagarweiniol.

Mae Apache Airflow yn blatfform rhad ac am ddim a ddefnyddir i adeiladu, gweithredu a monitro prosesau ETL (Extract-Transform-Loading) yn Python. Y prif gysyniad yn Llif Awyr yw graff acyclic cyfeiriedig, lle mae fertigau'r graff yn brosesau penodol, ac ymylon y graff yw llif rheolaeth neu wybodaeth. Gall proses alw unrhyw swyddogaeth Python yn syml, neu gall fod â rhesymeg fwy cymhleth o alw sawl swyddogaeth yn olynol yng nghyd-destun dosbarth. Ar gyfer y gweithrediadau mwyaf aml, mae yna eisoes lawer o ddatblygiadau parod y gellir eu defnyddio fel prosesau. Mae datblygiadau o’r fath yn cynnwys:

  • gweithredwyr - ar gyfer trosglwyddo data o un lle i'r llall, er enghraifft, o dabl cronfa ddata i warws data;
  • synwyryddion - ar gyfer aros am ddigwyddiad penodol a chyfeirio llif y rheolaeth i fertigau dilynol y graff;
  • bachau - ar gyfer gweithrediadau lefel is, er enghraifft, i gael data o dabl cronfa ddata (a ddefnyddir mewn datganiadau);
  • ac ati

Byddai'n amhriodol disgrifio Apache Airflow yn fanwl yn yr erthygl hon. Gellir gweld cyflwyniadau byr yma neu yma.

Bachyn am gael data

Yn gyntaf oll, i ddatrys y broblem, mae angen i ni ysgrifennu bachyn y gallem ei ddefnyddio:

  • cysylltu ag e-bost
  • dod o hyd i'r llythyren gywir
  • derbyn data o'r llythyr.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Y rhesymeg yw hyn: rydym yn cysylltu, yn dod o hyd i'r llythyren fwyaf perthnasol olaf, os oes rhai eraill, rydym yn eu hanwybyddu. Defnyddir y swyddogaeth hon, oherwydd mae llythrennau diweddarach yn cynnwys holl ddata rhai cynharach. Os nad yw hyn yn wir, yna gallwch ddychwelyd amrywiaeth o'r holl lythyrau, neu brosesu'r un cyntaf, a'r gweddill ar y tocyn nesaf. Yn gyffredinol, mae popeth, fel bob amser, yn dibynnu ar y dasg.

Rydym yn ychwanegu dwy swyddogaeth ategol i'r bachyn: ar gyfer lawrlwytho ffeil ac ar gyfer lawrlwytho ffeil gan ddefnyddio dolen o e-bost. Gyda llaw, gellir eu symud i'r gweithredwr, mae'n dibynnu ar amlder defnyddio'r swyddogaeth hon. Mae beth arall i'w ychwanegu at y bachyn, eto, yn dibynnu ar y dasg: os derbynnir ffeiliau ar unwaith yn y llythyr, yna gallwch chi lawrlwytho atodiadau i'r llythyr, os derbynnir y data yn y llythyr, yna mae angen i chi ddosrannu'r llythyr, etc. Yn fy achos i, mae'r llythyr yn dod ag un dolen i'r archif, y mae angen i mi ei roi mewn man penodol a dechrau'r broses brosesu bellach.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Mae'r cod yn syml, felly prin fod angen esboniad pellach arno. 'N annhymerus' jyst yn dweud wrthych am y llinell hud imap_conn_id. Mae Apache Airflow yn storio paramedrau cysylltiad (mewngofnodi, cyfrinair, cyfeiriad, a pharamedrau eraill) y gellir eu cyrchu trwy ddynodwr llinyn. Yn weledol, mae rheoli cysylltiad yn edrych fel hyn

Proses ETL ar gyfer cael data o e-bost yn Apache Airflow

Synhwyrydd i aros am ddata

Gan ein bod eisoes yn gwybod sut i gysylltu a derbyn data o'r post, gallwn nawr ysgrifennu synhwyrydd i aros amdanynt. Yn fy achos i, ni weithiodd i ysgrifennu gweithredwr ar unwaith a fydd yn prosesu'r data, os o gwbl, oherwydd bod prosesau eraill yn gweithio yn seiliedig ar y data a dderbyniwyd o'r post, gan gynnwys y rhai sy'n cymryd data cysylltiedig o ffynonellau eraill (API, teleffoni , metrigau gwe, etc.) etc.). Rhoddaf enghraifft ichi. Mae defnyddiwr newydd wedi ymddangos yn y system CRM, ac nid ydym yn gwybod o hyd am ei UUID. Yna, wrth geisio derbyn data o deleffoni SIP, byddwn yn derbyn galwadau sy'n gysylltiedig â'i UUID, ond ni fyddwn yn gallu eu cadw a'u defnyddio'n gywir. Mewn materion o'r fath, mae'n bwysig cadw dibyniaeth y data mewn cof, yn enwedig os ydynt yn dod o wahanol ffynonellau. Mae'r rhain, wrth gwrs, yn fesurau annigonol i gadw cywirdeb data, ond mewn rhai achosion maent yn angenrheidiol. Ydy, ac mae segura i feddiannu adnoddau hefyd yn afresymol.

Felly, bydd ein synhwyrydd yn lansio fertigau dilynol y graff os oes gwybodaeth newydd yn y post, a hefyd yn nodi bod y wybodaeth flaenorol yn amherthnasol.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Rydym yn derbyn ac yn defnyddio data

I dderbyn a phrosesu data, gallwch ysgrifennu gweithredwr ar wahân, gallwch ddefnyddio rhai parod. Ers hyd yn hyn mae'r rhesymeg yn ddibwys - i gymryd data o'r llythyr, er enghraifft, rwy'n awgrymu'r PythonOperator safonol

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Gyda llaw, os yw'ch post corfforaethol hefyd ar mail.ru, yna ni fyddwch yn gallu chwilio am lythyrau yn ôl pwnc, anfonwr, ac ati. Yn ôl yn 2016, fe wnaethon nhw addo ei gyflwyno, ond mae'n debyg eu bod wedi newid eu meddyliau. Datrysais y broblem hon trwy greu ffolder ar wahân ar gyfer y llythyrau angenrheidiol a gosod hidlydd ar gyfer y llythyrau angenrheidiol yn y rhyngwyneb gwe post. Felly, dim ond y llythyrau a'r amodau angenrheidiol ar gyfer y chwiliad, yn fy achos i, yn syml (UNSEEN) sy'n mynd i mewn i'r ffolder hwn.

Gan grynhoi, mae gennym y dilyniant canlynol: rydym yn gwirio a oes llythyrau newydd sy'n bodloni'r amodau, os oes, yna rydym yn lawrlwytho'r archif gan ddefnyddio'r ddolen o'r llythyr olaf.
O dan y dotiau olaf, mae'n cael ei hepgor y bydd yr archif hwn yn cael ei ddadbacio, bydd y data o'r archif yn cael ei glirio a'i brosesu, ac o ganlyniad, bydd yr holl beth yn mynd ymhellach i linell y broses ETL, ond mae hyn eisoes y tu hwnt i hynny. cwmpas yr erthygl. Pe bai'n ddiddorol ac yn ddefnyddiol, yna byddaf yn falch o barhau i ddisgrifio atebion ETL a'u rhannau ar gyfer Apache Airflow.

Ffynhonnell: hab.com

Ychwanegu sylw