Fizotry ny ETL hahazoana angon-drakitra amin'ny mailaka amin'ny Apache Airflow

Fizotry ny ETL hahazoana angon-drakitra amin'ny mailaka amin'ny Apache Airflow

Na manao ahoana na manao ahoana ny fivoaran'ny teknôlôjia, ny fomba fiasa efa lany andro dia manaraka hatrany ny fampandrosoana. Mety ho noho ny fifindrana milamina, ny anton'ny maha-olombelona, ​​​​ny filana ara-teknolojia, na zavatra hafa. Eo amin'ny sehatry ny fanodinana angon-drakitra, ny loharanom-baovao no tena mampiharihary indrindra amin'ity ampahany ity. Na manao ahoana na manao ahoana ny manonofinofy manala izany, fa hatramin'izao ny ampahany amin'ny angon-drakitra dia alefa amin'ny iraka sy mailaka eo noho eo, tsy lazaina intsony ny endrika tranainy kokoa. Manasa anao aho hamongotra ny iray amin'ireo safidy ho an'ny Apache Airflow, mampiseho ny fomba ahafahanao maka data amin'ny mailaka.

prehistory

Mbola betsaka ny angona afindra amin'ny alàlan'ny mailaka, manomboka amin'ny fifandraisana amin'ny olona ka hatramin'ny fenitry ny fifandraisana eo amin'ny orinasa. Tsara raha azo atao ny manoratra interface tsara hahazoana angon-drakitra na mametraka olona ao amin'ny birao izay hampiditra ity fampahalalana ity amin'ny loharano mety kokoa, saingy matetika dia mety tsy ho azo atao izany. Ny asa manokana natrehako dia ny fampifandraisana ny rafitra CRM malaza amin'ny trano fanatobiana data, ary avy eo amin'ny rafitra OLAP. Izany no nitranga ara-tantara fa ho an'ny orinasanay ny fampiasana ity rafitra ity dia mety amin'ny sehatry ny orinasa iray. Noho izany, ny rehetra dia tena naniry ny ho afaka miasa miaraka amin'ny angona avy amin'ity rafitra antoko fahatelo ity ihany koa. Voalohany indrindra, mazava ho azy, ny mety hahazoana data avy amin'ny API misokatra dia nodinihina. Indrisy anefa, ny API dia tsy nandrakotra ny fahazoana ny angon-drakitra ilaina rehetra, ary, amin'ny teny tsotra, dia nivadika tamin'ny fomba maro, ary ny fanohanana ara-teknika dia tsy naniry na tsy afaka nihaona tamin'ny antsasany mba hanomezana fiasa feno kokoa. Saingy ity rafitra ity dia nanome fahafahana handray tsindraindray ny angon-drakitra tsy hita amin'ny alàlan'ny mailaka amin'ny endrika rohy amin'ny famoahana ny arisiva.

Marihina fa tsy io ihany no tian'ny orinasa hanangona angona avy amin'ny mailaka na iraka avy hatrany. Na izany aza, amin'ity tranga ity, tsy afaka mitaona orinasa antoko fahatelo izay manome ampahany amin'ny angon-drakitra amin'ity fomba ity ihany izahay.

apache airflow

Mba hananganana fizotry ny ETL dia matetika mampiasa Apache Airflow izahay. Mba hahafahan'ny mpamaky iray tsy mahazatra an'ity teknolojia ity hahatakatra tsara kokoa ny fijery azy amin'ny teny manodidina sy amin'ny ankapobeny, dia hamaritra ireo fampidirana roa aho.

