Overvågning af ETL-processer i et lille datavarehus

Mange bruger specialiserede værktøjer til at skabe procedurer til at udtrække, transformere og indlæse data i relationelle databaser. Processen med arbejdsværktøjer logges, fejl er rettet.

I tilfælde af fejl indeholder loggen information om, at værktøjet ikke kunne fuldføre opgaven, og hvilke moduler (ofte java) stoppede hvor. I de sidste linjer kan du finde en databasefejl, for eksempel en tabel-uniknøglebrud.

For at besvare spørgsmålet om, hvilken rolle ETL-fejlinformation spiller, har jeg klassificeret alle de problemer, der er opstået i løbet af de sidste to år, i et ret stort lager.

Overvågning af ETL-processer i et lille datavarehus

Databasefejl inkluderer ikke nok plads, mistet forbindelse, session hængt osv.

Logiske fejl omfatter f.eks. krænkelse af tabelnøgler, ugyldige objekter, manglende adgang til objekter osv.
Planlæggeren starter muligvis ikke til tiden, den kan fryse osv.

Simple fejl tager ikke lang tid at rette. En god ETL kan klare de fleste af dem alene.

Komplekse fejl gør det nødvendigt at opdage og teste procedurer for at arbejde med data, at udforske datakilder. Fører ofte til behovet for forandringstest og implementering.

Så halvdelen af ​​alle problemer er relateret til databasen. 48% af alle fejl er simple fejl.
En tredjedel af alle problemer er relateret til at ændre lagerlogikken eller -modellen, mere end halvdelen af ​​disse fejl er komplekse.

Og mindre end en fjerdedel af alle problemer er relateret til opgaveplanlæggeren, hvoraf 18% er simple fejl.

Generelt er 22 % af alle fejl, der opstår, komplekse, og deres rettelse kræver mest opmærksomhed og tid. De sker cirka en gang om ugen. Hvorimod simple fejl sker næsten hver dag.

Det er klart, at overvågning af ETL-processer vil være effektiv, når fejlplaceringen er angivet i loggen så nøjagtigt som muligt, og den minimale tid, der kræves for at finde kilden til problemet.

Effektiv overvågning

Hvad ville jeg se i ETL-overvågningsprocessen?

Overvågning af ETL-processer i et lille datavarehus
Start ved - da han begyndte at arbejde,
Kilde - datakilde,
Lag - hvilket lagerniveau indlæses,
ETL Jobnavn - upload procedure, som består af mange små trin,
Trinnummer - nummeret på det trin, der udføres,
Berørte rækker - hvor meget data er allerede blevet behandlet,
Varighed sek - hvor lang tid tager det,
Status - uanset om alt er i orden eller ej: OK, FEJL, KØRER, HÆNGER
Meddelelse - Sidste vellykkede meddelelse eller fejlbeskrivelse.

Baseret på status for posterne kan du sende en e-mail. brev til andre medlemmer. Hvis der ikke er fejl, er brevet ikke nødvendigt.

I tilfælde af en fejl er det således tydeligt angivet, hvor hændelsen fandt sted.

Nogle gange sker det, at selve overvågningsværktøjet ikke virker. I dette tilfælde er det muligt at kalde et view (view) direkte i databasen, som rapporten er bygget ud fra.

ETL overvågningstabel

For at implementere overvågning af ETL-processer er én tabel og én visning nok.

For at gøre dette kan du vende tilbage til din lille opbevaring og opret prototype i sqlite database.

DDL tabeller

CREATE TABLE UTL_JOB_STATUS (
/* Table for logging of job execution log. Important that the job has the steps ETL_START and ETL_END or ETL_ERROR */
  UTL_JOB_STATUS_ID INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
  SID               INTEGER NOT NULL DEFAULT -1, /* Session Identificator. Unique for every Run of job */
  LOG_DT            INTEGER NOT NULL DEFAULT 0,  /* Date time */
  LOG_D             INTEGER NOT NULL DEFAULT 0,  /* Date */
  JOB_NAME          TEXT NOT NULL DEFAULT 'N/A', /* Job name like JOB_STG2DM_GEO */
  STEP_NAME         TEXT NOT NULL DEFAULT 'N/A', /* ETL_START, ... , ETL_END/ETL_ERROR */
  STEP_DESCR        TEXT,                        /* Description of task or error message */
  UNIQUE (SID, JOB_NAME, STEP_NAME)
);
INSERT INTO UTL_JOB_STATUS (UTL_JOB_STATUS_ID) VALUES (-1);

Se/rapporter DDL

