许多人使用专门的工具来创建用于提取、转换数据并将数据加载到关系数据库中的过程。 记录工作工具的过程,修复错误。
如果出现错误,日志包含该工具未能完成任务以及哪些模块(通常是 java)在何处停止的信息。 在最后几行中,您可以找到数据库错误,例如表唯一键冲突。
为了回答ETL错误信息扮演什么角色的问题,我把这两年发生的所有问题都分类到一个相当大的存储库中。
数据库错误包括空间不足、连接丢失、会话挂起等。
逻辑错误包括违反表键、无效对象、缺乏对对象的访问权限等。
调度程序可能无法按时启动,可能会冻结等。
简单的错误不需要很长时间就能修复。 一个好的 ETL 可以自行处理其中的大部分问题。
复杂的错误使得有必要发现和测试处理数据的程序,以探索数据源。 通常会导致需要进行变更测试和部署。
因此,一半的问题与数据库有关。 48% 的错误都是简单错误。
所有问题中有三分之一与更改存储逻辑或模型有关,其中一半以上的错误很复杂。
所有问题中只有不到四分之一与任务调度程序有关,其中 18% 是简单错误。
一般来说,发生的所有错误中有 22% 是复杂的,并且纠正它们需要最多的注意力和时间。 它们大约每周发生一次。 而简单的错误几乎每天都会发生。
显然,当错误位置尽可能准确地在日志中指示出来并且需要最短的时间找到问题根源时,对 ETL 过程的监控将是有效的。
有效监控
我想在 ETL 监控流程中看到什么?
开始于 - 当他开始工作时,
来源-数据源,
层 - 正在加载什么级别的存储,
ETL 作业名称 - 上传过程,由许多小步骤组成,
步骤编号 - 正在执行的步骤编号,
受影响的行 - 已经处理了多少数据,
持续时间秒 - 需要多长时间,
状态 - 一切是否正常:OK、ERROR、RUNNING、HANGS
消息 - 最后成功的消息或错误描述。
根据记录的状态,您可以发送电子邮件。 致其他成员的信。 如果没有错误,则不需要这封信。
因此,一旦发生错误,事件发生的位置就会被清楚地指示出来。
有时会出现监控工具本身不起作用的情况。 在这种情况下,可以直接调用数据库中的视图(view),在此基础上构建报表。
ETL监控表
要实现ETL过程的监控,一张表一张视图就足够了。
为此,您可以返回
DDL 表
CREATE TABLE UTL_JOB_STATUS (
/* Table for logging of job execution log. Important that the job has the steps ETL_START and ETL_END or ETL_ERROR */
UTL_JOB_STATUS_ID INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
SID INTEGER NOT NULL DEFAULT -1, /* Session Identificator. Unique for every Run of job */
LOG_DT INTEGER NOT NULL DEFAULT 0, /* Date time */
LOG_D INTEGER NOT NULL DEFAULT 0, /* Date */
JOB_NAME TEXT NOT NULL DEFAULT 'N/A', /* Job name like JOB_STG2DM_GEO */
STEP_NAME TEXT NOT NULL DEFAULT 'N/A', /* ETL_START, ... , ETL_END/ETL_ERROR */
STEP_DESCR TEXT, /* Description of task or error message */
UNIQUE (SID, JOB_NAME, STEP_NAME)
);
INSERT INTO UTL_JOB_STATUS (UTL_JOB_STATUS_ID) VALUES (-1);
查看/报告 DDL
CREATE VIEW IF NOT EXISTS UTL_JOB_STATUS_V
AS /* Content: Package Execution Log for last 3 Months. */
WITH SRC AS (
SELECT LOG_D,
LOG_DT,
UTL_JOB_STATUS_ID,
SID,
CASE WHEN INSTR(JOB_NAME, 'FTP') THEN 'TRANSFER' /* file transfer */
WHEN INSTR(JOB_NAME, 'STG') THEN 'STAGE' /* stage */
WHEN INSTR(JOB_NAME, 'CLS') THEN 'CLEANSING' /* cleansing */
WHEN INSTR(JOB_NAME, 'DIM') THEN 'DIMENSION' /* dimension */
WHEN INSTR(JOB_NAME, 'FCT') THEN 'FACT' /* fact */
WHEN INSTR(JOB_NAME, 'ETL') THEN 'STAGE-MART' /* data mart */
WHEN INSTR(JOB_NAME, 'RPT') THEN 'REPORT' /* report */
ELSE 'N/A' END AS LAYER,
CASE WHEN INSTR(JOB_NAME, 'ACCESS') THEN 'ACCESS LOG' /* source */
WHEN INSTR(JOB_NAME, 'MASTER') THEN 'MASTER DATA' /* source */
WHEN INSTR(JOB_NAME, 'AD-HOC') THEN 'AD-HOC' /* source */
ELSE 'N/A' END AS SOURCE,
JOB_NAME,
STEP_NAME,
CASE WHEN STEP_NAME='ETL_START' THEN 1 ELSE 0 END AS START_FLAG,
CASE WHEN STEP_NAME='ETL_END' THEN 1 ELSE 0 END AS END_FLAG,
CASE WHEN STEP_NAME='ETL_ERROR' THEN 1 ELSE 0 END AS ERROR_FLAG,
STEP_NAME || ' : ' || STEP_DESCR AS STEP_LOG,
SUBSTR( SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), 1, INSTR(SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), '***')-2 ) AS AFFECTED_ROWS
FROM UTL_JOB_STATUS
WHERE datetime(LOG_D, 'unixepoch') >= date('now', 'start of month', '-3 month')
)
SELECT JB.SID,
JB.MIN_LOG_DT AS START_DT,
strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS LOG_DT,
JB.SOURCE,
JB.LAYER,
JB.JOB_NAME,
CASE
WHEN JB.ERROR_FLAG = 1 THEN 'ERROR'
WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 AND strftime('%s','now') - JB.MIN_LOG_DT > 0.5*60*60 THEN 'HANGS' /* half an hour */
WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 THEN 'RUNNING'
ELSE 'OK'
END AS STATUS,
ERR.STEP_LOG AS STEP_LOG,
JB.CNT AS STEP_CNT,
JB.AFFECTED_ROWS AS AFFECTED_ROWS,
strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS JOB_START_DT,
strftime('%d.%m.%Y %H:%M', datetime(JB.MAX_LOG_DT, 'unixepoch')) AS JOB_END_DT,
JB.MAX_LOG_DT - JB.MIN_LOG_DT AS JOB_DURATION_SEC
FROM
( SELECT SID, SOURCE, LAYER, JOB_NAME,
MAX(UTL_JOB_STATUS_ID) AS UTL_JOB_STATUS_ID,
MAX(START_FLAG) AS START_FLAG,
MAX(END_FLAG) AS END_FLAG,
MAX(ERROR_FLAG) AS ERROR_FLAG,
MIN(LOG_DT) AS MIN_LOG_DT,
MAX(LOG_DT) AS MAX_LOG_DT,
SUM(1) AS CNT,
SUM(IFNULL(AFFECTED_ROWS, 0)) AS AFFECTED_ROWS
FROM SRC
GROUP BY SID, SOURCE, LAYER, JOB_NAME
) JB,
( SELECT UTL_JOB_STATUS_ID, SID, JOB_NAME, STEP_LOG
FROM SRC
WHERE 1 = 1
) ERR
WHERE 1 = 1
AND JB.SID = ERR.SID
AND JB.JOB_NAME = ERR.JOB_NAME
AND JB.UTL_JOB_STATUS_ID = ERR.UTL_JOB_STATUS_ID
ORDER BY JB.MIN_LOG_DT DESC, JB.SID DESC, JB.SOURCE;
SQL 检查是否可以获得新的会话号
SELECT SUM (
CASE WHEN start_job.JOB_NAME IS NOT NULL AND end_job.JOB_NAME IS NULL /* existed job finished */
AND NOT ( 'y' = 'n' ) /* force restart PARAMETER */
THEN 1 ELSE 0
END ) AS IS_RUNNING
FROM
( SELECT 1 AS dummy FROM UTL_JOB_STATUS WHERE sid = -1) d_job
LEFT OUTER JOIN
( SELECT JOB_NAME, SID, 1 AS dummy
FROM UTL_JOB_STATUS
WHERE JOB_NAME = 'RPT_ACCESS_LOG' /* job name PARAMETER */
AND STEP_NAME = 'ETL_START'
GROUP BY JOB_NAME, SID
) start_job /* starts */
ON d_job.dummy = start_job.dummy
LEFT OUTER JOIN
( SELECT JOB_NAME, SID
FROM UTL_JOB_STATUS
WHERE JOB_NAME = 'RPT_ACCESS_LOG' /* job name PARAMETER */
AND STEP_NAME in ('ETL_END', 'ETL_ERROR') /* stop status */
GROUP BY JOB_NAME, SID
) end_job /* ends */
ON start_job.JOB_NAME = end_job.JOB_NAME
AND start_job.SID = end_job.SID
桌子特点:
- 数据处理过程的开始和结束必须遵循步骤 ETL_START 和 ETL_END
- 如果出现错误,应创建 ETL_ERROR 步骤及其描述
- 应突出显示已处理的数据量,例如用星号
- 使用force_restart = y参数可以同时启动相同的过程,如果没有它,会话号仅发布给已完成的过程
- 在正常模式下,不能并行运行相同的数据处理过程
使用表的必要操作如下:
- 获取正在运行的 ETL 过程的会话号
- 将日志条目插入表中
- 获取 ETL 过程的最后一次成功记录
在Oracle或Postgres等数据库中,这些操作可以作为内置函数实现。 sqlite 需要一个外部机制,在这种情况下它
结论
因此,数据处理工具中的错误消息起着非常重要的作用。 但很难说它们是快速找到问题原因的最佳选择。 当流程数量接近一百个时,流程监控就变成了一个复杂的工程。
本文以原型的形式提供了该问题的可能解决方案的示例。 整个小型存储库原型可以在 gitlab 中找到
来源: habr.com