Supervisión de procesos ETL en un pequeño almacén de datos

Muchos usan herramientas especializadas para crear procedimientos para extraer, transformar y cargar datos en bases de datos relacionales. El proceso de herramientas de trabajo se registra, los errores se corrigen.

En caso de error, el registro contiene información de que la herramienta no pudo completar la tarea y qué módulos (a menudo Java) se detuvieron y dónde. En las últimas líneas, puede encontrar un error de base de datos, por ejemplo, una violación de clave única de tabla.

Para responder a la pregunta de qué papel juega la información de errores de ETL, he clasificado todos los problemas que han ocurrido en los últimos dos años en un repositorio bastante grande.

Supervisión de procesos ETL en un pequeño almacén de datos

Los errores de la base de datos incluyen espacio insuficiente, conexión perdida, sesión colgada, etc.

Los errores lógicos incluyen, por ejemplo, la violación de las claves de la tabla, los objetos no válidos, la falta de acceso a los objetos, etc.
Es posible que el programador no comience a tiempo, que se congele, etc.

Los errores simples no tardan en corregirse. Un buen ETL puede manejar la mayoría de ellos por sí solo.

Los errores complejos hacen que sea necesario descubrir y probar procedimientos para trabajar con datos, para explorar fuentes de datos. A menudo conducen a la necesidad de pruebas e implementación de cambios.

Entonces, la mitad de todos los problemas están relacionados con la base de datos. El 48% de todos los errores son errores simples.
Un tercio de todos los problemas están relacionados con el cambio de la lógica o el modelo de almacenamiento, más de la mitad de estos errores son complejos.

Y menos de una cuarta parte de todos los problemas están relacionados con el programador de tareas, el 18 % de los cuales son errores simples.

En general, el 22% de todos los errores que ocurren son complejos y su corrección requiere la mayor atención y tiempo. Ocurren aproximadamente una vez por semana. Mientras que los errores simples ocurren casi todos los días.

Obviamente, el monitoreo de los procesos ETL será efectivo cuando la ubicación del error se indique en el registro con la mayor precisión posible y se requiera el tiempo mínimo para encontrar la fuente del problema.

Monitoreo efectivo

¿Qué quería ver en el proceso de monitoreo de ETL?

Supervisión de procesos ETL en un pequeño almacén de datos
Comenzar en - cuando comenzó a trabajar,
Fuente - fuente de datos,
Capa: qué nivel de almacenamiento se está cargando,
Nombre del trabajo ETL: procedimiento de carga, que consta de muchos pasos pequeños,
Número de paso: el número del paso que se está realizando,
Filas afectadas: cuántos datos ya se han procesado,
Duración seg - cuánto tiempo lleva,
Estado: si todo está bien o no: OK, ERROR, EN EJECUCIÓN, BLOQUEOS
Mensaje: último mensaje correcto o descripción del error.

Según el estado de las entradas, puede enviar un correo electrónico. carta a otros miembros. Si no hay errores, entonces la carta no es necesaria.

Así, en caso de error, se indica claramente la ubicación de la incidencia.

A veces sucede que la herramienta de monitoreo en sí no funciona. En este caso, es posible llamar una vista (vista) directamente en la base de datos, sobre la base de la cual se construye el informe.

Tabla de monitoreo de ETL

Para implementar el monitoreo de procesos ETL, una tabla y una vista son suficientes.

Para ello, puede volver a tu pequeño almacenamiento y crear un prototipo en la base de datos sqlite.

tablas DDL

CREATE TABLE UTL_JOB_STATUS (
/* Table for logging of job execution log. Important that the job has the steps ETL_START and ETL_END or ETL_ERROR */
  UTL_JOB_STATUS_ID INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
  SID               INTEGER NOT NULL DEFAULT -1, /* Session Identificator. Unique for every Run of job */
  LOG_DT            INTEGER NOT NULL DEFAULT 0,  /* Date time */
  LOG_D             INTEGER NOT NULL DEFAULT 0,  /* Date */
  JOB_NAME          TEXT NOT NULL DEFAULT 'N/A', /* Job name like JOB_STG2DM_GEO */
  STEP_NAME         TEXT NOT NULL DEFAULT 'N/A', /* ETL_START, ... , ETL_END/ETL_ERROR */
  STEP_DESCR        TEXT,                        /* Description of task or error message */
  UNIQUE (SID, JOB_NAME, STEP_NAME)
);
INSERT INTO UTL_JOB_STATUS (UTL_JOB_STATUS_ID) VALUES (-1);

Ver/Informar DDL

