監控小型數據倉庫中的 ETL 過程

許多人使用專門的工具來創建用於將數據提取、轉換和加載到關係數據庫中的過程。 記錄工作工具的過程,修復錯誤。

如果出現錯誤,日誌包含工具未能完成任務以及哪些模塊(通常是 java)在何處停止的信息。 在最後幾行中,您可以找到數據庫錯誤,例如表唯一鍵衝突。

為了回答ETL錯誤信息起什麼作用的問題,我將過去兩年發生的所有問題分類在一個相當大的存儲庫中。

監控小型數據倉庫中的 ETL 過程

數據庫錯誤包括空間不足、連接丟失、會話掛起等。

邏輯錯誤包括例如違反表鍵、無效對象、無法訪問對像等。
調度程序可能無法按時啟動,它可能會凍結等。

簡單的錯誤不需要很長時間即可修復。 一個好的 ETL 可以自己處理其中的大部分。

複雜的錯誤使得有必要發現和測試處理數據的過程,以探索數據源。 經常導致需要更改測試和部署。

所以,一半的問題都與數據庫有關。 48% 的錯誤是簡單的錯誤。
所有問題中有三分之一與更改存儲邏輯或模型有關,這些錯誤中超過一半是複雜的。

而所有問題中只有不到四分之一與任務調度程序有關,其中 18% 是簡單錯誤。

一般來說,發生的所有錯誤中有 22% 是複雜的,糾正它們需要最多的注意力和時間。 它們大約每週發生一次。 而簡單的錯誤幾乎每天都在發生。

顯然,只有在日誌中盡可能準確地指出錯誤位置,並且用最少的時間找到問題的根源時,ETL 過程的監控才會有效。

有效監控

我希望在 ETL 監控過程中看到什麼?

監控小型數據倉庫中的 ETL 過程
開始於 - 當他開始工作時,
源 - 數據源,
Layer - 正在加載什麼級別的存儲,
ETL Job Name - 上傳程序,由許多小步驟組成,
Step Number - 正在執行的步驟的編號,
受影響的行 - 已經處理了多少數據,
Duration sec - 需要多長時間,
狀態——是否一切正常:OK、ERROR、RUNNING、HANGS
消息 - 最後一條成功消息或錯誤描述。

根據條目的狀態,您可以發送電子郵件。 給其他成員的信。 如果沒有錯誤,則不需要這封信。

因此,在出現錯誤的情況下,事件的位置會被清楚地指示出來。

有時會發生監控工具本身不起作用的情況。 在這種情況下,可以直接在數據庫中調用視圖(視圖),並以此為基礎構建報表。

ETL監控表

實現對ETL過程的監控,一張表一視圖就夠了。

為此,您可以返回 你的小儲藏室 並在 sqlite 數據庫中創建原型。

DDL表

CREATE TABLE UTL_JOB_STATUS (
/* Table for logging of job execution log. Important that the job has the steps ETL_START and ETL_END or ETL_ERROR */
  UTL_JOB_STATUS_ID INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
  SID               INTEGER NOT NULL DEFAULT -1, /* Session Identificator. Unique for every Run of job */
  LOG_DT            INTEGER NOT NULL DEFAULT 0,  /* Date time */
  LOG_D             INTEGER NOT NULL DEFAULT 0,  /* Date */
  JOB_NAME          TEXT NOT NULL DEFAULT 'N/A', /* Job name like JOB_STG2DM_GEO */
  STEP_NAME         TEXT NOT NULL DEFAULT 'N/A', /* ETL_START, ... , ETL_END/ETL_ERROR */
  STEP_DESCR        TEXT,                        /* Description of task or error message */
  UNIQUE (SID, JOB_NAME, STEP_NAME)
);
INSERT INTO UTL_JOB_STATUS (UTL_JOB_STATUS_ID) VALUES (-1);

查看/報告 DDL

