Pêvajoya ETL ji bo wergirtina daneyan ji e-nameyê di Apache Airflow de

Pêvajoya ETL ji bo wergirtina daneyan ji e-nameyê di Apache Airflow de

Çiqas teknolojî pêş dikeve jî, rêzek nêzîkatiyên kevnar her gav li paş pêşveçûnê dimeşin. Dibe ku ev ji ber veguheztinek nerm, faktorên mirovî, hewcedariyên teknolojîk, an tiştek din be. Di warê danûstendina daneyan de, di vê beşê de çavkaniyên daneyê yên herî diyar in. Her çi qas em xeyal dikin ku em ji vê xilas bibin, lê heya nuha beşek ji daneyan di qasid û e-nameyên tavilê de têne şandin, ku nebêjin formên arkaîkî. Ez we vedixwînim ku hûn yek ji vebijarkên ji bo Apache Airflow hilweşînin, ku destnîşan dike ka hûn çawa dikarin daneyan ji e-nameyê bigirin.

pêşdîrok

Gelek dane hîn jî bi e-nameyê ve têne veguheztin, ji danûstendinên kesane heya standardên danûstendina di navbera pargîdaniyan de. Baş e ger gengaz e ku meriv navbeynek binivîsîne da ku daneyan bi dest bixe an kesên ku dê vê agahiyê têxin nav çavkaniyên hêsantir binivîsin, lê pir caran dibe ku ev ne gengaz be. Karê taybetî yê ku ez pê re rû bi rû bûm girêdana pergala navdar CRM bi depoya daneyê, û dûv re jî bi pergala OLAP re bû. Wusa di dîrokê de qewimî ku ji bo pargîdaniya me karanîna vê pergalê li herêmek taybetî ya karsaziyê hêsan bû. Ji ber vê yekê, her kesî bi rastî dixwest ku bikaribe bi daneyên vê pergala sêyemîn-sêyemîn re jî bixebite. Berî her tiştî, bê guman, îmkana wergirtina daneyan ji API-ya vekirî hate lêkolîn kirin. Mixabin, API girtina hemî daneyên pêwîst venegirt, û, bi gotinên hêsan, ew bi gelek awayan xelet bû, û piştgiriya teknîkî nexwest an jî nikarîbû nîvê rê bigire da ku fonksiyonek berfireh peyda bike. Lê vê pergalê fersendê da ku bi awayekî periyodîk daneyên wenda bi riya posteyê di forma girêdanek ji bo rakirina arşîvê de werbigire.

Divê were zanîn ku ev ne tenê bûyer bû ku karsazî dixwest ji e-name an peyamberên tavilê daneyan berhev bike. Lêbelê, di vê rewşê de, me nekarî bandor li pargîdaniyek sêyemîn a ku beşek daneyê tenê bi vî rengî peyda dike bandor bike.

hewaya apache

Ji bo avakirina pêvajoyên ETL, em pir caran Apache Airflow bikar tînin. Ji bo ku xwendevanek ku bi vê teknolojiyê nizane çêtir fam bike ka ew di çarçovê de û bi gelemperî çawa xuya dike, ez ê çend pêşgotinê vebêjim.

Apache Airflow platformek belaş e ku ji bo avakirin, bicihanîn û şopandina pêvajoyên ETL (Extract-Transform-Loading) di Python de tê bikar anîn. Têgeha bingehîn a di Airflow de grafiyek aciklîk a rêvekirî ye, ku tê de berikên grafîkê pêvajoyên taybetî ne, û keviyên grafîkê herikîna kontrolê an agahdarî ne. Pêvajoyek bi hêsanî dikare bangî her fonksiyonek Python bike, an jî dikare ji gazîkirina çend fonksiyonan di çarçoweya polê de mentiqek tevlihevtir hebe. Ji bo operasyonên herî pir caran, jixwe gelek pêşkeftinên amade hene ku dikarin wekî pêvajoyê werin bikar anîn. Pêşveçûnên weha hene:

  • operator - ji bo veguheztina daneyan ji cîhek cîhek din, mînakî, ji tabloyek databasê berbi depoyek daneyê;
  • senzor - ji bo ku li benda rûdana bûyerek diyar bin û rêvekirina herikîna kontrolê berbi qonaxên paşîn ên grafîkê ve;
  • hook - ji bo operasyonên asta jêrîn, mînakî, ji bo wergirtina daneyan ji tabloyek databasê (di daxuyaniyan de tê bikar anîn);
  • û vî awayî.

Dê negunca be ku di vê gotarê de Apache Airflow bi hûrgulî were ravekirin. Pêşgotinên kurt dikarin werin dîtin vir an vir.

Ji bo bidestxistina daneyan qut bikin

Berî her tiştî, ji bo çareserkirina pirsgirêkê, em hewce ne ku hookek binivîsin ku em pê dikarin:

  • bi e-nameyê ve girêdin
  • nameya rast bibînin
  • daneyên ji nameyê bistînin.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Mantiq ev e: em bi hev ve girêdidin, nameya herî têkildar a paşîn dibînin, heke yên din hebin, em wan paşguh dikin. Ev fonksiyon tê bikar anîn, ji ber ku tîpên paşîn hemî daneyên yên berê hene. Ger ne wusa be, wê hingê hûn dikarin rêzek ji hemî tîpan vegerînin, an ya yekem, û yên mayî jî li ser pasa paşîn bişopînin. Bi gelemperî, her tişt, wekî her gav, bi peywirê ve girêdayî ye.

