Övervakning av ETL-processer i ett litet datalager

Många använder specialiserade verktyg för att skapa procedurer för att extrahera, transformera och ladda data till relationsdatabaser. Processen med arbetsverktyg loggas, fel åtgärdas.

I händelse av ett fel innehåller loggen information om att verktyget misslyckades med att slutföra uppgiften och vilka moduler (ofta java) som stannade var. På de sista raderna kan du hitta ett databasfel, till exempel en tabell unik nyckelöverträdelse.

För att svara på frågan om vilken roll ETL-felinformation spelar har jag klassificerat alla problem som har uppstått under de senaste två åren i ett ganska stort förråd.

Övervakning av ETL-processer i ett litet datalager

Databasfel inkluderar inte tillräckligt med utrymme, förlorad anslutning, session hängd, etc.

Logiska fel inkluderar såsom kränkning av tabellnycklar, ogiltiga objekt, bristande tillgång till objekt, etc.
Schemaläggaren kanske inte startar i tid, den kan frysa osv.

Enkla misstag tar inte lång tid att åtgärda. En bra ETL kan hantera de flesta av dem på egen hand.

Komplexa buggar gör det nödvändigt att upptäcka och testa procedurer för att arbeta med data, för att utforska datakällor. Leder ofta till behov av förändringstestning och implementering.

Så hälften av alla problem är relaterade till databasen. 48% av alla misstag är enkla misstag.
En tredjedel av alla problem är relaterade till att ändra lagringslogik eller modell, mer än hälften av dessa fel är komplexa.

Och mindre än en fjärdedel av alla problem är relaterade till uppgiftsschemaläggaren, varav 18% är enkla fel.

I allmänhet är 22 % av alla fel som uppstår komplexa, och deras korrigering kräver mest uppmärksamhet och tid. De händer ungefär en gång i veckan. Enkla misstag händer nästan varje dag.

Uppenbarligen kommer övervakning av ETL-processer att vara effektiv när felplatsen anges i loggen så exakt som möjligt och den minsta tid som krävs för att hitta källan till problemet.

Effektiv övervakning

Vad ville jag se i ETL-övervakningsprocessen?

Övervakning av ETL-processer i ett litet datalager
Börja vid - när han började arbeta,
Källa - datakälla,
Lager - vilken lagringsnivå som laddas,
ETL Job Name - uppladdningsprocedur, som består av många små steg,
Stegnummer - numret på steget som utförs,
Berörda rader – hur mycket data har redan bearbetats,
Varaktighet sek - hur lång tid det tar,
Status - om allt är bra eller inte: OK, FEL, KÖR, HÄNGER
Meddelande - Senaste lyckade meddelande eller felbeskrivning.

Baserat på statusen för posterna kan du skicka ett e-postmeddelande. brev till övriga medlemmar. Om det inte finns några fel, är brevet inte nödvändigt.

I händelse av ett fel är således platsen för händelsen tydligt angiven.

Ibland händer det att själva övervakningsverktyget inte fungerar. I det här fallet är det möjligt att anropa en vy (vy) direkt i databasen, utifrån vilken rapporten är uppbyggd.

ETL övervakningstabell

För att implementera övervakning av ETL-processer räcker det med en tabell och en vy.

För att göra detta kan du gå tillbaka till din lilla förvaring och skapa prototyp i SQLite-databasen.

DDL-tabeller

CREATE TABLE UTL_JOB_STATUS (
/* Table for logging of job execution log. Important that the job has the steps ETL_START and ETL_END or ETL_ERROR */
  UTL_JOB_STATUS_ID INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
  SID               INTEGER NOT NULL DEFAULT -1, /* Session Identificator. Unique for every Run of job */
  LOG_DT            INTEGER NOT NULL DEFAULT 0,  /* Date time */
  LOG_D             INTEGER NOT NULL DEFAULT 0,  /* Date */
  JOB_NAME          TEXT NOT NULL DEFAULT 'N/A', /* Job name like JOB_STG2DM_GEO */
  STEP_NAME         TEXT NOT NULL DEFAULT 'N/A', /* ETL_START, ... , ETL_END/ETL_ERROR */
  STEP_DESCR        TEXT,                        /* Description of task or error message */
  UNIQUE (SID, JOB_NAME, STEP_NAME)
);
INSERT INTO UTL_JOB_STATUS (UTL_JOB_STATUS_ID) VALUES (-1);

Visa/rapportera DDL

