Surveillance des processus ETL dans un petit entrepôt de données

Beaucoup utilisent des outils spécialisés pour créer des procédures d'extraction, de transformation et de chargement de données dans des bases de données relationnelles. Le processus des outils de travail est enregistré, les erreurs sont corrigées.

En cas d'erreur, le journal contient des informations indiquant que l'outil n'a pas réussi à terminer la tâche et quels modules (souvent java) se sont arrêtés où. Dans les dernières lignes, vous pouvez trouver une erreur de base de données, par exemple, une violation de clé unique de table.

Pour répondre à la question du rôle joué par les informations d'erreur ETL, j'ai classé tous les problèmes survenus au cours des deux dernières années dans un référentiel assez volumineux.

Surveillance des processus ETL dans un petit entrepôt de données

Les erreurs de base de données incluent un espace insuffisant, une connexion perdue, une session suspendue, etc.

Les erreurs logiques incluent telles que la violation des clés de table, les objets non valides, le manque d'accès aux objets, etc.
Le planificateur peut ne pas démarrer à l'heure, il peut se bloquer, etc.

Les erreurs simples ne tardent pas à être corrigées. Un bon ETL peut gérer la plupart d'entre eux par lui-même.

Les bogues complexes obligent à découvrir et à tester des procédures pour travailler avec des données, pour explorer des sources de données. Conduisent souvent à la nécessité de tester et de déployer les modifications.

Ainsi, la moitié de tous les problèmes sont liés à la base de données. 48% de toutes les erreurs sont des erreurs simples.
Un tiers de tous les problèmes sont liés à la modification de la logique ou du modèle de stockage, plus de la moitié de ces erreurs sont complexes.

Et moins d'un quart de tous les problèmes sont liés au planificateur de tâches, dont 18% sont de simples erreurs.

En général, 22 % de toutes les erreurs qui se produisent sont complexes et leur correction nécessite le plus d'attention et de temps. Ils se produisent environ une fois par semaine. Alors que de simples erreurs se produisent presque tous les jours.

Il est évident que la surveillance des processus ETL sera efficace lorsque l'emplacement de l'erreur est indiqué dans le journal le plus précisément possible et qu'un minimum de temps est nécessaire pour trouver la source du problème.

Surveillance efficace

Que voulais-je voir dans le processus de surveillance ETL ?

Surveillance des processus ETL dans un petit entrepôt de données
Commencer à - quand il a commencé à travailler,
Source - source de données,
Couche - quel niveau de stockage est chargé,
ETL Job Name - procédure de téléchargement, qui consiste en plusieurs petites étapes,
Numéro d'étape - le numéro de l'étape en cours d'exécution,
Lignes affectées - combien de données ont déjà été traitées,
Durée sec - combien de temps cela prend,
Statut - si tout va bien ou non : OK, ERREUR, EN COURS, ACCROCHE
Message - Dernier message réussi ou description de l'erreur.

En fonction de l'état des enregistrements, vous pouvez envoyer un e-mail. lettre aux autres membres. S'il n'y a pas d'erreurs, la lettre n'est pas nécessaire.

Ainsi, en cas d'erreur, le lieu de l'incident est clairement indiqué.

Il arrive parfois que l'outil de surveillance lui-même ne fonctionne pas. Dans ce cas, il est possible d'appeler une vue (view) directement dans la base de données, sur la base de laquelle le rapport est construit.

Tableau de suivi ETL

Pour implémenter la surveillance des processus ETL, une table et une vue suffisent.

Pour ce faire, vous pouvez revenir à votre petit rangement et créer un prototype dans la base de données sqlite.

Tableaux DDL

