小規模なデータ ウェアハウスでの ETL プロセスの監視

多くの場合、特殊なツールを使用して、データを抽出、変換し、リレーショナル データベースにロードするための手順を作成します。 ツールの作業プロセスがログに記録され、エラーが修正されます。

エラーが発生した場合、ログには、ツールがタスクを完了できなかったこと、およびどのモジュール (多くの場合 Java) がどこで停止したかに関する情報が含まれます。 最後の行では、テーブルの一意キー違反などのデータベース エラーを見つけることができます。

ETL エラー情報がどのような役割を果たしているかという質問に答えるために、私は過去 XNUMX 年間に発生したすべての問題をかなり大規模なリポジトリに分類しました。

小規模なデータ ウェアハウスでの ETL プロセスの監視

データベース エラーには、十分なスペースがない、接続の切断、セッションのハングなどが含まれます。

論理エラーには、テーブルキーの違反、無効なオブジェクト、オブジェクトへのアクセスの欠如などが含まれます。
スケジューラが時間通りに起動しない、フリーズするなどの可能性があります。

単純な間違いなら修正するのにそれほど時間はかかりません。 優れた ETL は、それらのほとんどを単独で処理できます。

複雑なバグがあるため、データを操作するための手順を発見してテストし、データ ソースを探索する必要があります。 多くの場合、変更のテストと展開が必要になります。

したがって、すべての問題の半分はデータベースに関連しています。 全間違いの 48% は単純な間違いです。
すべての問題の XNUMX 分の XNUMX はストレージ ロジックまたはモデルの変更に関連しており、これらのエラーの半分以上は複雑です。

また、タスク スケジューラに関連する問題は全体の 18 分の XNUMX 未満で、そのうち XNUMX% は単純なエラーです。

一般に、発生するすべてのエラーの 22% は複雑であり、その修正には最も多くの注意と時間を必要とします。 それらはおよそ週に XNUMX 回発生します。 一方、単純な間違いはほぼ毎日起こります。

ETL プロセスの監視が効果的であるのは、エラーの場所がログにできるだけ正確に示され、問題の原因を見つけるのに必要な時間が最小限に抑えられる場合に効果的であることは明らかです。

効果的なモニタリング

ETL 監視プロセスで何を確認したかったのでしょうか?

小規模なデータ ウェアハウスでの ETL プロセスの監視
彼が仕事を始めたときから開始し、
ソース - データソース、
レイヤー - どのレベルのストレージがロードされているか、
ETL ジョブ名 - アップロード手順。多くの小さなステップで構成されます。
ステップ番号 - 実行されているステップの番号、
影響を受ける行 - すでに処理されたデータの量
期間秒 - 所要時間、
ステータス - すべてが正常かどうか: OK、ERROR、RUNNING、HANGS
メッセージ - 最後に成功したメッセージまたはエラーの説明。

記録のステータスに基づいて、電子メールを送信できます。 他のメンバーへの手紙。 間違いがなければ、手紙は必要ありません。

したがって、エラーが発生した場合、問題の発生場所が明確に示されます。

場合によっては、監視ツール自体が機能しない場合があります。 この場合、データベース内でビュー (ビュー) を直接呼び出し、それに基づいてレポートを作成することができます。

ETL監視テーブル

ETL プロセスの監視を実装するには、XNUMX つのテーブルと XNUMX つのビューで十分です。

これを行うには、に戻ることができます あなたの小さなストレージ そしてsqliteデータベースでプロトタイプを作成します。

DDL テーブル

CREATE TABLE UTL_JOB_STATUS (
/* Table for logging of job execution log. Important that the job has the steps ETL_START and ETL_END or ETL_ERROR */
  UTL_JOB_STATUS_ID INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
  SID               INTEGER NOT NULL DEFAULT -1, /* Session Identificator. Unique for every Run of job */
  LOG_DT            INTEGER NOT NULL DEFAULT 0,  /* Date time */
  LOG_D             INTEGER NOT NULL DEFAULT 0,  /* Date */
  JOB_NAME          TEXT NOT NULL DEFAULT 'N/A', /* Job name like JOB_STG2DM_GEO */
  STEP_NAME         TEXT NOT NULL DEFAULT 'N/A', /* ETL_START, ... , ETL_END/ETL_ERROR */
  STEP_DESCR        TEXT,                        /* Description of task or error message */
  UNIQUE (SID, JOB_NAME, STEP_NAME)
);
INSERT INTO UTL_JOB_STATUS (UTL_JOB_STATUS_ID) VALUES (-1);

DDL の表示/レポート