Em du fonksiyonên alîkar li hookê zêde dikin: ji bo dakêşana pelê û dakêşana pelê bi karanîna girêdanek ji e-nameyê. Bi awayê, ew dikarin ji operatorê re werin veguheztin, ew bi frekansa karanîna vê fonksiyonê ve girêdayî ye. Tiştê din ku meriv li qulikê zêde bike, dîsa bi peywirê ve girêdayî ye: heke pelan tavilê di nameyê de werin wergirtin, wê hingê hûn dikarin pêvekên nameyê dakêşin, heke data di nameyê de werin wergirtin, wê hingê hûn hewce ne ku nameyê parsek bikin, etc. Di doza min de, nameyek bi yek lînka arşîvê re tê, ku ez hewce dikim ku li cîhek diyar bikim û pêvajoya pêvajoyek din dest pê bikim.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Kod hêsan e, ji ber vê yekê ew ne hewce ye ku bêtir şirove bike. Ez ê tenê li ser xeta sêrbaz imap_conn_id ji we re bibêjim. Apache Airflow parametreyên pêwendiyê (têketin, şîfre, navnîşan, û pîvanên din) yên ku ji hêla nasnavek rêzikê ve têne gihîştin hilîne. Bi dîtbarî, rêveberiya girêdanê bi vî rengî xuya dike

Pêvajoya ETL ji bo wergirtina daneyan ji e-nameyê di Apache Airflow de

Sensor ku li benda daneyê bimîne

Ji ber ku em jixwe dizanin ku meriv çawa ji e-nameyê daneyan girêdide û werdigire, naha em dikarin senzorek binivîsin ku li benda wan bisekinin. Di doza min de, nexebite ku tavilê operatorek binivîsîne ku dê daneyan pêvajoyê bike, heke hebe, ji ber ku pêvajoyên din li ser bingeha daneyên ku ji nameyê hatine wergirtin dixebitin, tevî yên ku daneyên têkildar ji çavkaniyên din digirin (API, têlefonî. , metrîkên malperê, hwd.) hwd.). Ez ji we re mînakek bidim. Bikarhênerek nû di pergala CRM de derketiye, û em hîn jî di derbarê UUID-a wî de nizanin. Dûv re, dema ku em hewl bidin ku daneyan ji têlefoniya SIP-ê bistînin, em ê bangên bi UUID-a wê ve girêdayî ne bistînin, lê em ê nikaribin wan rast hilînin û bikar bînin. Di mijarên weha de, girîng e ku meriv pêwendiya daneyan ji bîr neke, nemaze heke ew ji çavkaniyên cihê ne. Ev, bê guman, tedbîrên ne bes in ji bo parastina yekrêziya daneyê, lê di hin rewşan de ew hewce ne. Erê, û betalkirina ji bo dagirkirina çavkaniyan jî bêaqil e.

Ji ber vê yekê, senzora me heke di e-nameyê de agahdariya nû hebe, dê rêzikên paşîn ên grafîkê bide destpêkirin, û di heman demê de agahdariya berê wekî negirêdayî nîşan bide.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Em daneyan distînin û bikar tînin

Ji bo wergirtin û pêvajoykirina daneyan, hûn dikarin operatorek cûda binivîsin, hûn dikarin yên amade bikar bînin. Ji ber ku mantiq hîn jî sivik e - mînakî ji bo girtina daneyan ji nameyê, ez PythonOperatora standard pêşniyar dikim.

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Bi awayê, heke nameya weya pargîdanî jî li ser mail.ru be, wê hingê hûn ê nikaribin li nameyan li gorî mijar, şander, hwd bigerin. Di sala 2016-an de, wan soz da ku wê bidin nasîn, lê xuya ye ku ramana xwe guhertiye. Min ev pirsgirêk bi afirandina peldankek cihêreng ji bo tîpên pêwîst û sazkirina parzûnek ji bo tîpên pêwîst di navgîniya tevna nameyê de çareser kir. Bi vî rengî, tenê tîp û şertên hewce yên ji bo lêgerînê, di doza min de, bi tenê (BÊNÎNE) dikevin vê peldankê.

Bi kurtasî, rêzika jêrîn li me heye: em kontrol dikin ka tîpên nû hene ku şertan pêk tînin, heke hebin, wê hingê em bi karanîna lînka nameya paşîn arşîvê dakêşin.
Di bin xalên paşîn de, ji holê radibe ku ev arşîv dê bê pakkirin, daneyên ji arşîvê dê werin paqijkirin û pêvajo kirin, û di encamê de, hemî tişt dê berbi lûleya pêvajoya ETL ve biçe, lê ev jixwe wêdetir e. çarçoveya gotarê. Ger ew balkêş û kêrhatî derket, wê hingê ez ê bi kêfxweşî berdewam bikim ku çareseriyên ETL û beşên wan ji bo Apache Airflow rave bikim.

Source: www.habr.com

Add a comment