Моніторинг ETL-процесів у невеликому сховищі даних

Багато хто використовує спеціалізовані інструменти для створення процедур вилучення, трансформації та завантаження даних у реляційні бази даних. Процес роботи інструментів логується, помилки фіксуються.

У разі помилки в лозі міститься інформація про те, що інструменту не вдалося виконати завдання та які модулі (часто це java) де зупинилися. В останніх рядках можна знайти помилку бази даних, наприклад, порушення унікального ключа таблиці.

Щоб відповісти на запитання, яку роль відіграє інформація про помилки ETL, я класифікував усі проблеми, що сталися за останні два роки у сховищі.

Моніторинг ETL-процесів у невеликому сховищі даних

До помилок бази даних ставляться такі, як забракло місця, обірвалося з'єднання, зависла сесія тощо.

До логічних помилок належать такі як порушення ключів таблиці, не валідні об'єкти, відсутність доступу до об'єктів тощо.
Планувальник може бути запущений вчасно, може зависнути і т.п.

Прості помилки не вимагають багато часу на виправлення. Здебільшого хороший ETL вміє справлятися самостійно.

Складні помилки викликають необхідність відкривати та перевіряти процедури роботи з даними, досліджувати джерела даних. Часто призводять до необхідності тестування змін та деплойменту.

Отже, половина всіх проблем пов'язана із базою даних. 48% всіх помилок – це прості помилки.
p align="justify"> Третя частина всіх проблем пов'язана зі зміною логіки або моделі сховища, більше половини цих помилок є складними.

І менше чверті всіх проблем пов'язано із планувальником завдань, 18% яких — це прості помилки.

Загалом, 22% всіх помилок є складними, їх виправлення вимагає найбільшої уваги та часу. Відбуваються вони приблизно раз на тиждень. У той час, як прості помилки трапляються майже щодня.

Очевидно, що моніторинг ETL-процесів буде тоді ефективним, коли в лозі максимально точно вказано місце помилки та потрібен мінімальний час на пошук джерела проблеми.

Ефективний моніторинг

Що мені хотілося бачити у процесі моніторингу ETL?

Моніторинг ETL-процесів у невеликому сховищі даних
Start at - коли почав роботу,
Source - джерело даних,
Layer - який рівень сховища завантажується,
ETL Job Name — це процедура завантаження, яка складається з безлічі дрібних кроків,
Step Number - номер виконуваного кроку,
Affected Rows - скільки даних вже обробилося,
Duration sec - як довго виконується,
Status — все добре чи ні: OK, ERROR, RUNNING, HANGS
Message – останнє успішне повідомлення або опис помилки.

На підставі статусу записів можна надіслати ел. лист до інших учасників. Якщо помилок немає, то лист не обов'язковий.

Отже, у разі помилки чітко вказано місце події.

Іноді трапляється, що сам інструмент моніторингу не працює. У такому разі є можливість прямо в базі даних викликати уявлення (схілку), на підставі якої побудовано звіт.

Таблиця моніторингу ETL

Щоб реалізувати моніторинг ETL-процесів, достатньо однієї таблиці та одного уявлення.

Для цього можна повернутися до своє маленьке сховище та створити прототип у базі даних sqlite.

DDL таблиці

CREATE TABLE UTL_JOB_STATUS (
/* Table for logging of job execution log. Important that the job has the steps ETL_START and ETL_END or ETL_ERROR */
  UTL_JOB_STATUS_ID INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
  SID               INTEGER NOT NULL DEFAULT -1, /* Session Identificator. Unique for every Run of job */
  LOG_DT            INTEGER NOT NULL DEFAULT 0,  /* Date time */
  LOG_D             INTEGER NOT NULL DEFAULT 0,  /* Date */
  JOB_NAME          TEXT NOT NULL DEFAULT 'N/A', /* Job name like JOB_STG2DM_GEO */
  STEP_NAME         TEXT NOT NULL DEFAULT 'N/A', /* ETL_START, ... , ETL_END/ETL_ERROR */
  STEP_DESCR        TEXT,                        /* Description of task or error message */
  UNIQUE (SID, JOB_NAME, STEP_NAME)
);
INSERT INTO UTL_JOB_STATUS (UTL_JOB_STATUS_ID) VALUES (-1);

DDL подання/звіту