CREATE VIEW IF NOT EXISTS UTL_JOB_STATUS_V
AS /* Content: Package Execution Log for last 3 Months. */
WITH SRC AS (
  SELECT LOG_D,
    LOG_DT,
    UTL_JOB_STATUS_ID,
    SID,
	CASE WHEN INSTR(JOB_NAME, 'FTP') THEN 'TRANSFER' /* file transfer */
	     WHEN INSTR(JOB_NAME, 'STG') THEN 'STAGE' /* stage */
	     WHEN INSTR(JOB_NAME, 'CLS') THEN 'CLEANSING' /* cleansing */
	     WHEN INSTR(JOB_NAME, 'DIM') THEN 'DIMENSION' /* dimension */
	     WHEN INSTR(JOB_NAME, 'FCT') THEN 'FACT' /* fact */
		 WHEN INSTR(JOB_NAME, 'ETL') THEN 'STAGE-MART' /* data mart */
	     WHEN INSTR(JOB_NAME, 'RPT') THEN 'REPORT' /* report */
	     ELSE 'N/A' END AS LAYER,
	CASE WHEN INSTR(JOB_NAME, 'ACCESS') THEN 'ACCESS LOG' /* source */
	     WHEN INSTR(JOB_NAME, 'MASTER') THEN 'MASTER DATA' /* source */
	     WHEN INSTR(JOB_NAME, 'AD-HOC') THEN 'AD-HOC' /* source */
	     ELSE 'N/A' END AS SOURCE,
    JOB_NAME,
    STEP_NAME,
    CASE WHEN STEP_NAME='ETL_START' THEN 1 ELSE 0 END AS START_FLAG,
    CASE WHEN STEP_NAME='ETL_END' THEN 1 ELSE 0 END AS END_FLAG,
    CASE WHEN STEP_NAME='ETL_ERROR' THEN 1 ELSE 0 END AS ERROR_FLAG,
    STEP_NAME || ' : ' || STEP_DESCR AS STEP_LOG,
	SUBSTR( SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), 1, INSTR(SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), '***')-2 ) AS AFFECTED_ROWS
  FROM UTL_JOB_STATUS
  WHERE datetime(LOG_D, 'unixepoch') >= date('now', 'start of month', '-3 month')
)
SELECT JB.SID,
  JB.MIN_LOG_DT AS START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS LOG_DT,
  JB.SOURCE,
  JB.LAYER,
  JB.JOB_NAME,
  CASE
  WHEN JB.ERROR_FLAG = 1 THEN 'ERROR'
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 AND strftime('%s','now') - JB.MIN_LOG_DT > 0.5*60*60 THEN 'HANGS' /* half an hour */
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 THEN 'RUNNING'
  ELSE 'OK'
  END AS STATUS,
  ERR.STEP_LOG     AS STEP_LOG,
  JB.CNT           AS STEP_CNT,
  JB.AFFECTED_ROWS AS AFFECTED_ROWS,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS JOB_START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MAX_LOG_DT, 'unixepoch')) AS JOB_END_DT,
  JB.MAX_LOG_DT - JB.MIN_LOG_DT AS JOB_DURATION_SEC
FROM
  ( SELECT SID, SOURCE, LAYER, JOB_NAME,
           MAX(UTL_JOB_STATUS_ID) AS UTL_JOB_STATUS_ID,
           MAX(START_FLAG)       AS START_FLAG,
           MAX(END_FLAG)         AS END_FLAG,
           MAX(ERROR_FLAG)       AS ERROR_FLAG,
           MIN(LOG_DT)           AS MIN_LOG_DT,
           MAX(LOG_DT)           AS MAX_LOG_DT,
           SUM(1)                AS CNT,
           SUM(IFNULL(AFFECTED_ROWS, 0)) AS AFFECTED_ROWS
    FROM SRC
    GROUP BY SID, SOURCE, LAYER, JOB_NAME
  ) JB,
  ( SELECT UTL_JOB_STATUS_ID, SID, JOB_NAME, STEP_LOG
    FROM SRC
    WHERE 1 = 1
  ) ERR
WHERE 1 = 1
  AND JB.SID = ERR.SID
  AND JB.JOB_NAME = ERR.JOB_NAME
  AND JB.UTL_JOB_STATUS_ID = ERR.UTL_JOB_STATUS_ID
ORDER BY JB.MIN_LOG_DT DESC, JB.SID DESC, JB.SOURCE;

新しいセッション番号を取得できるかどうかをSQLでチェックする

SELECT SUM (
  CASE WHEN start_job.JOB_NAME IS NOT NULL AND end_job.JOB_NAME IS NULL /* existed job finished */
	    AND NOT ( 'y' = 'n' ) /* force restart PARAMETER */
       THEN 1 ELSE 0
  END ) AS IS_RUNNING
  FROM
    ( SELECT 1 AS dummy FROM UTL_JOB_STATUS WHERE sid = -1) d_job
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID, 1 AS dummy
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG' /* job name PARAMETER */
	    AND STEP_NAME = 'ETL_START'
      GROUP BY JOB_NAME, SID
    ) start_job /* starts */
  ON d_job.dummy = start_job.dummy
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG'  /* job name PARAMETER */
	    AND STEP_NAME in ('ETL_END', 'ETL_ERROR') /* stop status */
      GROUP BY JOB_NAME, SID
    ) end_job /* ends */
  ON start_job.JOB_NAME = end_job.JOB_NAME
     AND start_job.SID = end_job.SID

テーブルの特徴:

  • データ処理手順の開始と終了の後には、ステップ ETL_START と ETL_END が続く必要があります
  • エラーが発生した場合は、ETL_ERROR ステップとその説明を作成する必要があります。
  • 処理されたデータの量は、アスタリスクなどで強調表示する必要があります。
  • 同じプロシージャは、force_restart=y パラメータを使用して同時に開始できます。このパラメータを使用しない場合、完了したプロシージャに対してのみセッション番号が発行されます。
  • 通常モードでは、同じデータ処理手順を並行して実行できません。

テーブルを操作するために必要な操作は次のとおりです。

  • 実行中のETLプロシージャのセッション番号を取得する
  • ログエントリをテーブルに挿入
  • ETL プロシージャの最後に成功したレコードの取得

Oracle や Postgres などのデータベースでは、これらの操作を組み込み関数として実装できます。 sqlite には外部メカニズムが必要ですが、この場合は PHPでプロトタイプを作成.

出力

したがって、データ処理ツールのエラー メッセージは非常に重要な役割を果たします。 しかし、問題の原因を迅速に見つけるのに最適とは言いがたいです。 手順の数が XNUMX に近づくと、プロセス監視は複雑なプロジェクトになります。

この記事では、問題に対する考えられる解決策の例をプロトタイプの形で提供します。 小さなリポジトリのプロトタイプ全体が gitlab で入手可能です SQLite PHP ETL ユーティリティ.

出所: habr.com

コメントを追加します