Monitorizarea proceselor ETL într-un depozit de date mic

Mulți folosesc instrumente specializate pentru a crea proceduri pentru extragerea, transformarea și încărcarea datelor în baze de date relaționale. Procesul de instrumente de lucru este înregistrat, erorile sunt remediate.

În cazul unei erori, jurnalul conține informații că instrumentul nu a reușit să finalizeze sarcina și care module (adesea java) s-au oprit unde. În ultimele rânduri, puteți găsi o eroare a bazei de date, de exemplu, o încălcare a cheii unice de tabel.

Pentru a răspunde la întrebarea ce rol joacă informațiile de eroare ETL, am clasificat toate problemele care au apărut în ultimii doi ani într-un depozit destul de mare.

Monitorizarea proceselor ETL într-un depozit de date mic

Erorile din baza de date includ spațiu insuficient, conexiune pierdută, sesiune blocată etc.

Erorile logice includ încălcarea cheilor de tabel, obiecte nevalide, lipsa accesului la obiecte etc.
Este posibil ca programatorul să nu pornească la timp, să înghețe etc.

Greșelile simple nu durează mult să fie remediate. Un ETL bun poate gestiona cele mai multe dintre ele singur.

Bug-urile complexe fac necesară descoperirea și testarea procedurilor de lucru cu date, explorarea surselor de date. Adesea duc la necesitatea testării schimbării și implementării.

Deci, jumătate din toate problemele sunt legate de baza de date. 48% din toate greșelile sunt greșeli simple.
O treime din toate problemele sunt legate de schimbarea logicii sau modelului de stocare, mai mult de jumătate dintre aceste erori sunt complexe.

Și mai puțin de un sfert din toate problemele sunt legate de programatorul de sarcini, dintre care 18% sunt erori simple.

În general, 22% din toate erorile care apar sunt complexe, iar corectarea lor necesită cea mai mare atenție și timp. Se întâmplă aproximativ o dată pe săptămână. În timp ce greșelile simple se întâmplă aproape în fiecare zi.

Evident, monitorizarea proceselor ETL va fi eficientă atunci când locația erorii este indicată în jurnal cât mai precis posibil și este necesar un timp minim pentru a găsi sursa problemei.

Monitorizare eficientă

Ce am vrut să văd în procesul de monitorizare ETL?

Monitorizarea proceselor ETL într-un depozit de date mic
Începe la - când a început să lucreze,
Sursă - sursă de date,
Strat - ce nivel de stocare este încărcat,
ETL Job Name - procedura de încărcare, care constă din mulți pași mici,
Numărul pasului - numărul pasului efectuat,
Rânduri afectate - câte date au fost deja procesate,
Durata sec - cât timp durează,
Stare - indiferent dacă totul este bine sau nu: OK, EROARE, RUNNING, HANGS
Mesaj - Ultimul mesaj de succes sau descrierea erorii.

Pe baza stării înregistrărilor, puteți trimite un e-mail. scrisoare către alți membri. Dacă nu există erori, atunci scrisoarea nu este necesară.

Astfel, în cazul unei erori, locul incidentului este clar indicat.

Uneori se întâmplă ca instrumentul de monitorizare în sine să nu funcționeze. În acest caz, este posibil să apelați o vizualizare (vizualizare) direct în baza de date, pe baza căreia este construit raportul.

Tabel de monitorizare ETL

Pentru a implementa monitorizarea proceselor ETL, un tabel și o vizualizare sunt suficiente.

Pentru a face acest lucru, puteți reveni la micul tău depozit și creați prototip în baza de date sqlite.

Tabelele DDL

