Overvåking av ETL-prosesser i et lite datavarehus

Mange bruker spesialiserte verktøy for å lage prosedyrer for å trekke ut, transformere og laste data inn i relasjonsdatabaser. Prosessen med arbeidsverktøy logges, feil er fikset.

Ved feil inneholder loggen informasjon om at verktøyet ikke klarte å fullføre oppgaven og hvilke moduler (ofte java) som stoppet hvor. På de siste linjene kan du finne en databasefeil, for eksempel et brudd på en unik tabellnøkkel.

For å svare på spørsmålet om hvilken rolle ETL-feilinformasjon spiller, har jeg klassifisert alle problemene som har oppstått de siste to årene i et ganske stort depot.

Overvåking av ETL-prosesser i et lite datavarehus

Databasefeil inkluderer ikke nok plass, tapt tilkobling, økt hengt, etc.

Logiske feil inkluderer for eksempel brudd på tabellnøkler, ugyldige objekter, manglende tilgang til objekter, etc.
Planleggeren starter kanskje ikke i tide, den kan fryse osv.

Enkle feil tar ikke lang tid å fikse. En god ETL kan håndtere de fleste av dem på egen hånd.

Komplekse feil gjør det nødvendig å oppdage og teste prosedyrer for arbeid med data, for å utforske datakilder. Fører ofte til behov for endringstesting og implementering.

Så halvparten av alle problemer er relatert til databasen. 48 % av alle feil er enkle feil.
En tredjedel av alle problemer er knyttet til endring av lagringslogikk eller modell, mer enn halvparten av disse feilene er komplekse.

Og mindre enn en fjerdedel av alle problemer er relatert til oppgaveplanleggeren, hvorav 18 % er enkle feil.

Generelt er 22 % av alle feil som oppstår komplekse, og korrigeringen krever mest oppmerksomhet og tid. De skjer omtrent en gang i uken. Mens enkle feil skjer nesten hver dag.

Det er åpenbart at overvåking av ETL-prosesser vil være effektiv når plasseringen av feilen er angitt i loggen så nøyaktig som mulig og minimumstiden er nødvendig for å finne kilden til problemet.

Effektiv overvåking

Hva ønsket jeg å se i ETL-overvåkingsprosessen?

Overvåking av ETL-prosesser i et lite datavarehus
Start ved - da han begynte å jobbe,
Kilde - datakilde,
Lag - hvilket lagringsnivå som lastes,
ETL Job Name - opplastingsprosedyre, som består av mange små trinn,
Step Number - nummeret på trinnet som utføres,
Berørte rader – hvor mye data som allerede er behandlet,
Varighet sek - hvor lang tid det tar,
Status - om alt er i orden eller ikke: OK, FEIL, KJØRER, HENGER
Melding - Siste vellykkede melding eller feilbeskrivelse.

Basert på statusen til postene kan du sende en e-post. brev til andre medlemmer. Hvis det ikke er noen feil, er brevet ikke nødvendig.

Ved en feil er således stedet for hendelsen tydelig angitt.

Noen ganger hender det at selve overvåkingsverktøyet ikke fungerer. I dette tilfellet er det mulig å kalle opp en visning (visning) direkte i databasen, som rapporten er bygget på grunnlag av.

ETL overvåkingstabell

For å implementere overvåking av ETL-prosesser er én tabell og én visning nok.

For å gjøre dette kan du gå tilbake til din lille lagringsplass og lag prototype i sqlite-databasen.

DDL-tabeller

CREATE TABLE UTL_JOB_STATUS (
/* Table for logging of job execution log. Important that the job has the steps ETL_START and ETL_END or ETL_ERROR */
  UTL_JOB_STATUS_ID INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
  SID               INTEGER NOT NULL DEFAULT -1, /* Session Identificator. Unique for every Run of job */
  LOG_DT            INTEGER NOT NULL DEFAULT 0,  /* Date time */
  LOG_D             INTEGER NOT NULL DEFAULT 0,  /* Date */
  JOB_NAME          TEXT NOT NULL DEFAULT 'N/A', /* Job name like JOB_STG2DM_GEO */
  STEP_NAME         TEXT NOT NULL DEFAULT 'N/A', /* ETL_START, ... , ETL_END/ETL_ERROR */
  STEP_DESCR        TEXT,                        /* Description of task or error message */
  UNIQUE (SID, JOB_NAME, STEP_NAME)
);
INSERT INTO UTL_JOB_STATUS (UTL_JOB_STATUS_ID) VALUES (-1);

Vis/rapporter DDL

