Monitoraggio dei processi ETL in un piccolo data warehouse

Molti utilizzano strumenti specializzati per creare procedure per l'estrazione, la trasformazione e il caricamento dei dati in database relazionali. Il processo degli strumenti di lavoro viene registrato, gli errori vengono corretti.

In caso di errore, il registro contiene informazioni che lo strumento non è riuscito a completare l'attività e quali moduli (spesso java) si sono fermati e dove. Nelle ultime righe è possibile trovare un errore del database, ad esempio una violazione della chiave univoca della tabella.

Per rispondere alla domanda sul ruolo svolto dalle informazioni sugli errori ETL, ho classificato tutti i problemi che si sono verificati negli ultimi due anni in un repository piuttosto ampio.

Monitoraggio dei processi ETL in un piccolo data warehouse

Gli errori del database includono spazio insufficiente, connessione persa, blocco della sessione e così via.

Gli errori logici includono la violazione delle chiavi della tabella, oggetti non validi, mancanza di accesso agli oggetti, ecc.
Lo scheduler potrebbe non avviarsi in tempo, potrebbe bloccarsi, ecc.

Gli errori semplici non richiedono molto tempo per essere corretti. Un buon ETL può gestirne la maggior parte da solo.

Bug complessi rendono necessario scoprire e testare le procedure per lavorare con i dati, per esplorare le fonti dei dati. Spesso portano alla necessità di test e implementazione delle modifiche.

Quindi, la metà di tutti i problemi sono legati al database. Il 48% di tutti gli errori sono errori semplici.
Un terzo di tutti i problemi è legato alla modifica della logica o del modello di archiviazione, più della metà di questi errori sono complessi.

E meno di un quarto di tutti i problemi sono legati all'utilità di pianificazione, il 18% dei quali sono semplici errori.

In generale, il 22% di tutti gli errori che si verificano sono complessi e la loro correzione richiede la massima attenzione e tempo. Succedono circa una volta alla settimana. Considerando che gli errori semplici accadono quasi ogni giorno.

Ovviamente, il monitoraggio dei processi ETL sarà efficace quando la posizione dell'errore sarà indicata nel registro nel modo più accurato possibile e sarà necessario il tempo minimo per trovare l'origine del problema.

Monitoraggio efficace

Cosa volevo vedere nel processo di monitoraggio ETL?

Monitoraggio dei processi ETL in un piccolo data warehouse
Inizia da - quando ha iniziato a lavorare,
Fonte - origine dati,
Livello: quale livello di archiviazione viene caricato,
ETL Job Name - procedura di caricamento, che consiste in tanti piccoli passaggi,
Numero del passaggio: il numero del passaggio eseguito,
Righe interessate: quanti dati sono già stati elaborati,
Durata sec - quanto tempo ci vuole,
Stato: se tutto va bene o no: OK, ERRORE, IN CORSO, BLOCCATO
Messaggio: ultimo messaggio riuscito o descrizione dell'errore.

In base allo stato dei record, puoi inviare un'e-mail. lettera agli altri soci. Se non ci sono errori, la lettera non è necessaria.

Pertanto, in caso di errore, il luogo dell'incidente è chiaramente indicato.

A volte capita che lo strumento di monitoraggio stesso non funzioni. In questo caso, è possibile richiamare una vista (vista) direttamente nel database, sulla base della quale viene costruito il report.

Tabella di monitoraggio ETL

Per implementare il monitoraggio dei processi ETL sono sufficienti una tabella e una vista.

Per fare ciò, puoi tornare a il tuo piccolo deposito e creare prototipi nel database sqlite.

Tabelle DDL

