ETL procesas, skirtas gauti duomenis iš el. pašto „Apache Airflow“.

ETL procesas, skirtas gauti duomenis iš el. pašto „Apache Airflow“.

Nesvarbu, kiek technologijos vystosi, pasenusių metodų virtinė visada atsilieka nuo vystymosi. Tai gali lemti sklandus perėjimas, žmogiškieji veiksniai, technologiniai poreikiai ar kažkas kita. Duomenų apdorojimo srityje šioje dalyje labiausiai atskleidžia duomenų šaltiniai. Kad ir kaip svajotume to atsikratyti, tačiau kol kas dalis duomenų siunčiami momentiniais pranešimais ir elektroniniais laiškais, jau nekalbant apie archajiškesnius formatus. Kviečiu išardyti vieną iš „Apache Airflow“ parinkčių, iliustruojančių, kaip galite paimti duomenis iš el.

priešistorė

Daug duomenų vis dar perduodama el. paštu – nuo ​​tarpasmeninio bendravimo iki įmonių sąveikos standartų. Gerai, jei galima parašyti sąsają duomenims gauti arba į biurą patalpinti žmones, kurie įves šią informaciją į patogesnius šaltinius, tačiau dažnai tai gali būti tiesiog neįmanoma. Konkreti užduotis, su kuria susidūriau, buvo liūdnai pagarsėjusios CRM sistemos prijungimas prie duomenų saugyklos, o tada prie OLAP sistemos. Taip istoriškai susiklostė, kad mūsų įmonei šios sistemos naudojimas buvo patogus konkrečioje verslo srityje. Todėl visi labai norėjo turėti galimybę dirbti ir su šios trečiosios šalies sistemos duomenimis. Pirmiausia, žinoma, buvo ištirta galimybė gauti duomenis iš atviros API. Deja, API neapėmė visų reikalingų duomenų gavimo ir, paprastai tariant, ji daugeliu atžvilgių buvo kreiva, o techninis aptarnavimas nenorėjo arba negalėjo pasiekti pusiaukelėje, kad būtų užtikrintas išsamesnis funkcionalumas. Tačiau ši sistema suteikė galimybę periodiškai gauti trūkstamus duomenis paštu kaip archyvo iškrovimo saitą.

Pažymėtina, kad tai nebuvo vienintelis atvejis, kai įmonė norėjo rinkti duomenis iš el. laiškų ar momentinių žinučių. Tačiau šiuo atveju negalėjome paveikti trečiosios šalies įmonės, kuri teikia dalį duomenų tik tokiu būdu.

Apache oro srautas

ETL procesams kurti dažniausiai naudojame „Apache Airflow“. Kad skaitytojas, kuris nėra susipažinęs su šia technologija, geriau suprastų, kaip ji atrodo kontekste ir apskritai, aprašysiu porą įvadinių.

„Apache Airflow“ yra nemokama platforma, naudojama „Python“ ETL (Extract-Transform-Loading) procesams kurti, vykdyti ir stebėti. Pagrindinė „Airflow“ sąvoka yra nukreiptas aciklinis grafikas, kur grafo viršūnės yra specifiniai procesai, o grafo kraštai yra valdymo arba informacijos srautas. Procesas gali tiesiog iškviesti bet kurią „Python“ funkciją arba gali turėti sudėtingesnę logiką, nes nuosekliai iškviečiamos kelios funkcijos klasės kontekste. Dažniausioms operacijoms jau yra daug paruoštų patobulinimų, kuriuos galima naudoti kaip procesus. Tokie pokyčiai apima:

  • operatoriai – duomenims perkelti iš vienos vietos į kitą, pavyzdžiui, iš duomenų bazės lentelės į duomenų saugyklą;
  • jutikliai - tam tikro įvykio laukimui ir valdymo srauto nukreipimui į paskesnes grafo viršūnes;
  • kabliukai – žemesnio lygio operacijoms, pavyzdžiui, norint gauti duomenis iš duomenų bazės lentelės (naudojami teiginiuose);
  • ir tt

Šiame straipsnyje būtų netikslinga išsamiai aprašyti „Apache Airflow“. Galima peržiūrėti trumpus prisistatymus čia arba čia.

Kabliukas duomenims gauti

