Monitorización de procesos ETL nun pequeno almacén de datos

Moitos usan ferramentas especializadas para crear procedementos para extraer, transformar e cargar datos en bases de datos relacionais. O proceso de ferramentas de traballo está rexistrado, os erros son corrixidos.

En caso de erro, o rexistro contén información de que a ferramenta non puido completar a tarefa e que módulos (a miúdo java) se detiveron onde. Nas últimas liñas, podes atopar un erro de base de datos, por exemplo, unha violación da clave única da táboa.

Para responder á pregunta de que papel xoga a información de erros de ETL, clasifiquei todos os problemas que ocorreron nos últimos dous anos nun repositorio bastante grande.

Monitorización de procesos ETL nun pequeno almacén de datos

Os erros da base de datos inclúen non espazo suficiente, conexión perdida, sesión bloqueada, etc.

Os erros lóxicos inclúen como violación de claves da táboa, obxectos non válidos, falta de acceso a obxectos, etc.
É posible que o programador non se inicie a tempo, pode conxelarse, etc.

Os erros sinxelos non tardan moito en corrixirse. Un bo ETL pode xestionar a maioría deles por si só.

Os erros complexos fan que sexa necesario descubrir e probar procedementos para traballar con datos, explorar fontes de datos. Moitas veces leva á necesidade de probas de cambios e implementación.

Entón, a metade de todos os problemas están relacionados coa base de datos. O 48% de todos os erros son erros simples.
Un terzo de todos os problemas están relacionados co cambio de lóxica ou modelo de almacenamento, máis da metade destes erros son complexos.

E menos da cuarta parte de todos os problemas están relacionados co programador de tarefas, o 18% dos cales son erros simples.

En xeral, o 22% de todos os erros que se producen son complexos, e a súa corrección require a maior atención e tempo. Ocorren aproximadamente unha vez á semana. Mentres que os erros simples ocorren case todos os días.

Obviamente, o seguimento dos procesos ETL será efectivo cando se indique a localización do erro no rexistro coa maior precisión posible e se requira o tempo mínimo para atopar a orixe do problema.

Monitorización eficaz

Que quería ver no proceso de seguimento de ETL?

Monitorización de procesos ETL nun pequeno almacén de datos
Comeza en - cando comezou a traballar,
Fonte - fonte de datos,
Capa: que nivel de almacenamento se está cargando,
Nome do traballo ETL: procedemento de carga, que consta de moitos pequenos pasos,
Número de paso: o número do paso que se está a realizar,
Filas afectadas: cantos datos xa foron procesados,
Duración segundos - canto tempo leva,
Estado: se todo está ben ou non: OK, ERRO, RUNNING, HANGS
Mensaxe: última mensaxe exitosa ou descrición do erro.

Segundo o estado das entradas, pode enviar un correo electrónico. carta a outros membros. Se non hai erros, a carta non é necesaria.

Así, no caso de producirse un erro, indícase claramente a localización do suceso.

Ás veces ocorre que a propia ferramenta de seguimento non funciona. Neste caso, é posible chamar a unha vista (vista) directamente na base de datos, a partir da cal se constrúe o informe.

Táboa de seguimento ETL

Para implementar o seguimento dos procesos ETL, unha táboa e unha vista son suficientes.

Para iso, podes volver a o teu pequeno almacenamento e crear un prototipo na base de datos sqlite.

Táboas DDL

CREATE TABLE UTL_JOB_STATUS (
/* Table for logging of job execution log. Important that the job has the steps ETL_START and ETL_END or ETL_ERROR */
  UTL_JOB_STATUS_ID INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
  SID               INTEGER NOT NULL DEFAULT -1, /* Session Identificator. Unique for every Run of job */
  LOG_DT            INTEGER NOT NULL DEFAULT 0,  /* Date time */
  LOG_D             INTEGER NOT NULL DEFAULT 0,  /* Date */
  JOB_NAME          TEXT NOT NULL DEFAULT 'N/A', /* Job name like JOB_STG2DM_GEO */
  STEP_NAME         TEXT NOT NULL DEFAULT 'N/A', /* ETL_START, ... , ETL_END/ETL_ERROR */
  STEP_DESCR        TEXT,                        /* Description of task or error message */
  UNIQUE (SID, JOB_NAME, STEP_NAME)
);
INSERT INTO UTL_JOB_STATUS (UTL_JOB_STATUS_ID) VALUES (-1);

Ver/Informar DDL