CREATE TABLE UTL_JOB_STATUS (
/* Table for logging of job execution log. Important that the job has the steps ETL_START and ETL_END or ETL_ERROR */
  UTL_JOB_STATUS_ID INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
  SID               INTEGER NOT NULL DEFAULT -1, /* Session Identificator. Unique for every Run of job */
  LOG_DT            INTEGER NOT NULL DEFAULT 0,  /* Date time */
  LOG_D             INTEGER NOT NULL DEFAULT 0,  /* Date */
  JOB_NAME          TEXT NOT NULL DEFAULT 'N/A', /* Job name like JOB_STG2DM_GEO */
  STEP_NAME         TEXT NOT NULL DEFAULT 'N/A', /* ETL_START, ... , ETL_END/ETL_ERROR */
  STEP_DESCR        TEXT,                        /* Description of task or error message */
  UNIQUE (SID, JOB_NAME, STEP_NAME)
);
INSERT INTO UTL_JOB_STATUS (UTL_JOB_STATUS_ID) VALUES (-1);

Visualizza/Segnala DDL

CREATE VIEW IF NOT EXISTS UTL_JOB_STATUS_V
AS /* Content: Package Execution Log for last 3 Months. */
WITH SRC AS (
  SELECT LOG_D,
    LOG_DT,
    UTL_JOB_STATUS_ID,
    SID,
	CASE WHEN INSTR(JOB_NAME, 'FTP') THEN 'TRANSFER' /* file transfer */
	     WHEN INSTR(JOB_NAME, 'STG') THEN 'STAGE' /* stage */
	     WHEN INSTR(JOB_NAME, 'CLS') THEN 'CLEANSING' /* cleansing */
	     WHEN INSTR(JOB_NAME, 'DIM') THEN 'DIMENSION' /* dimension */
	     WHEN INSTR(JOB_NAME, 'FCT') THEN 'FACT' /* fact */
		 WHEN INSTR(JOB_NAME, 'ETL') THEN 'STAGE-MART' /* data mart */
	     WHEN INSTR(JOB_NAME, 'RPT') THEN 'REPORT' /* report */
	     ELSE 'N/A' END AS LAYER,
	CASE WHEN INSTR(JOB_NAME, 'ACCESS') THEN 'ACCESS LOG' /* source */
	     WHEN INSTR(JOB_NAME, 'MASTER') THEN 'MASTER DATA' /* source */
	     WHEN INSTR(JOB_NAME, 'AD-HOC') THEN 'AD-HOC' /* source */
	     ELSE 'N/A' END AS SOURCE,
    JOB_NAME,
    STEP_NAME,
    CASE WHEN STEP_NAME='ETL_START' THEN 1 ELSE 0 END AS START_FLAG,
    CASE WHEN STEP_NAME='ETL_END' THEN 1 ELSE 0 END AS END_FLAG,
    CASE WHEN STEP_NAME='ETL_ERROR' THEN 1 ELSE 0 END AS ERROR_FLAG,
    STEP_NAME || ' : ' || STEP_DESCR AS STEP_LOG,
	SUBSTR( SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), 1, INSTR(SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), '***')-2 ) AS AFFECTED_ROWS
  FROM UTL_JOB_STATUS
  WHERE datetime(LOG_D, 'unixepoch') >= date('now', 'start of month', '-3 month')
)
SELECT JB.SID,
  JB.MIN_LOG_DT AS START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS LOG_DT,
  JB.SOURCE,
  JB.LAYER,
  JB.JOB_NAME,
  CASE
  WHEN JB.ERROR_FLAG = 1 THEN 'ERROR'
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 AND strftime('%s','now') - JB.MIN_LOG_DT > 0.5*60*60 THEN 'HANGS' /* half an hour */
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 THEN 'RUNNING'
  ELSE 'OK'
  END AS STATUS,
  ERR.STEP_LOG     AS STEP_LOG,
  JB.CNT           AS STEP_CNT,
  JB.AFFECTED_ROWS AS AFFECTED_ROWS,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS JOB_START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MAX_LOG_DT, 'unixepoch')) AS JOB_END_DT,
  JB.MAX_LOG_DT - JB.MIN_LOG_DT AS JOB_DURATION_SEC