Visų pirma, norėdami išspręsti problemą, turime parašyti kabliuką, su kuriuo galėtume:

  • prisijungti prie el
  • rasti tinkamą raidę
  • gauti duomenis iš laiško.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Logika tokia: susijungiame, surandame paskutinę aktualiausią raidę, jei yra kitų – ignoruojame. Ši funkcija naudojama, nes vėlesnėse raidėse yra visi ankstesnių duomenys. Jei taip nėra, galite grąžinti visų raidžių masyvą arba apdoroti pirmąją, o likusias – kitame žingsnyje. Apskritai viskas, kaip visada, priklauso nuo užduoties.

Prie kabliuko pridedame dvi pagalbines funkcijas: failo atsisiuntimui ir failo atsisiuntimui naudojant nuorodą iš el. Beje, juos galima perkelti į operatorių, tai priklauso nuo šios funkcijos naudojimo dažnumo. Ką dar pridėti prie kabliuko, vėlgi, priklauso nuo užduoties: jei failai gaunami iš karto į laišką, tada galite atsisiųsti laiško priedus, jei duomenys gauti laiške, tada reikia išanalizuoti laišką, ir tt Mano atveju laiške yra viena nuoroda į archyvą, kurią man reikia įdėti į tam tikrą vietą ir pradėti tolimesnį apdorojimo procesą.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Kodas yra paprastas, todėl vargu ar reikia daugiau paaiškinti. Aš tik papasakosiu apie magišką liniją imap_conn_id. Apache Airflow saugo ryšio parametrus (prisijungimo vardą, slaptažodį, adresą ir kitus parametrus), kuriuos galima pasiekti naudojant eilutės identifikatorių. Vizualiai ryšio valdymas atrodo taip

ETL procesas, skirtas gauti duomenis iš el. pašto „Apache Airflow“.

Jutiklis laukti duomenų

Kadangi jau žinome, kaip prisijungti ir gauti duomenis iš pašto, dabar galime parašyti jutiklį, kuris jų lauks. Mano atveju nepavyko iš karto parašyti operatoriaus, kuris apdoros duomenis, jei tokių bus, nes kiti procesai veikia remiantis duomenimis, gautais iš pašto, įskaitant tuos, kurie ima susijusius duomenis iš kitų šaltinių (API, telefonijos). , žiniatinklio metrika ir kt.). Pateiksiu pavyzdį. CRM sistemoje atsirado naujas vartotojas, kurio UUID vis dar nežinome. Tada bandydami gauti duomenis iš SIP telefonijos gausime su jos UUID susietus skambučius, tačiau negalėsime jų tinkamai išsaugoti ir panaudoti. Tokiais klausimais svarbu nepamiršti duomenų priklausomybės, ypač jei jie yra iš skirtingų šaltinių. Žinoma, tai yra nepakankamos priemonės duomenų vientisumui išsaugoti, tačiau kai kuriais atvejais jos būtinos. Taip, ir tuščiąja eiga užimti išteklius taip pat neracionalu.

Taigi, mūsų jutiklis paleis paskesnes grafiko viršūnes, jei laiške bus naujos informacijos, taip pat pažymės ankstesnę informaciją kaip nesvarbią.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Duomenis gauname ir naudojame

Norėdami gauti ir apdoroti duomenis, galite parašyti atskirą operatorių, galite naudoti paruoštus. Kadangi kol kas logika triviali – pavyzdžiui, paimti duomenis iš laiško, siūlau standartinį PythonOperator

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Beje, jei jūsų įmonės paštas taip pat yra mail.ru, tada negalėsite ieškoti laiškų pagal temą, siuntėją ir pan. Dar 2016 metais žadėjo ją pristatyti, bet, matyt, persigalvojo. Šią problemą išsprendžiau sukūręs atskirą aplanką reikalingoms raidėms ir nustatęs reikiamų raidžių filtrą pašto žiniatinklio sąsajoje. Taigi į šį aplanką patenka tik būtinos raidės ir paieškos sąlygos, mano atveju, tiesiog (NEMATĖTAS).

Apibendrinant, turime tokią seką: patikriname, ar yra naujų raidžių, atitinkančių sąlygas, jei yra, tada atsisiunčiame archyvą naudodami nuorodą iš paskutinės raidės.
Po paskutiniais taškais yra praleista, kad šis archyvas bus išpakuotas, duomenys iš archyvo bus išvalyti ir apdoroti, ir dėl to viskas nukeliaus į ETL proceso vamzdyną, bet tai jau viršija straipsnio apimtis. Jei pasirodė įdomu ir naudinga, tada mielai toliau aprašysiu ETL sprendimus ir jų dalis, skirtus Apache Airflow.

Šaltinis: www.habr.com

Добавить комментарий