CREATE VIEW IF NOT EXISTS UTL_JOB_STATUS_V
AS /* Content: Package Execution Log for last 3 Months. */
WITH SRC AS (
  SELECT LOG_D,
    LOG_DT,
    UTL_JOB_STATUS_ID,
    SID,
	CASE WHEN INSTR(JOB_NAME, 'FTP') THEN 'TRANSFER' /* file transfer */
	     WHEN INSTR(JOB_NAME, 'STG') THEN 'STAGE' /* stage */
	     WHEN INSTR(JOB_NAME, 'CLS') THEN 'CLEANSING' /* cleansing */
	     WHEN INSTR(JOB_NAME, 'DIM') THEN 'DIMENSION' /* dimension */
	     WHEN INSTR(JOB_NAME, 'FCT') THEN 'FACT' /* fact */
		 WHEN INSTR(JOB_NAME, 'ETL') THEN 'STAGE-MART' /* data mart */
	     WHEN INSTR(JOB_NAME, 'RPT') THEN 'REPORT' /* report */
	     ELSE 'N/A' END AS LAYER,
	CASE WHEN INSTR(JOB_NAME, 'ACCESS') THEN 'ACCESS LOG' /* source */
	     WHEN INSTR(JOB_NAME, 'MASTER') THEN 'MASTER DATA' /* source */
	     WHEN INSTR(JOB_NAME, 'AD-HOC') THEN 'AD-HOC' /* source */
	     ELSE 'N/A' END AS SOURCE,
    JOB_NAME,
    STEP_NAME,
    CASE WHEN STEP_NAME='ETL_START' THEN 1 ELSE 0 END AS START_FLAG,
    CASE WHEN STEP_NAME='ETL_END' THEN 1 ELSE 0 END AS END_FLAG,
    CASE WHEN STEP_NAME='ETL_ERROR' THEN 1 ELSE 0 END AS ERROR_FLAG,
    STEP_NAME || ' : ' || STEP_DESCR AS STEP_LOG,
	SUBSTR( SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), 1, INSTR(SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), '***')-2 ) AS AFFECTED_ROWS
  FROM UTL_JOB_STATUS
  WHERE datetime(LOG_D, 'unixepoch') >= date('now', 'start of month', '-3 month')
)
SELECT JB.SID,
  JB.MIN_LOG_DT AS START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS LOG_DT,
  JB.SOURCE,
  JB.LAYER,
  JB.JOB_NAME,
  CASE
  WHEN JB.ERROR_FLAG = 1 THEN 'ERROR'
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 AND strftime('%s','now') - JB.MIN_LOG_DT > 0.5*60*60 THEN 'HANGS' /* half an hour */
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 THEN 'RUNNING'
  ELSE 'OK'
  END AS STATUS,
  ERR.STEP_LOG     AS STEP_LOG,
  JB.CNT           AS STEP_CNT,
  JB.AFFECTED_ROWS AS AFFECTED_ROWS,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS JOB_START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MAX_LOG_DT, 'unixepoch')) AS JOB_END_DT,
  JB.MAX_LOG_DT - JB.MIN_LOG_DT AS JOB_DURATION_SEC
FROM
  ( SELECT SID, SOURCE, LAYER, JOB_NAME,
           MAX(UTL_JOB_STATUS_ID) AS UTL_JOB_STATUS_ID,
           MAX(START_FLAG)       AS START_FLAG,
           MAX(END_FLAG)         AS END_FLAG,
           MAX(ERROR_FLAG)       AS ERROR_FLAG,
           MIN(LOG_DT)           AS MIN_LOG_DT,
           MAX(LOG_DT)           AS MAX_LOG_DT,
           SUM(1)                AS CNT,
           SUM(IFNULL(AFFECTED_ROWS, 0)) AS AFFECTED_ROWS
    FROM SRC
    GROUP BY SID, SOURCE, LAYER, JOB_NAME
  ) JB,
  ( SELECT UTL_JOB_STATUS_ID, SID, JOB_NAME, STEP_LOG
    FROM SRC
    WHERE 1 = 1
  ) ERR
WHERE 1 = 1
  AND JB.SID = ERR.SID
  AND JB.JOB_NAME = ERR.JOB_NAME
  AND JB.UTL_JOB_STATUS_ID = ERR.UTL_JOB_STATUS_ID
ORDER BY JB.MIN_LOG_DT DESC, JB.SID DESC, JB.SOURCE;

SQL Sjekker om det er mulig å få et nytt sesjonsnummer

SELECT SUM (
  CASE WHEN start_job.JOB_NAME IS NOT NULL AND end_job.JOB_NAME IS NULL /* existed job finished */
	    AND NOT ( 'y' = 'n' ) /* force restart PARAMETER */
       THEN 1 ELSE 0
  END ) AS IS_RUNNING
  FROM
    ( SELECT 1 AS dummy FROM UTL_JOB_STATUS WHERE sid = -1) d_job
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID, 1 AS dummy
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG' /* job name PARAMETER */
	    AND STEP_NAME = 'ETL_START'
      GROUP BY JOB_NAME, SID
    ) start_job /* starts */
  ON d_job.dummy = start_job.dummy
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG'  /* job name PARAMETER */
	    AND STEP_NAME in ('ETL_END', 'ETL_ERROR') /* stop status */
      GROUP BY JOB_NAME, SID
    ) end_job /* ends */
  ON start_job.JOB_NAME = end_job.JOB_NAME
     AND start_job.SID = end_job.SID

Tabellfunksjoner:

  • starten og slutten av databehandlingsprosedyren må følges av trinnene ETL_START og ETL_END
  • i tilfelle en feil, bør trinnet ETL_ERROR med beskrivelsen opprettes
  • mengden behandlede data bør fremheves, for eksempel med stjerner
  • samme prosedyre kan startes samtidig med force_restart=y parameteren, uten den utstedes sesjonsnummeret kun til den fullførte prosedyren
  • i normal modus kan du ikke kjøre den samme databehandlingsprosedyren parallelt

De nødvendige operasjonene for å jobbe med et bord er som følger:

  • får sesjonsnummeret til den kjørende ETL-prosedyren
  • sett inn loggoppføring i tabellen
  • få den siste vellykkede registreringen av en ETL-prosedyre

I databaser som Oracle eller Postgres kan disse operasjonene implementeres som innebygde funksjoner. sqlite krever en ekstern mekanisme, og i dette tilfellet prototype i PHP.

Utgang

Dermed spiller feilmeldinger i databehandlingsverktøy en megaviktig rolle. Men det er vanskelig å kalle dem optimale for raskt å finne årsaken til problemet. Når antallet prosedyrer nærmer seg hundre, blir prosessovervåking til et komplekst prosjekt.

Artikkelen gir et eksempel på en mulig løsning på problemet i form av en prototype. Hele prototypen for det lille depotet er tilgjengelig i gitlab SQLite PHP ETL-verktøy.

Kilde: www.habr.com

Legg til en kommentar