CREATE VIEW IF NOT EXISTS UTL_JOB_STATUS_V
AS /* Content: Package Execution Log for last 3 Months. */
WITH SRC AS (
  SELECT LOG_D,
    LOG_DT,
    UTL_JOB_STATUS_ID,
    SID,
	CASE WHEN INSTR(JOB_NAME, 'FTP') THEN 'TRANSFER' /* file transfer */
	     WHEN INSTR(JOB_NAME, 'STG') THEN 'STAGE' /* stage */
	     WHEN INSTR(JOB_NAME, 'CLS') THEN 'CLEANSING' /* cleansing */
	     WHEN INSTR(JOB_NAME, 'DIM') THEN 'DIMENSION' /* dimension */
	     WHEN INSTR(JOB_NAME, 'FCT') THEN 'FACT' /* fact */
		 WHEN INSTR(JOB_NAME, 'ETL') THEN 'STAGE-MART' /* data mart */
	     WHEN INSTR(JOB_NAME, 'RPT') THEN 'REPORT' /* report */
	     ELSE 'N/A' END AS LAYER,
	CASE WHEN INSTR(JOB_NAME, 'ACCESS') THEN 'ACCESS LOG' /* source */
	     WHEN INSTR(JOB_NAME, 'MASTER') THEN 'MASTER DATA' /* source */
	     WHEN INSTR(JOB_NAME, 'AD-HOC') THEN 'AD-HOC' /* source */
	     ELSE 'N/A' END AS SOURCE,
    JOB_NAME,
    STEP_NAME,
    CASE WHEN STEP_NAME='ETL_START' THEN 1 ELSE 0 END AS START_FLAG,
    CASE WHEN STEP_NAME='ETL_END' THEN 1 ELSE 0 END AS END_FLAG,
    CASE WHEN STEP_NAME='ETL_ERROR' THEN 1 ELSE 0 END AS ERROR_FLAG,
    STEP_NAME || ' : ' || STEP_DESCR AS STEP_LOG,
	SUBSTR( SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), 1, INSTR(SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), '***')-2 ) AS AFFECTED_ROWS
  FROM UTL_JOB_STATUS
  WHERE datetime(LOG_D, 'unixepoch') >= date('now', 'start of month', '-3 month')
)
SELECT JB.SID,
  JB.MIN_LOG_DT AS START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS LOG_DT,
  JB.SOURCE,
  JB.LAYER,
  JB.JOB_NAME,
  CASE
  WHEN JB.ERROR_FLAG = 1 THEN 'ERROR'
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 AND strftime('%s','now') - JB.MIN_LOG_DT > 0.5*60*60 THEN 'HANGS' /* half an hour */
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 THEN 'RUNNING'
  ELSE 'OK'
  END AS STATUS,
  ERR.STEP_LOG     AS STEP_LOG,
  JB.CNT           AS STEP_CNT,
  JB.AFFECTED_ROWS AS AFFECTED_ROWS,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS JOB_START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MAX_LOG_DT, 'unixepoch')) AS JOB_END_DT,
  JB.MAX_LOG_DT - JB.MIN_LOG_DT AS JOB_DURATION_SEC
FROM
  ( SELECT SID, SOURCE, LAYER, JOB_NAME,
           MAX(UTL_JOB_STATUS_ID) AS UTL_JOB_STATUS_ID,
           MAX(START_FLAG)       AS START_FLAG,
           MAX(END_FLAG)         AS END_FLAG,
           MAX(ERROR_FLAG)       AS ERROR_FLAG,
           MIN(LOG_DT)           AS MIN_LOG_DT,
           MAX(LOG_DT)           AS MAX_LOG_DT,
           SUM(1)                AS CNT,
           SUM(IFNULL(AFFECTED_ROWS, 0)) AS AFFECTED_ROWS
    FROM SRC
    GROUP BY SID, SOURCE, LAYER, JOB_NAME
  ) JB,
  ( SELECT UTL_JOB_STATUS_ID, SID, JOB_NAME, STEP_LOG
    FROM SRC
    WHERE 1 = 1
  ) ERR
WHERE 1 = 1
  AND JB.SID = ERR.SID
  AND JB.JOB_NAME = ERR.JOB_NAME
  AND JB.UTL_JOB_STATUS_ID = ERR.UTL_JOB_STATUS_ID
ORDER BY JB.MIN_LOG_DT DESC, JB.SID DESC, JB.SOURCE;

SQL Kontrollerer om det er muligt at få et nyt sessionsnummer

SELECT SUM (
  CASE WHEN start_job.JOB_NAME IS NOT NULL AND end_job.JOB_NAME IS NULL /* existed job finished */
	    AND NOT ( 'y' = 'n' ) /* force restart PARAMETER */
       THEN 1 ELSE 0
  END ) AS IS_RUNNING
  FROM
    ( SELECT 1 AS dummy FROM UTL_JOB_STATUS WHERE sid = -1) d_job
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID, 1 AS dummy
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG' /* job name PARAMETER */
	    AND STEP_NAME = 'ETL_START'
      GROUP BY JOB_NAME, SID
    ) start_job /* starts */
  ON d_job.dummy = start_job.dummy
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG'  /* job name PARAMETER */
	    AND STEP_NAME in ('ETL_END', 'ETL_ERROR') /* stop status */
      GROUP BY JOB_NAME, SID
    ) end_job /* ends */
  ON start_job.JOB_NAME = end_job.JOB_NAME
     AND start_job.SID = end_job.SID

Bordfunktioner:

  • starten og slutningen af ​​databehandlingsproceduren skal følges af trinene ETL_START og ETL_END
  • i tilfælde af en fejl, skal ETL_ERROR-trinnet med dets beskrivelse oprettes
  • mængden af ​​behandlede data skal fremhæves f.eks. med asterisker
  • den samme procedure kan startes på samme tid med parameteren force_restart=y, uden den udstedes sessionsnummeret kun til den afsluttede procedure
  • i normal tilstand kan du ikke køre den samme databehandlingsprocedure parallelt

De nødvendige operationer for at arbejde med et bord er som følger:

  • få sessionsnummeret for den kørende ETL-procedure
  • indsæt logindtastning i tabellen
  • at få den sidste vellykkede registrering af en ETL-procedure

I databaser som Oracle eller Postgres kan disse operationer implementeres som indbyggede funktioner. sqlite kræver en ekstern mekanisme, og i dette tilfælde er det prototype i PHP.

Output

Fejlmeddelelser i databehandlingsværktøjer spiller således en megavigtig rolle. Men det er svært at kalde dem optimale til hurtigt at finde årsagen til problemet. Når antallet af procedurer nærmer sig hundrede, bliver procesovervågning til et komplekst projekt.

Artiklen giver et eksempel på en mulig løsning på problemet i form af en prototype. Hele den lille repository prototype er tilgængelig i gitlab SQLite PHP ETL Utilities.

Kilde: www.habr.com

Tilføj en kommentar