CREATE TABLE UTL_JOB_STATUS (
/* Table for logging of job execution log. Important that the job has the steps ETL_START and ETL_END or ETL_ERROR */
  UTL_JOB_STATUS_ID INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
  SID               INTEGER NOT NULL DEFAULT -1, /* Session Identificator. Unique for every Run of job */
  LOG_DT            INTEGER NOT NULL DEFAULT 0,  /* Date time */
  LOG_D             INTEGER NOT NULL DEFAULT 0,  /* Date */
  JOB_NAME          TEXT NOT NULL DEFAULT 'N/A', /* Job name like JOB_STG2DM_GEO */
  STEP_NAME         TEXT NOT NULL DEFAULT 'N/A', /* ETL_START, ... , ETL_END/ETL_ERROR */
  STEP_DESCR        TEXT,                        /* Description of task or error message */
  UNIQUE (SID, JOB_NAME, STEP_NAME)
);
INSERT INTO UTL_JOB_STATUS (UTL_JOB_STATUS_ID) VALUES (-1);

Afficher/Rapport DDL

CREATE VIEW IF NOT EXISTS UTL_JOB_STATUS_V
AS /* Content: Package Execution Log for last 3 Months. */
WITH SRC AS (
  SELECT LOG_D,
    LOG_DT,
    UTL_JOB_STATUS_ID,
    SID,
	CASE WHEN INSTR(JOB_NAME, 'FTP') THEN 'TRANSFER' /* file transfer */
	     WHEN INSTR(JOB_NAME, 'STG') THEN 'STAGE' /* stage */
	     WHEN INSTR(JOB_NAME, 'CLS') THEN 'CLEANSING' /* cleansing */
	     WHEN INSTR(JOB_NAME, 'DIM') THEN 'DIMENSION' /* dimension */
	     WHEN INSTR(JOB_NAME, 'FCT') THEN 'FACT' /* fact */
		 WHEN INSTR(JOB_NAME, 'ETL') THEN 'STAGE-MART' /* data mart */
	     WHEN INSTR(JOB_NAME, 'RPT') THEN 'REPORT' /* report */
	     ELSE 'N/A' END AS LAYER,
	CASE WHEN INSTR(JOB_NAME, 'ACCESS') THEN 'ACCESS LOG' /* source */
	     WHEN INSTR(JOB_NAME, 'MASTER') THEN 'MASTER DATA' /* source */
	     WHEN INSTR(JOB_NAME, 'AD-HOC') THEN 'AD-HOC' /* source */
	     ELSE 'N/A' END AS SOURCE,
    JOB_NAME,
    STEP_NAME,
    CASE WHEN STEP_NAME='ETL_START' THEN 1 ELSE 0 END AS START_FLAG,
    CASE WHEN STEP_NAME='ETL_END' THEN 1 ELSE 0 END AS END_FLAG,
    CASE WHEN STEP_NAME='ETL_ERROR' THEN 1 ELSE 0 END AS ERROR_FLAG,
    STEP_NAME || ' : ' || STEP_DESCR AS STEP_LOG,
	SUBSTR( SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), 1, INSTR(SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), '***')-2 ) AS AFFECTED_ROWS
  FROM UTL_JOB_STATUS
  WHERE datetime(LOG_D, 'unixepoch') >= date('now', 'start of month', '-3 month')
)
SELECT JB.SID,
  JB.MIN_LOG_DT AS START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS LOG_DT,
  JB.SOURCE,
  JB.LAYER,
  JB.JOB_NAME,
  CASE
  WHEN JB.ERROR_FLAG = 1 THEN 'ERROR'
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 AND strftime('%s','now') - JB.MIN_LOG_DT > 0.5*60*60 THEN 'HANGS' /* half an hour */
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 THEN 'RUNNING'
  ELSE 'OK'
  END AS STATUS,
  ERR.STEP_LOG     AS STEP_LOG,
  JB.CNT           AS STEP_CNT,
  JB.AFFECTED_ROWS AS AFFECTED_ROWS,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS JOB_START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MAX_LOG_DT, 'unixepoch')) AS JOB_END_DT,
  JB.MAX_LOG_DT - JB.MIN_LOG_DT AS JOB_DURATION_SEC