Apache Airflow dia sehatra maimaim-poana izay ampiasaina hananganana, hanatanterahana ary hanaraha-maso ny fizotran'ny ETL (Extract-Transform-Loading) amin'ny Python. Ny foto-kevitra fototra ao amin'ny Airflow dia graph acyclic mivantana, izay ny vertices amin'ny grafika dia dingana manokana, ary ny sisin'ny grafika dia ny fikorianan'ny fanaraha-maso na fampahalalana. Ny dingana iray dia afaka miantso tsotra fotsiny ny fiasan'ny Python, na mety manana lojika sarotra kokoa amin'ny fiantsoana fiasa maromaro amin'ny tontolon'ny kilasy iray. Ho an'ny fampandehanana matetika dia efa betsaka ny fivoarana efa vita izay azo ampiasaina ho dingana. Ny fivoarana toy izany dia ahitana:

  • operators - ho an'ny famindrana angona avy amin'ny toerana iray mankany amin'ny iray hafa, ohatra, avy amin'ny latabatra database mankany amin'ny trano fanatobiana data;
  • sensor - amin'ny fiandrasana ny fisehoan-javatra iray sy ny fitarihana ny fikorianan'ny fanaraha-maso mankany amin'ny vertices manaraka ny grafika;
  • hooks - ho an'ny asa ambany kokoa, ohatra, mba hahazoana angona avy amin'ny latabatra database (ampiasaina amin'ny fanambarana);
  • sy ny sisa.

Tsy mety ny mamaritra ny Apache Airflow amin'ny antsipiriany ato amin'ity lahatsoratra ity. Azo jerena ny fampidiran-dresaka fohy eto na eto.

Hook mba hahazoana data

Voalohany indrindra, mba hamahana ny olana dia mila manoratra hook izay azontsika atao:

  • mifandray amin'ny mailaka
  • tadiavo ny taratasy mety
  • mandray angona avy amin'ny taratasy.

from airflow.hooks.base_hook import BaseHook
import imaplib
import logging

class IMAPHook(BaseHook):
    def __init__(self, imap_conn_id):
        """
           IMAP hook для получения данных с электронной почты

           :param imap_conn_id:       Идентификатор подключения к почте
           :type imap_conn_id:        string
        """
        self.connection = self.get_connection(imap_conn_id)
        self.mail = None

    def authenticate(self):
        """ 
            Подключаемся к почте
        """
        mail = imaplib.IMAP4_SSL(self.connection.host)
        response, detail = mail.login(user=self.connection.login, password=self.connection.password)
        if response != "OK":
            raise AirflowException("Sign in failed")
        else:
            self.mail = mail

    def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
        """
            Метод для получения идентификатора последнего письма, 
            удовлетвораяющего условиям поиска

            :param check_seen:      Отмечать последнее письмо как прочитанное
            :type check_seen:       bool
            :param box:             Наименования ящика
            :type box:              string
            :param condition:       Условия поиска писем
            :type condition:        string
        """
        self.authenticate()
        self.mail.select(mailbox=box)
        response, data = self.mail.search(None, condition)
        mail_ids = data[0].split()
        logging.info("В ящике найдены следующие письма: " + str(mail_ids))

        if not mail_ids:
            logging.info("Не найдено новых писем")
            return None

        mail_id = mail_ids[0]

        # если таких писем несколько
        if len(mail_ids) > 1:
            # отмечаем остальные прочитанными
            for id in mail_ids:
                self.mail.store(id, "+FLAGS", "\Seen")

            # возвращаем последнее
            mail_id = mail_ids[-1]

        # нужно ли отметить последнее прочитанным
        if not check_seen:
            self.mail.store(mail_id, "-FLAGS", "\Seen")

        return mail_id

Ny lojika dia izao: mifandray isika, mitadiava ny taratasy farany manan-danja indrindra, raha misy hafa dia tsy raharahantsika. Ity asa ity dia ampiasaina, satria ny litera any aoriana dia misy ny angon-drakitra rehetra an'ny teo aloha. Raha tsy izany no izy, dia azonao atao ny mamerina andiana litera rehetra, na manamboatra ny voalohany, ary ny ambiny amin'ny pass manaraka. Amin'ny ankapobeny, ny zava-drehetra, toy ny mahazatra, dia miankina amin'ny asa.

Manampy asa fanampiny roa amin'ny hook izahay: amin'ny fampidinana rakitra ary amin'ny fampidinana rakitra amin'ny alàlan'ny rohy avy amin'ny mailaka. Teny an-dalana, azo afindra any amin'ny mpandraharaha izy ireo, miankina amin'ny fatran'ny fampiasana an'io fiasa io. Inona koa no ampiana amin'ny hook indray, miankina amin'ny asa: raha voaray avy hatrany ao amin'ny taratasy ny rakitra, dia azonao atao ny misintona ny fametahana amin'ny taratasy, raha voaray amin'ny taratasy ny angon-drakitra, dia mila manara-maso ny taratasy ianao, Sns Raha ny amiko, ny taratasy dia miaraka amin'ny rohy iray mankany amin'ny arisiva, izay ilaiko apetraka amin'ny toerana iray ary manomboka ny dingana fanodinana bebe kokoa.

    def download_from_url(self, url, path, chunk_size=128):
        """
            Метод для скачивания файла

            :param url:              Адрес загрузки
            :type url:               string
            :param path:             Куда положить файл
            :type path:              string
            :param chunk_size:       По сколько байтов писать
            :type chunk_size:        int
        """
        r = requests.get(url, stream=True)
        with open(path, "wb") as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

    def download_mail_href_attachment(self, mail_id, path):
        """
            Метод для скачивания файла по ссылке из письма

            :param mail_id:         Идентификатор письма
            :type mail_id:          string
            :param path:            Куда положить файл
            :type path:             string
        """
        response, data = self.mail.fetch(mail_id, "(RFC822)")
        raw_email = data[0][1]
        raw_soup = raw_email.decode().replace("r", "").replace("n", "")
        parse_soup = BeautifulSoup(raw_soup, "html.parser")
        link_text = ""

        for a in parse_soup.find_all("a", href=True, text=True):
            link_text = a["href"]

        self.download_from_url(link_text, path)

Tsotra ny code, ka zara raha mila fanazavana fanampiny. Holazaiko anao fotsiny ny momba ny tsipika majika imap_conn_id. Apache Airflow dia mitahiry ny mari-pamantarana fifandraisana (fidirana, tenimiafina, adiresy, ary mari-pamantarana hafa) izay azo idirana amin'ny alàlan'ny famantarana tady. Raha jerena amin'ny maso dia toa izao ny fitantanana fifandraisana

Fizotry ny ETL hahazoana angon-drakitra amin'ny mailaka amin'ny Apache Airflow

Sensor hiandry angona

Koa satria efa haintsika ny mampifandray sy mandray angona avy amin'ny mailaka, dia afaka manoratra sensor isika izao mba hiandry azy ireo. Raha ny ahy dia tsy nahomby ny fanoratana mpandraharaha avy hatrany izay handamina ny angon-drakitra, raha misy, satria ny dingana hafa dia miasa mifototra amin'ny angon-drakitra voaray avy amin'ny mailaka, anisan'izany ireo izay maka data mifandraika amin'ny loharano hafa (API, telephony). , metrika tranonkala, sns.). sns.). Omeko ohatra ianao. Nisy mpampiasa vaovao niseho tao amin'ny rafitra CRM, ary mbola tsy fantatsika ny momba ny UUID-ny. Avy eo, rehefa manandrana mandray angona avy amin'ny SIP telephony isika, dia hahazo antso mifatotra amin'ny UUID-ny, saingy tsy afaka mitahiry sy mampiasa azy ireo araka ny tokony ho izy. Amin'ny tranga toy izany dia ilaina ny mitadidy ny fiankinan'ny angon-drakitra, indrindra raha avy amin'ny loharano samihafa izy ireo. Mazava ho azy fa fepetra tsy ampy hitandrovana ny fahamarinan'ny angon-drakitra ireo, saingy amin'ny tranga sasany dia ilaina izany. Eny, ary tsy mitombina ihany koa ny fidonanaham-poana mibodo harena.

Noho izany, ny sensor-tsika dia hanomboka ny vertices manaraka ny grafika raha misy vaovao vaovao ao amin'ny mailaka, ary koa manamarika ny vaovao teo aloha ho tsy manan-danja.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from my_plugin.hooks.imap_hook import IMAPHook

class MailSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.check_seen = check_seen
        self.box = box
        self.condition = condition

    def poke(self, context):
        conn = IMAPHook(self.conn_id)
        mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)

        if mail_id is None:
            return False
        else:
            return True