CREATE VIEW IF NOT EXISTS UTL_JOB_STATUS_V
AS /* Content: Package Execution Log for last 3 Months. */
WITH SRC AS (
  SELECT LOG_D,
    LOG_DT,
    UTL_JOB_STATUS_ID,
    SID,
	CASE WHEN INSTR(JOB_NAME, 'FTP') THEN 'TRANSFER' /* file transfer */
	     WHEN INSTR(JOB_NAME, 'STG') THEN 'STAGE' /* stage */
	     WHEN INSTR(JOB_NAME, 'CLS') THEN 'CLEANSING' /* cleansing */
	     WHEN INSTR(JOB_NAME, 'DIM') THEN 'DIMENSION' /* dimension */
	     WHEN INSTR(JOB_NAME, 'FCT') THEN 'FACT' /* fact */
		 WHEN INSTR(JOB_NAME, 'ETL') THEN 'STAGE-MART' /* data mart */
	     WHEN INSTR(JOB_NAME, 'RPT') THEN 'REPORT' /* report */
	     ELSE 'N/A' END AS LAYER,
	CASE WHEN INSTR(JOB_NAME, 'ACCESS') THEN 'ACCESS LOG' /* source */
	     WHEN INSTR(JOB_NAME, 'MASTER') THEN 'MASTER DATA' /* source */
	     WHEN INSTR(JOB_NAME, 'AD-HOC') THEN 'AD-HOC' /* source */
	     ELSE 'N/A' END AS SOURCE,
    JOB_NAME,
    STEP_NAME,
    CASE WHEN STEP_NAME='ETL_START' THEN 1 ELSE 0 END AS START_FLAG,
    CASE WHEN STEP_NAME='ETL_END' THEN 1 ELSE 0 END AS END_FLAG,
    CASE WHEN STEP_NAME='ETL_ERROR' THEN 1 ELSE 0 END AS ERROR_FLAG,
    STEP_NAME || ' : ' || STEP_DESCR AS STEP_LOG,
	SUBSTR( SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), 1, INSTR(SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), '***')-2 ) AS AFFECTED_ROWS
  FROM UTL_JOB_STATUS
  WHERE datetime(LOG_D, 'unixepoch') >= date('now', 'start of month', '-3 month')
)
SELECT JB.SID,
  JB.MIN_LOG_DT AS START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS LOG_DT,
  JB.SOURCE,
  JB.LAYER,
  JB.JOB_NAME,
  CASE
  WHEN JB.ERROR_FLAG = 1 THEN 'ERROR'
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 AND strftime('%s','now') - JB.MIN_LOG_DT > 0.5*60*60 THEN 'HANGS' /* half an hour */
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 THEN 'RUNNING'
  ELSE 'OK'
  END AS STATUS,
  ERR.STEP_LOG     AS STEP_LOG,
  JB.CNT           AS STEP_CNT,
  JB.AFFECTED_ROWS AS AFFECTED_ROWS,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS JOB_START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MAX_LOG_DT, 'unixepoch')) AS JOB_END_DT,
  JB.MAX_LOG_DT - JB.MIN_LOG_DT AS JOB_DURATION_SEC
FROM
  ( SELECT SID, SOURCE, LAYER, JOB_NAME,
           MAX(UTL_JOB_STATUS_ID) AS UTL_JOB_STATUS_ID,
           MAX(START_FLAG)       AS START_FLAG,
           MAX(END_FLAG)         AS END_FLAG,
           MAX(ERROR_FLAG)       AS ERROR_FLAG,
           MIN(LOG_DT)           AS MIN_LOG_DT,
           MAX(LOG_DT)           AS MAX_LOG_DT,
           SUM(1)                AS CNT,
           SUM(IFNULL(AFFECTED_ROWS, 0)) AS AFFECTED_ROWS
    FROM SRC
    GROUP BY SID, SOURCE, LAYER, JOB_NAME
  ) JB,
  ( SELECT UTL_JOB_STATUS_ID, SID, JOB_NAME, STEP_LOG
    FROM SRC
    WHERE 1 = 1
  ) ERR
WHERE 1 = 1
  AND JB.SID = ERR.SID
  AND JB.JOB_NAME = ERR.JOB_NAME
  AND JB.UTL_JOB_STATUS_ID = ERR.UTL_JOB_STATUS_ID
ORDER BY JB.MIN_LOG_DT DESC, JB.SID DESC, JB.SOURCE;

SQL Comprobando si es posible obtener un nuevo número de sesión

SELECT SUM (
  CASE WHEN start_job.JOB_NAME IS NOT NULL AND end_job.JOB_NAME IS NULL /* existed job finished */
	    AND NOT ( 'y' = 'n' ) /* force restart PARAMETER */
       THEN 1 ELSE 0
  END ) AS IS_RUNNING
  FROM
    ( SELECT 1 AS dummy FROM UTL_JOB_STATUS WHERE sid = -1) d_job
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID, 1 AS dummy
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG' /* job name PARAMETER */
	    AND STEP_NAME = 'ETL_START'
      GROUP BY JOB_NAME, SID
    ) start_job /* starts */
  ON d_job.dummy = start_job.dummy
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG'  /* job name PARAMETER */
	    AND STEP_NAME in ('ETL_END', 'ETL_ERROR') /* stop status */
      GROUP BY JOB_NAME, SID
    ) end_job /* ends */
  ON start_job.JOB_NAME = end_job.JOB_NAME
     AND start_job.SID = end_job.SID

Características de la mesa:

  • el inicio y el final del procedimiento de procesamiento de datos deben ser seguidos por los pasos ETL_START y ETL_END
  • en caso de error se debe crear el paso ETL_ERROR con su descripción
  • se debe resaltar la cantidad de datos procesados, por ejemplo, con asteriscos
  • el mismo procedimiento se puede iniciar al mismo tiempo con el parámetro force_restart=y, sin él, el número de sesión se emite solo para el procedimiento completado
  • en modo normal, no puede ejecutar el mismo procedimiento de procesamiento de datos en paralelo

Las operaciones necesarias para trabajar con una tabla son las siguientes:

  • obtener el número de sesión del procedimiento ETL en ejecución
  • insertar entrada de registro en la tabla
  • obtener el último registro exitoso de un procedimiento ETL

En bases de datos como Oracle o Postgres, estas operaciones se pueden implementar como funciones integradas. sqlite requiere un mecanismo externo, y en este caso es prototipo en PHP.

conclusión

Por lo tanto, los mensajes de error en las herramientas de procesamiento de datos juegan un papel muy importante. Pero es difícil llamarlos óptimos para encontrar rápidamente la causa del problema. Cuando el número de procedimientos se acerca a cien, el seguimiento del proceso se convierte en un proyecto complejo.

El artículo proporciona un ejemplo de una posible solución al problema en forma de prototipo. Todo el prototipo de repositorio pequeño está disponible en gitlab Utilidades SQLite PHP ETL.

Fuente: habr.com

Añadir un comentario