CREATE VIEW IF NOT EXISTS UTL_JOB_STATUS_V
AS /* Content: Package Execution Log for last 3 Months. */
WITH SRC AS (
  SELECT LOG_D,
    LOG_DT,
    UTL_JOB_STATUS_ID,
    SID,
	CASE WHEN INSTR(JOB_NAME, 'FTP') THEN 'TRANSFER' /* file transfer */
	     WHEN INSTR(JOB_NAME, 'STG') THEN 'STAGE' /* stage */
	     WHEN INSTR(JOB_NAME, 'CLS') THEN 'CLEANSING' /* cleansing */
	     WHEN INSTR(JOB_NAME, 'DIM') THEN 'DIMENSION' /* dimension */
	     WHEN INSTR(JOB_NAME, 'FCT') THEN 'FACT' /* fact */
		 WHEN INSTR(JOB_NAME, 'ETL') THEN 'STAGE-MART' /* data mart */
	     WHEN INSTR(JOB_NAME, 'RPT') THEN 'REPORT' /* report */
	     ELSE 'N/A' END AS LAYER,
	CASE WHEN INSTR(JOB_NAME, 'ACCESS') THEN 'ACCESS LOG' /* source */
	     WHEN INSTR(JOB_NAME, 'MASTER') THEN 'MASTER DATA' /* source */
	     WHEN INSTR(JOB_NAME, 'AD-HOC') THEN 'AD-HOC' /* source */
	     ELSE 'N/A' END AS SOURCE,
    JOB_NAME,
    STEP_NAME,
    CASE WHEN STEP_NAME='ETL_START' THEN 1 ELSE 0 END AS START_FLAG,
    CASE WHEN STEP_NAME='ETL_END' THEN 1 ELSE 0 END AS END_FLAG,
    CASE WHEN STEP_NAME='ETL_ERROR' THEN 1 ELSE 0 END AS ERROR_FLAG,
    STEP_NAME || ' : ' || STEP_DESCR AS STEP_LOG,
	SUBSTR( SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), 1, INSTR(SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), '***')-2 ) AS AFFECTED_ROWS
  FROM UTL_JOB_STATUS
  WHERE datetime(LOG_D, 'unixepoch') >= date('now', 'start of month', '-3 month')
)
SELECT JB.SID,
  JB.MIN_LOG_DT AS START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS LOG_DT,
  JB.SOURCE,
  JB.LAYER,
  JB.JOB_NAME,
  CASE
  WHEN JB.ERROR_FLAG = 1 THEN 'ERROR'
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 AND strftime('%s','now') - JB.MIN_LOG_DT > 0.5*60*60 THEN 'HANGS' /* half an hour */
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 THEN 'RUNNING'
  ELSE 'OK'
  END AS STATUS,
  ERR.STEP_LOG     AS STEP_LOG,
  JB.CNT           AS STEP_CNT,
  JB.AFFECTED_ROWS AS AFFECTED_ROWS,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS JOB_START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MAX_LOG_DT, 'unixepoch')) AS JOB_END_DT,
  JB.MAX_LOG_DT - JB.MIN_LOG_DT AS JOB_DURATION_SEC
FROM
  ( SELECT SID, SOURCE, LAYER, JOB_NAME,
           MAX(UTL_JOB_STATUS_ID) AS UTL_JOB_STATUS_ID,
           MAX(START_FLAG)       AS START_FLAG,
           MAX(END_FLAG)         AS END_FLAG,
           MAX(ERROR_FLAG)       AS ERROR_FLAG,
           MIN(LOG_DT)           AS MIN_LOG_DT,
           MAX(LOG_DT)           AS MAX_LOG_DT,
           SUM(1)                AS CNT,
           SUM(IFNULL(AFFECTED_ROWS, 0)) AS AFFECTED_ROWS
    FROM SRC
    GROUP BY SID, SOURCE, LAYER, JOB_NAME
  ) JB,
  ( SELECT UTL_JOB_STATUS_ID, SID, JOB_NAME, STEP_LOG
    FROM SRC
    WHERE 1 = 1
  ) ERR
WHERE 1 = 1
  AND JB.SID = ERR.SID
  AND JB.JOB_NAME = ERR.JOB_NAME
  AND JB.UTL_JOB_STATUS_ID = ERR.UTL_JOB_STATUS_ID
ORDER BY JB.MIN_LOG_DT DESC, JB.SID DESC, JB.SOURCE;

SQL 檢查是否可以獲得新的會話號

SELECT SUM (
  CASE WHEN start_job.JOB_NAME IS NOT NULL AND end_job.JOB_NAME IS NULL /* existed job finished */
	    AND NOT ( 'y' = 'n' ) /* force restart PARAMETER */
       THEN 1 ELSE 0
  END ) AS IS_RUNNING
  FROM
    ( SELECT 1 AS dummy FROM UTL_JOB_STATUS WHERE sid = -1) d_job
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID, 1 AS dummy
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG' /* job name PARAMETER */
	    AND STEP_NAME = 'ETL_START'
      GROUP BY JOB_NAME, SID
    ) start_job /* starts */
  ON d_job.dummy = start_job.dummy
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG'  /* job name PARAMETER */
	    AND STEP_NAME in ('ETL_END', 'ETL_ERROR') /* stop status */
      GROUP BY JOB_NAME, SID
    ) end_job /* ends */
  ON start_job.JOB_NAME = end_job.JOB_NAME
     AND start_job.SID = end_job.SID

表的特點:

  • 數據處理過程的開始和結束必須遵循步驟 ETL_START 和 ETL_END
  • 如果出現錯誤,應創建 ETL_ERROR 步驟及其描述
  • 處理過的數據量應該突出顯示,例如用星號
  • 可以使用 force_restart=y 參數同時啟動相同的過程,沒有它,會話號僅發布給已完成的過程
  • 在正常模式下,您不能並行運行相同的數據處理過程

使用表的必要操作如下:

  • 獲取正在運行的 ETL 過程的會話號
  • 將日誌條目插入表中
  • 獲取 ETL 過程的最後成功記錄

在 Oracle 或 Postgres 等數據庫中,這些操作可以作為內置函數實現。 sqlite 需要一個外部機制,在這種情況下它 以PHP為原型.

產量

因此,數據處理工具中的錯誤消息起著極其重要的作用。 但是很難稱它們為快速找到問題原因的最佳選擇。 當程序數量接近一百時,過程監控就變成了一個複雜的項目。

本文以原型的形式提供了一個可能的問題解決方案示例。 整個小型存儲庫原型在 gitlab 中可用 SQLite PHP ETL 實用程序.

來源: www.habr.com

添加評論