CREATE TABLE UTL_JOB_STATUS (
/* Table for logging of job execution log. Important that the job has the steps ETL_START and ETL_END or ETL_ERROR */
  UTL_JOB_STATUS_ID INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
  SID               INTEGER NOT NULL DEFAULT -1, /* Session Identificator. Unique for every Run of job */
  LOG_DT            INTEGER NOT NULL DEFAULT 0,  /* Date time */
  LOG_D             INTEGER NOT NULL DEFAULT 0,  /* Date */
  JOB_NAME          TEXT NOT NULL DEFAULT 'N/A', /* Job name like JOB_STG2DM_GEO */
  STEP_NAME         TEXT NOT NULL DEFAULT 'N/A', /* ETL_START, ... , ETL_END/ETL_ERROR */
  STEP_DESCR        TEXT,                        /* Description of task or error message */
  UNIQUE (SID, JOB_NAME, STEP_NAME)
);
INSERT INTO UTL_JOB_STATUS (UTL_JOB_STATUS_ID) VALUES (-1);

Vizualizați/Raportați DDL

CREATE VIEW IF NOT EXISTS UTL_JOB_STATUS_V
AS /* Content: Package Execution Log for last 3 Months. */
WITH SRC AS (
  SELECT LOG_D,
    LOG_DT,
    UTL_JOB_STATUS_ID,
    SID,
	CASE WHEN INSTR(JOB_NAME, 'FTP') THEN 'TRANSFER' /* file transfer */
	     WHEN INSTR(JOB_NAME, 'STG') THEN 'STAGE' /* stage */
	     WHEN INSTR(JOB_NAME, 'CLS') THEN 'CLEANSING' /* cleansing */
	     WHEN INSTR(JOB_NAME, 'DIM') THEN 'DIMENSION' /* dimension */
	     WHEN INSTR(JOB_NAME, 'FCT') THEN 'FACT' /* fact */
		 WHEN INSTR(JOB_NAME, 'ETL') THEN 'STAGE-MART' /* data mart */
	     WHEN INSTR(JOB_NAME, 'RPT') THEN 'REPORT' /* report */
	     ELSE 'N/A' END AS LAYER,
	CASE WHEN INSTR(JOB_NAME, 'ACCESS') THEN 'ACCESS LOG' /* source */
	     WHEN INSTR(JOB_NAME, 'MASTER') THEN 'MASTER DATA' /* source */
	     WHEN INSTR(JOB_NAME, 'AD-HOC') THEN 'AD-HOC' /* source */
	     ELSE 'N/A' END AS SOURCE,
    JOB_NAME,
    STEP_NAME,
    CASE WHEN STEP_NAME='ETL_START' THEN 1 ELSE 0 END AS START_FLAG,
    CASE WHEN STEP_NAME='ETL_END' THEN 1 ELSE 0 END AS END_FLAG,
    CASE WHEN STEP_NAME='ETL_ERROR' THEN 1 ELSE 0 END AS ERROR_FLAG,
    STEP_NAME || ' : ' || STEP_DESCR AS STEP_LOG,
	SUBSTR( SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), 1, INSTR(SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), '***')-2 ) AS AFFECTED_ROWS
  FROM UTL_JOB_STATUS
  WHERE datetime(LOG_D, 'unixepoch') >= date('now', 'start of month', '-3 month')
)
SELECT JB.SID,
  JB.MIN_LOG_DT AS START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS LOG_DT,
  JB.SOURCE,
  JB.LAYER,
  JB.JOB_NAME,
  CASE
  WHEN JB.ERROR_FLAG = 1 THEN 'ERROR'
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 AND strftime('%s','now') - JB.MIN_LOG_DT > 0.5*60*60 THEN 'HANGS' /* half an hour */
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 THEN 'RUNNING'
  ELSE 'OK'
  END AS STATUS,
  ERR.STEP_LOG     AS STEP_LOG,
  JB.CNT           AS STEP_CNT,
  JB.AFFECTED_ROWS AS AFFECTED_ROWS,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS JOB_START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MAX_LOG_DT, 'unixepoch')) AS JOB_END_DT,
  JB.MAX_LOG_DT - JB.MIN_LOG_DT AS JOB_DURATION_SEC