CREATE VIEW IF NOT EXISTS UTL_JOB_STATUS_V
AS /* Content: Package Execution Log for last 3 Months. */
WITH SRC AS (
  SELECT LOG_D,
    LOG_DT,
    UTL_JOB_STATUS_ID,
    SID,
	CASE WHEN INSTR(JOB_NAME, 'FTP') THEN 'TRANSFER' /* file transfer */
	     WHEN INSTR(JOB_NAME, 'STG') THEN 'STAGE' /* stage */
	     WHEN INSTR(JOB_NAME, 'CLS') THEN 'CLEANSING' /* cleansing */
	     WHEN INSTR(JOB_NAME, 'DIM') THEN 'DIMENSION' /* dimension */
	     WHEN INSTR(JOB_NAME, 'FCT') THEN 'FACT' /* fact */
		 WHEN INSTR(JOB_NAME, 'ETL') THEN 'STAGE-MART' /* data mart */
	     WHEN INSTR(JOB_NAME, 'RPT') THEN 'REPORT' /* report */
	     ELSE 'N/A' END AS LAYER,
	CASE WHEN INSTR(JOB_NAME, 'ACCESS') THEN 'ACCESS LOG' /* source */
	     WHEN INSTR(JOB_NAME, 'MASTER') THEN 'MASTER DATA' /* source */
	     WHEN INSTR(JOB_NAME, 'AD-HOC') THEN 'AD-HOC' /* source */
	     ELSE 'N/A' END AS SOURCE,
    JOB_NAME,
    STEP_NAME,
    CASE WHEN STEP_NAME='ETL_START' THEN 1 ELSE 0 END AS START_FLAG,
    CASE WHEN STEP_NAME='ETL_END' THEN 1 ELSE 0 END AS END_FLAG,
    CASE WHEN STEP_NAME='ETL_ERROR' THEN 1 ELSE 0 END AS ERROR_FLAG,
    STEP_NAME || ' : ' || STEP_DESCR AS STEP_LOG,
	SUBSTR( SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), 1, INSTR(SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), '***')-2 ) AS AFFECTED_ROWS
  FROM UTL_JOB_STATUS
  WHERE datetime(LOG_D, 'unixepoch') >= date('now', 'start of month', '-3 month')
)
SELECT JB.SID,
  JB.MIN_LOG_DT AS START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS LOG_DT,
  JB.SOURCE,
  JB.LAYER,
  JB.JOB_NAME,
  CASE
  WHEN JB.ERROR_FLAG = 1 THEN 'ERROR'
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 AND strftime('%s','now') - JB.MIN_LOG_DT > 0.5*60*60 THEN 'HANGS' /* half an hour */
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 THEN 'RUNNING'
  ELSE 'OK'
  END AS STATUS,
  ERR.STEP_LOG     AS STEP_LOG,
  JB.CNT           AS STEP_CNT,
  JB.AFFECTED_ROWS AS AFFECTED_ROWS,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS JOB_START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MAX_LOG_DT, 'unixepoch')) AS JOB_END_DT,
  JB.MAX_LOG_DT - JB.MIN_LOG_DT AS JOB_DURATION_SEC
FROM
  ( SELECT SID, SOURCE, LAYER, JOB_NAME,
           MAX(UTL_JOB_STATUS_ID) AS UTL_JOB_STATUS_ID,
           MAX(START_FLAG)       AS START_FLAG,
           MAX(END_FLAG)         AS END_FLAG,
           MAX(ERROR_FLAG)       AS ERROR_FLAG,
           MIN(LOG_DT)           AS MIN_LOG_DT,
           MAX(LOG_DT)           AS MAX_LOG_DT,
           SUM(1)                AS CNT,
           SUM(IFNULL(AFFECTED_ROWS, 0)) AS AFFECTED_ROWS
    FROM SRC
    GROUP BY SID, SOURCE, LAYER, JOB_NAME
  ) JB,
  ( SELECT UTL_JOB_STATUS_ID, SID, JOB_NAME, STEP_LOG
    FROM SRC
    WHERE 1 = 1
  ) ERR
WHERE 1 = 1
  AND JB.SID = ERR.SID
  AND JB.JOB_NAME = ERR.JOB_NAME
  AND JB.UTL_JOB_STATUS_ID = ERR.UTL_JOB_STATUS_ID
ORDER BY JB.MIN_LOG_DT DESC, JB.SID DESC, JB.SOURCE;

SQL Перевірка можливості отримати новий номер сесії

SELECT SUM (
  CASE WHEN start_job.JOB_NAME IS NOT NULL AND end_job.JOB_NAME IS NULL /* existed job finished */
	    AND NOT ( 'y' = 'n' ) /* force restart PARAMETER */
       THEN 1 ELSE 0
  END ) AS IS_RUNNING
  FROM
    ( SELECT 1 AS dummy FROM UTL_JOB_STATUS WHERE sid = -1) d_job
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID, 1 AS dummy
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG' /* job name PARAMETER */
	    AND STEP_NAME = 'ETL_START'
      GROUP BY JOB_NAME, SID
    ) start_job /* starts */
  ON d_job.dummy = start_job.dummy
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG'  /* job name PARAMETER */
	    AND STEP_NAME in ('ETL_END', 'ETL_ERROR') /* stop status */
      GROUP BY JOB_NAME, SID
    ) end_job /* ends */
  ON start_job.JOB_NAME = end_job.JOB_NAME
     AND start_job.SID = end_job.SID

Особливості таблиці:

  • початок та кінець процедури обробки даних має супроводжуватися кроками ETL_START та ETL_END
  • у разі помилки має створюватися крок ETL_ERROR з її описом
  • кількість оброблених даних потрібно виділити, наприклад, зірочками
  • одночасно одну і ту ж процедуру можна запустити з параметром force_restart=y, без нього номер сесії видається лише завершеній процедурі
  • у звичайному режимі не можна запустити паралельно одну і ту ж процедуру обробки даних

Необхідними операціями роботи з таблицею є такі:

  • отримання номера сесії процедури, що запускається ETL
  • вставка запису лога до таблиці
  • отримання останнього успішного запису процедури ETL

У таких базах даних, як, наприклад, Oracle або Postgres, ці операції можна реалізувати вбудованими функціями. Для sqlite потрібний зовнішній механізм і в даному випадку він прототипований на PHP.

Висновок

Таким чином, повідомлення про помилки в інструментах обробки даних відіграють важливу роль. Але оптимальними для швидкого пошуку причини проблеми їх важко назвати. Коли кількість процедур наближається до сотні, моніторинг процесів перетворюється на складний проект.

У статті наведено приклад можливого вирішення проблеми у вигляді прототипу. Весь прототип маленького сховища доступний у gitlab SQLite PHP ETL Utilities.

Джерело: habr.com

Додати коментар або відгук