CREATE VIEW IF NOT EXISTS UTL_JOB_STATUS_V
AS /* Content: Package Execution Log for last 3 Months. */
WITH SRC AS (
  SELECT LOG_D,
    LOG_DT,
    UTL_JOB_STATUS_ID,
    SID,
	CASE WHEN INSTR(JOB_NAME, 'FTP') THEN 'TRANSFER' /* file transfer */
	     WHEN INSTR(JOB_NAME, 'STG') THEN 'STAGE' /* stage */
	     WHEN INSTR(JOB_NAME, 'CLS') THEN 'CLEANSING' /* cleansing */
	     WHEN INSTR(JOB_NAME, 'DIM') THEN 'DIMENSION' /* dimension */
	     WHEN INSTR(JOB_NAME, 'FCT') THEN 'FACT' /* fact */
		 WHEN INSTR(JOB_NAME, 'ETL') THEN 'STAGE-MART' /* data mart */
	     WHEN INSTR(JOB_NAME, 'RPT') THEN 'REPORT' /* report */
	     ELSE 'N/A' END AS LAYER,
	CASE WHEN INSTR(JOB_NAME, 'ACCESS') THEN 'ACCESS LOG' /* source */
	     WHEN INSTR(JOB_NAME, 'MASTER') THEN 'MASTER DATA' /* source */
	     WHEN INSTR(JOB_NAME, 'AD-HOC') THEN 'AD-HOC' /* source */
	     ELSE 'N/A' END AS SOURCE,
    JOB_NAME,
    STEP_NAME,
    CASE WHEN STEP_NAME='ETL_START' THEN 1 ELSE 0 END AS START_FLAG,
    CASE WHEN STEP_NAME='ETL_END' THEN 1 ELSE 0 END AS END_FLAG,
    CASE WHEN STEP_NAME='ETL_ERROR' THEN 1 ELSE 0 END AS ERROR_FLAG,
    STEP_NAME || ' : ' || STEP_DESCR AS STEP_LOG,
	SUBSTR( SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), 1, INSTR(SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), '***')-2 ) AS AFFECTED_ROWS
  FROM UTL_JOB_STATUS
  WHERE datetime(LOG_D, 'unixepoch') >= date('now', 'start of month', '-3 month')
)
SELECT JB.SID,
  JB.MIN_LOG_DT AS START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS LOG_DT,
  JB.SOURCE,
  JB.LAYER,
  JB.JOB_NAME,
  CASE
  WHEN JB.ERROR_FLAG = 1 THEN 'ERROR'
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 AND strftime('%s','now') - JB.MIN_LOG_DT > 0.5*60*60 THEN 'HANGS' /* half an hour */
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 THEN 'RUNNING'
  ELSE 'OK'
  END AS STATUS,
  ERR.STEP_LOG     AS STEP_LOG,
  JB.CNT           AS STEP_CNT,
  JB.AFFECTED_ROWS AS AFFECTED_ROWS,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS JOB_START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MAX_LOG_DT, 'unixepoch')) AS JOB_END_DT,
  JB.MAX_LOG_DT - JB.MIN_LOG_DT AS JOB_DURATION_SEC
FROM
  ( SELECT SID, SOURCE, LAYER, JOB_NAME,
           MAX(UTL_JOB_STATUS_ID) AS UTL_JOB_STATUS_ID,
           MAX(START_FLAG)       AS START_FLAG,
           MAX(END_FLAG)         AS END_FLAG,
           MAX(ERROR_FLAG)       AS ERROR_FLAG,
           MIN(LOG_DT)           AS MIN_LOG_DT,
           MAX(LOG_DT)           AS MAX_LOG_DT,
           SUM(1)                AS CNT,
           SUM(IFNULL(AFFECTED_ROWS, 0)) AS AFFECTED_ROWS
    FROM SRC
    GROUP BY SID, SOURCE, LAYER, JOB_NAME
  ) JB,
  ( SELECT UTL_JOB_STATUS_ID, SID, JOB_NAME, STEP_LOG
    FROM SRC
    WHERE 1 = 1
  ) ERR
WHERE 1 = 1
  AND JB.SID = ERR.SID
  AND JB.JOB_NAME = ERR.JOB_NAME
  AND JB.UTL_JOB_STATUS_ID = ERR.UTL_JOB_STATUS_ID
ORDER BY JB.MIN_LOG_DT DESC, JB.SID DESC, JB.SOURCE;

SQL Comprobando se é posible obter un novo número de sesión

SELECT SUM (
  CASE WHEN start_job.JOB_NAME IS NOT NULL AND end_job.JOB_NAME IS NULL /* existed job finished */
	    AND NOT ( 'y' = 'n' ) /* force restart PARAMETER */
       THEN 1 ELSE 0
  END ) AS IS_RUNNING
  FROM
    ( SELECT 1 AS dummy FROM UTL_JOB_STATUS WHERE sid = -1) d_job
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID, 1 AS dummy
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG' /* job name PARAMETER */
	    AND STEP_NAME = 'ETL_START'
      GROUP BY JOB_NAME, SID
    ) start_job /* starts */
  ON d_job.dummy = start_job.dummy
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG'  /* job name PARAMETER */
	    AND STEP_NAME in ('ETL_END', 'ETL_ERROR') /* stop status */
      GROUP BY JOB_NAME, SID
    ) end_job /* ends */
  ON start_job.JOB_NAME = end_job.JOB_NAME
     AND start_job.SID = end_job.SID

Características da táboa:

  • o inicio e o final do procedemento de procesamento de datos deben ir seguidos polos pasos ETL_START e ETL_END
  • en caso de erro, debe crearse o paso ETL_ERROR coa súa descrición
  • a cantidade de datos procesados ​​debe destacarse, por exemplo, con asteriscos
  • o mesmo procedemento pódese iniciar ao mesmo tempo co parámetro force_restart=y, sen el o número de sesión emítese só ao procedemento completado
  • no modo normal, non pode executar o mesmo procedemento de procesamento de datos en paralelo

As operacións necesarias para traballar cunha táboa son as seguintes:

  • obtendo o número de sesión do procedemento ETL en execución
  • inserir entrada de rexistro na táboa
  • obtendo o último rexistro exitoso dun procedemento ETL

En bases de datos como Oracle ou Postgres, estas operacións pódense implementar como funcións integradas. sqlite require un mecanismo externo, e neste caso iso prototipado en PHP.

Saída

Así, as mensaxes de erro nas ferramentas de procesamento de datos xogan un papel moi importante. Pero é difícil chamalos óptimos para atopar rapidamente a causa do problema. Cando o número de procedementos se achega a cen, entón o seguimento do proceso convértese nun proxecto complexo.

O artigo ofrece un exemplo dunha posible solución ao problema en forma de prototipo. Todo o pequeno prototipo do repositorio está dispoñible en gitlab Utilidades SQLite PHP ETL.

Fonte: www.habr.com

Engadir un comentario