Mandray sy mampiasa data izahay

Mba handraisana sy hikarakarana angon-drakitra dia azonao atao ny manoratra mpandraharaha misaraka, azonao ampiasaina ireo efa vita. Satria mbola tsy misy dikany ny lojika - mba haka angona avy amin'ny taratasy, ohatra, dia manoro hevitra ny PythonOperator mahazatra aho

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator
from airflow.sensors.my_plugin import MailSensor
from my_plugin.hooks.imap_hook import IMAPHook

start_date = datetime(2020, 4, 4)

# Стандартное конфигурирование графа
args = {
    "owner": "example",
    "start_date": start_date,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=15),
    "provide_context": False,
}

dag = DAG(
    dag_id="test_etl",
    default_args=args,
    schedule_interval="@hourly",
)

# Определяем сенсор
mail_check_sensor = MailSensor(
    task_id="check_new_emails",
    poke_interval=10,
    conn_id="mail_conn_id",
    timeout=10,
    soft_fail=True,
    box="my_box",
    dag=dag,
    mode="poke",
)

# Функция для получения данных из письма
def prepare_mail():
    imap_hook = IMAPHook("mail_conn_id")
    mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
    if mail_id is None:
        raise AirflowException("Empty mailbox")

    conn.download_mail_href_attachment(mail_id, "./path.zip")

prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)

# Описание остальных вершин графа
...

# Задаем связь на графе
mail_check_sensor >> prepare_mail_data
prepare_data >> ...
# Описание остальных потоков управления

Raha ny marina, raha ao amin'ny mail.ru koa ny mailakao orinasa, dia tsy ho afaka hikaroka taratasy amin'ny lohahevitra, mpandefa, sns ianao. Tamin’ny taona 2016 no nampanantenain’izy ireo ny hampiditra izany, saingy hita fa niova hevitra. Namaha ity olana ity aho tamin'ny famoronana lahatahiry misaraka ho an'ireo litera ilaina ary fametrahana sivana ho an'ireo litera ilaina ao amin'ny serasera web mail. Noho izany, ny litera sy ny fepetra ilaina amin'ny fikarohana ihany, raha ny amiko, dia miditra amin'ity lahatahiry ity fotsiny (tsy hita).

Raha fintinina dia manana izao filaharana manaraka izao isika: manamarina raha misy litera vaovao mifanaraka amin'ny fepetra, raha misy, dia alainay ny arisiva amin'ny alàlan'ny rohy avy amin'ny taratasy farany.
Eo ambanin'ny teboka farany, dia nesorina fa ity arsiva ity dia hovoahana, ny angon-drakitra avy amin'ny arisiva dia hodiovina sy hokarakaraina, ary vokatr'izany, ny zava-drehetra dia handeha lavitra kokoa amin'ny fantsona ny fizotran'ny ETL, saingy efa mihoatra izany. ny faritry ny lahatsoratra. Raha toa ka mahaliana sy mahasoa izany, dia ho faly aho hanohy hamaritra ny vahaolana ETL sy ny ampahany amin'ny Apache Airflow.

Source: www.habr.com

Add a comment