FROM
  ( SELECT SID, SOURCE, LAYER, JOB_NAME,
           MAX(UTL_JOB_STATUS_ID) AS UTL_JOB_STATUS_ID,
           MAX(START_FLAG)       AS START_FLAG,
           MAX(END_FLAG)         AS END_FLAG,
           MAX(ERROR_FLAG)       AS ERROR_FLAG,
           MIN(LOG_DT)           AS MIN_LOG_DT,
           MAX(LOG_DT)           AS MAX_LOG_DT,
           SUM(1)                AS CNT,
           SUM(IFNULL(AFFECTED_ROWS, 0)) AS AFFECTED_ROWS
    FROM SRC
    GROUP BY SID, SOURCE, LAYER, JOB_NAME
  ) JB,
  ( SELECT UTL_JOB_STATUS_ID, SID, JOB_NAME, STEP_LOG
    FROM SRC
    WHERE 1 = 1
  ) ERR
WHERE 1 = 1
  AND JB.SID = ERR.SID
  AND JB.JOB_NAME = ERR.JOB_NAME
  AND JB.UTL_JOB_STATUS_ID = ERR.UTL_JOB_STATUS_ID
ORDER BY JB.MIN_LOG_DT DESC, JB.SID DESC, JB.SOURCE;

SQL Verifica se è possibile ottenere un nuovo numero di sessione

SELECT SUM (
  CASE WHEN start_job.JOB_NAME IS NOT NULL AND end_job.JOB_NAME IS NULL /* existed job finished */
	    AND NOT ( 'y' = 'n' ) /* force restart PARAMETER */
       THEN 1 ELSE 0
  END ) AS IS_RUNNING
  FROM
    ( SELECT 1 AS dummy FROM UTL_JOB_STATUS WHERE sid = -1) d_job
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID, 1 AS dummy
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG' /* job name PARAMETER */
	    AND STEP_NAME = 'ETL_START'
      GROUP BY JOB_NAME, SID
    ) start_job /* starts */
  ON d_job.dummy = start_job.dummy
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG'  /* job name PARAMETER */
	    AND STEP_NAME in ('ETL_END', 'ETL_ERROR') /* stop status */
      GROUP BY JOB_NAME, SID
    ) end_job /* ends */
  ON start_job.JOB_NAME = end_job.JOB_NAME
     AND start_job.SID = end_job.SID

Caratteristiche del tavolo:

  • l'inizio e la fine della procedura di trattamento dei dati devono essere seguiti dai passi ETL_START e ETL_END
  • in caso di errore, deve essere creato il passo ETL_ERROR con la sua descrizione
  • la quantità di dati trattati deve essere evidenziata, ad esempio, con asterischi
  • la stessa procedura può essere avviata contemporaneamente con il parametro force_restart=y, senza di esso il numero di sessione viene rilasciato solo alla procedura completata
  • in modalità normale non è possibile eseguire in parallelo la stessa procedura di elaborazione dati

Le operazioni necessarie per lavorare con una tabella sono le seguenti:

  • ottenere il numero di sessione della procedura ETL in esecuzione
  • inserire la voce di registro nella tabella
  • ottenere l'ultimo record di successo di una procedura ETL

In database come Oracle o Postgres, queste operazioni possono essere implementate come funzioni integrate. sqlite richiede un meccanismo esterno, e in questo caso it prototipato in PHP.

conclusione

Pertanto, i messaggi di errore negli strumenti di elaborazione dei dati svolgono un ruolo estremamente importante. Ma è difficile definirli ottimali per trovare rapidamente la causa del problema. Quando il numero di procedure si avvicina al centinaio, il monitoraggio del processo si trasforma in un progetto complesso.

L'articolo fornisce un esempio di una possibile soluzione al problema sotto forma di prototipo. L'intero prototipo del piccolo repository è disponibile in gitlab Utilità SQLite PHP ETL.

Fonte: habr.com

Aggiungi un commento