FROM
  ( SELECT SID, SOURCE, LAYER, JOB_NAME,
           MAX(UTL_JOB_STATUS_ID) AS UTL_JOB_STATUS_ID,
           MAX(START_FLAG)       AS START_FLAG,
           MAX(END_FLAG)         AS END_FLAG,
           MAX(ERROR_FLAG)       AS ERROR_FLAG,
           MIN(LOG_DT)           AS MIN_LOG_DT,
           MAX(LOG_DT)           AS MAX_LOG_DT,
           SUM(1)                AS CNT,
           SUM(IFNULL(AFFECTED_ROWS, 0)) AS AFFECTED_ROWS
    FROM SRC
    GROUP BY SID, SOURCE, LAYER, JOB_NAME
  ) JB,
  ( SELECT UTL_JOB_STATUS_ID, SID, JOB_NAME, STEP_LOG
    FROM SRC
    WHERE 1 = 1
  ) ERR
WHERE 1 = 1
  AND JB.SID = ERR.SID
  AND JB.JOB_NAME = ERR.JOB_NAME
  AND JB.UTL_JOB_STATUS_ID = ERR.UTL_JOB_STATUS_ID
ORDER BY JB.MIN_LOG_DT DESC, JB.SID DESC, JB.SOURCE;

SQL Vérification s'il est possible d'obtenir un nouveau numéro de session

SELECT SUM (
  CASE WHEN start_job.JOB_NAME IS NOT NULL AND end_job.JOB_NAME IS NULL /* existed job finished */
	    AND NOT ( 'y' = 'n' ) /* force restart PARAMETER */
       THEN 1 ELSE 0
  END ) AS IS_RUNNING
  FROM
    ( SELECT 1 AS dummy FROM UTL_JOB_STATUS WHERE sid = -1) d_job
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID, 1 AS dummy
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG' /* job name PARAMETER */
	    AND STEP_NAME = 'ETL_START'
      GROUP BY JOB_NAME, SID
    ) start_job /* starts */
  ON d_job.dummy = start_job.dummy
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG'  /* job name PARAMETER */
	    AND STEP_NAME in ('ETL_END', 'ETL_ERROR') /* stop status */
      GROUP BY JOB_NAME, SID
    ) end_job /* ends */
  ON start_job.JOB_NAME = end_job.JOB_NAME
     AND start_job.SID = end_job.SID

Caractéristiques du tableau :

  • le début et la fin de la procédure de traitement des données doivent être suivis des étapes ETL_START et ETL_END
  • en cas d'erreur, l'étape ETL_ERROR avec sa description doit être créée
  • la quantité de données traitées doit être mise en évidence, par exemple, avec des astérisques
  • la même procédure peut être démarrée en même temps avec le paramètre force_restart=y, sans lui le numéro de session n'est attribué qu'à la procédure terminée
  • en mode normal, vous ne pouvez pas exécuter la même procédure de traitement de données en parallèle

Les opérations nécessaires pour travailler avec une table sont les suivantes :

  • obtenir le numéro de session de la procédure ETL en cours d'exécution
  • insérer une entrée de journal dans la table
  • obtenir le dernier enregistrement réussi d'une procédure ETL

Dans des bases de données telles qu'Oracle ou Postgres, ces opérations peuvent être implémentées en tant que fonctions intégrées. sqlite a besoin d'un mécanisme externe, et dans ce cas il prototypé en PHP.

conclusion

Ainsi, les messages d'erreur dans les outils de traitement de données jouent un rôle méga-important. Mais il est difficile de les qualifier d'optimaux pour trouver rapidement la cause du problème. Lorsque le nombre de procédures approche la centaine, alors le suivi des processus se transforme en un projet complexe.

L'article fournit un exemple d'une solution possible au problème sous la forme d'un prototype. L'ensemble du petit prototype de référentiel est disponible dans gitlab Utilitaires SQLite PHP ETL.

Source: habr.com

Ajouter un commentaire