FROM
  ( SELECT SID, SOURCE, LAYER, JOB_NAME,
           MAX(UTL_JOB_STATUS_ID) AS UTL_JOB_STATUS_ID,
           MAX(START_FLAG)       AS START_FLAG,
           MAX(END_FLAG)         AS END_FLAG,
           MAX(ERROR_FLAG)       AS ERROR_FLAG,
           MIN(LOG_DT)           AS MIN_LOG_DT,
           MAX(LOG_DT)           AS MAX_LOG_DT,
           SUM(1)                AS CNT,
           SUM(IFNULL(AFFECTED_ROWS, 0)) AS AFFECTED_ROWS
    FROM SRC
    GROUP BY SID, SOURCE, LAYER, JOB_NAME
  ) JB,
  ( SELECT UTL_JOB_STATUS_ID, SID, JOB_NAME, STEP_LOG
    FROM SRC
    WHERE 1 = 1
  ) ERR
WHERE 1 = 1
  AND JB.SID = ERR.SID
  AND JB.JOB_NAME = ERR.JOB_NAME
  AND JB.UTL_JOB_STATUS_ID = ERR.UTL_JOB_STATUS_ID
ORDER BY JB.MIN_LOG_DT DESC, JB.SID DESC, JB.SOURCE;

SQL Verificarea dacă este posibil să obțineți un număr nou de sesiune

SELECT SUM (
  CASE WHEN start_job.JOB_NAME IS NOT NULL AND end_job.JOB_NAME IS NULL /* existed job finished */
	    AND NOT ( 'y' = 'n' ) /* force restart PARAMETER */
       THEN 1 ELSE 0
  END ) AS IS_RUNNING
  FROM
    ( SELECT 1 AS dummy FROM UTL_JOB_STATUS WHERE sid = -1) d_job
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID, 1 AS dummy
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG' /* job name PARAMETER */
	    AND STEP_NAME = 'ETL_START'
      GROUP BY JOB_NAME, SID
    ) start_job /* starts */
  ON d_job.dummy = start_job.dummy
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG'  /* job name PARAMETER */
	    AND STEP_NAME in ('ETL_END', 'ETL_ERROR') /* stop status */
      GROUP BY JOB_NAME, SID
    ) end_job /* ends */
  ON start_job.JOB_NAME = end_job.JOB_NAME
     AND start_job.SID = end_job.SID

Caracteristicile mesei:

  • începutul și sfârșitul procedurii de prelucrare a datelor trebuie urmate de pașii ETL_START și ETL_END
  • în cazul unei erori, trebuie creat pasul ETL_ERROR cu descrierea acestuia
  • cantitatea de date prelucrate ar trebui evidențiată, de exemplu, cu asteriscuri
  • aceeași procedură poate fi pornită în același timp cu parametrul force_restart=y, fără acesta numărul de sesiune este emis doar procedurii finalizate
  • în modul normal, nu puteți rula aceeași procedură de procesare a datelor în paralel

Operațiile necesare pentru a lucra cu un tabel sunt următoarele:

  • obținerea numărului de sesiune al procedurii ETL care rulează
  • introduceți intrarea în jurnal în tabel
  • obținerea ultimei înregistrări cu succes a unei proceduri ETL

În bazele de date precum Oracle sau Postgres, aceste operațiuni pot fi implementate ca funcții încorporate. sqlite necesită un mecanism extern, iar în acest caz acesta prototip în PHP.

Producție

Astfel, mesajele de eroare din instrumentele de prelucrare a datelor joacă un rol mega-important. Dar este dificil să le numim optime pentru a găsi rapid cauza problemei. Când numărul de proceduri se apropie de o sută, atunci monitorizarea proceselor se transformă într-un proiect complex.

Articolul oferă un exemplu de soluție posibilă a problemei sub forma unui prototip. Întregul prototip de depozit mic este disponibil în gitlab Utilitare SQLite PHP ETL.

Sursa: www.habr.com

Adauga un comentariu