CREATE VIEW IF NOT EXISTS UTL_JOB_STATUS_V
AS /* Content: Package Execution Log for last 3 Months. */
WITH SRC AS (
  SELECT LOG_D,
    LOG_DT,
    UTL_JOB_STATUS_ID,
    SID,
	CASE WHEN INSTR(JOB_NAME, 'FTP') THEN 'TRANSFER' /* file transfer */
	     WHEN INSTR(JOB_NAME, 'STG') THEN 'STAGE' /* stage */
	     WHEN INSTR(JOB_NAME, 'CLS') THEN 'CLEANSING' /* cleansing */
	     WHEN INSTR(JOB_NAME, 'DIM') THEN 'DIMENSION' /* dimension */
	     WHEN INSTR(JOB_NAME, 'FCT') THEN 'FACT' /* fact */
		 WHEN INSTR(JOB_NAME, 'ETL') THEN 'STAGE-MART' /* data mart */
	     WHEN INSTR(JOB_NAME, 'RPT') THEN 'REPORT' /* report */
	     ELSE 'N/A' END AS LAYER,
	CASE WHEN INSTR(JOB_NAME, 'ACCESS') THEN 'ACCESS LOG' /* source */
	     WHEN INSTR(JOB_NAME, 'MASTER') THEN 'MASTER DATA' /* source */
	     WHEN INSTR(JOB_NAME, 'AD-HOC') THEN 'AD-HOC' /* source */
	     ELSE 'N/A' END AS SOURCE,
    JOB_NAME,
    STEP_NAME,
    CASE WHEN STEP_NAME='ETL_START' THEN 1 ELSE 0 END AS START_FLAG,
    CASE WHEN STEP_NAME='ETL_END' THEN 1 ELSE 0 END AS END_FLAG,
    CASE WHEN STEP_NAME='ETL_ERROR' THEN 1 ELSE 0 END AS ERROR_FLAG,
    STEP_NAME || ' : ' || STEP_DESCR AS STEP_LOG,
	SUBSTR( SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), 1, INSTR(SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), '***')-2 ) AS AFFECTED_ROWS
  FROM UTL_JOB_STATUS
  WHERE datetime(LOG_D, 'unixepoch') >= date('now', 'start of month', '-3 month')
)
SELECT JB.SID,
  JB.MIN_LOG_DT AS START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS LOG_DT,
  JB.SOURCE,
  JB.LAYER,
  JB.JOB_NAME,
  CASE
  WHEN JB.ERROR_FLAG = 1 THEN 'ERROR'
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 AND strftime('%s','now') - JB.MIN_LOG_DT > 0.5*60*60 THEN 'HANGS' /* half an hour */
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 THEN 'RUNNING'
  ELSE 'OK'
  END AS STATUS,
  ERR.STEP_LOG     AS STEP_LOG,
  JB.CNT           AS STEP_CNT,
  JB.AFFECTED_ROWS AS AFFECTED_ROWS,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS JOB_START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MAX_LOG_DT, 'unixepoch')) AS JOB_END_DT,
  JB.MAX_LOG_DT - JB.MIN_LOG_DT AS JOB_DURATION_SEC
FROM
  ( SELECT SID, SOURCE, LAYER, JOB_NAME,
           MAX(UTL_JOB_STATUS_ID) AS UTL_JOB_STATUS_ID,
           MAX(START_FLAG)       AS START_FLAG,
           MAX(END_FLAG)         AS END_FLAG,
           MAX(ERROR_FLAG)       AS ERROR_FLAG,
           MIN(LOG_DT)           AS MIN_LOG_DT,
           MAX(LOG_DT)           AS MAX_LOG_DT,
           SUM(1)                AS CNT,
           SUM(IFNULL(AFFECTED_ROWS, 0)) AS AFFECTED_ROWS
    FROM SRC
    GROUP BY SID, SOURCE, LAYER, JOB_NAME
  ) JB,
  ( SELECT UTL_JOB_STATUS_ID, SID, JOB_NAME, STEP_LOG
    FROM SRC
    WHERE 1 = 1
  ) ERR
WHERE 1 = 1
  AND JB.SID = ERR.SID
  AND JB.JOB_NAME = ERR.JOB_NAME
  AND JB.UTL_JOB_STATUS_ID = ERR.UTL_JOB_STATUS_ID
ORDER BY JB.MIN_LOG_DT DESC, JB.SID DESC, JB.SOURCE;

SQL Kontrollerar om det är möjligt att få ett nytt sessionsnummer

SELECT SUM (
  CASE WHEN start_job.JOB_NAME IS NOT NULL AND end_job.JOB_NAME IS NULL /* existed job finished */
	    AND NOT ( 'y' = 'n' ) /* force restart PARAMETER */
       THEN 1 ELSE 0
  END ) AS IS_RUNNING
  FROM
    ( SELECT 1 AS dummy FROM UTL_JOB_STATUS WHERE sid = -1) d_job
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID, 1 AS dummy
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG' /* job name PARAMETER */
	    AND STEP_NAME = 'ETL_START'
      GROUP BY JOB_NAME, SID
    ) start_job /* starts */
  ON d_job.dummy = start_job.dummy
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG'  /* job name PARAMETER */
	    AND STEP_NAME in ('ETL_END', 'ETL_ERROR') /* stop status */
      GROUP BY JOB_NAME, SID
    ) end_job /* ends */
  ON start_job.JOB_NAME = end_job.JOB_NAME
     AND start_job.SID = end_job.SID

Tabellfunktioner:

  • början och slutet av databehandlingsproceduren måste följas av stegen ETL_START och ETL_END
  • i händelse av ett fel ska steget ETL_ERROR med dess beskrivning skapas
  • mängden bearbetad data ska markeras, till exempel med asterisker
  • samma procedur kan startas samtidigt med parametern force_restart=y, utan den ges sessionsnumret endast till den avslutade proceduren
  • i normalt läge kan du inte köra samma databehandlingsprocedur parallellt

De nödvändiga operationerna för att arbeta med en tabell är följande:

  • få sessionsnumret för den pågående ETL-proceduren
  • infoga loggposten i tabellen
  • få den sista framgångsrika posten av en ETL-procedur

I databaser som Oracle eller Postgres kan dessa operationer implementeras som inbyggda funktioner. sqlite kräver en extern mekanism, och i det här fallet prototyp i PHP.

Utgång

Således spelar felmeddelanden i databehandlingsverktyg en megaviktig roll. Men det är svårt att kalla dem optimala för att snabbt hitta orsaken till problemet. När antalet procedurer närmar sig hundra blir processövervakning till ett komplext projekt.

Artikeln ger ett exempel på en möjlig lösning på problemet i form av en prototyp. Hela prototypen för det lilla förvaret finns tillgänglig i gitlab SQLite PHP ETL Utilities.

Källa: will.com

Lägg en kommentar