ETL folyamatok figyelése egy kis adattárházban

Sokan speciális eszközöket használnak az adatok relációs adatbázisokba való kinyerésére, átalakítására és betöltésére szolgáló eljárások létrehozására. A munkaeszközök folyamatát naplózza, a hibákat javítja.

Hiba esetén a napló információkat tartalmaz arról, hogy az eszköz nem tudta végrehajtani a feladatot, és mely modulok (gyakran a java) hol álltak le. Az utolsó sorokban adatbázis-hibát találhat, például egy tábla egyedi kulcsának megsértését.

Annak a kérdésnek a megválaszolásához, hogy milyen szerepet játszik az ETL hibainformáció, az elmúlt két évben fellépő összes problémát egy meglehetősen nagy adattárba soroltam.

ETL folyamatok figyelése egy kis adattárházban

Az adatbázishibák közé tartozik a kevés hely, a kapcsolat megszakadása, a munkamenet lefagyása stb.

A logikai hibák közé tartozik például a táblakulcsok megsértése, nem érvényes objektumok, az objektumokhoz való hozzáférés hiánya stb.
Előfordulhat, hogy az ütemező nem indul el időben, lefagy stb.

Az egyszerű hibák kijavítása nem tart sokáig. Egy jó ETL a legtöbbet önmagában is képes kezelni.

Az összetett hibák szükségessé teszik az adatokkal való munkavégzés eljárásainak felderítését és tesztelését, az adatforrások feltárását. Gyakran szükségessé teszik a változtatások tesztelését és telepítését.

Tehát az összes probléma fele az adatbázishoz kapcsolódik. Az összes hiba 48%-a egyszerű hiba.
A problémák harmada a tárolási logika vagy modell megváltoztatásával kapcsolatos, ezeknek a hibáknak több mint fele összetett.

És az összes probléma kevesebb mint egynegyede kapcsolódik a feladatütemezőhöz, amelyek 18%-a egyszerű hiba.

Általánosságban elmondható, hogy az összes előforduló hiba 22%-a összetett, ezek javítása igényel a legtöbb figyelmet és időt. Hetente körülbelül egyszer fordulnak elő. Míg az egyszerű hibák szinte minden nap előfordulnak.

Nyilvánvalóan az ETL-folyamatok monitorozása akkor lesz hatékony, ha a hiba helye a lehető legpontosabban megjelenik a naplóban, és minimális idő szükséges a probléma forrásának megtalálásához.

Hatékony monitorozás

Mit szerettem volna látni az ETL felügyeleti folyamatban?

ETL folyamatok figyelése egy kis adattárházban
Kezdje - amikor elkezdett dolgozni,
Forrás – adatforrás,
Réteg – milyen szintű tárhely van betöltve,
ETL Job Name - feltöltési eljárás, amely sok kis lépésből áll,
Lépés száma – az éppen végrehajtott lépés száma,
Érintett sorok – mennyi adatot már feldolgoztak,
Időtartam másodperc – mennyi ideig tart,
Állapot - hogy minden rendben van-e vagy sem: OK, HIBA, FUT, FÜGG
Üzenet – Utolsó sikeres üzenet vagy hiba leírása.

A rekordok állapota alapján e-mailt küldhet. levelet a többi tagnak. Ha nincsenek hibák, akkor a levél nem szükséges.

Így hiba esetén egyértelműen fel van tüntetve az incidens helye.

Néha előfordul, hogy maga a megfigyelő eszköz nem működik. Ilyenkor lehetőség van közvetlenül az adatbázisban egy nézet (nézet) meghívására, amely alapján a riport épül.

ETL megfigyelő táblázat

Az ETL-folyamatok figyelésének megvalósításához elegendő egy tábla és egy nézet.

Ehhez visszatérhet a a kis tárolód és prototípust hozzon létre az sqlite adatbázisban.

DDL táblázatok

CREATE TABLE UTL_JOB_STATUS (
/* Table for logging of job execution log. Important that the job has the steps ETL_START and ETL_END or ETL_ERROR */
  UTL_JOB_STATUS_ID INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
  SID               INTEGER NOT NULL DEFAULT -1, /* Session Identificator. Unique for every Run of job */
  LOG_DT            INTEGER NOT NULL DEFAULT 0,  /* Date time */
  LOG_D             INTEGER NOT NULL DEFAULT 0,  /* Date */
  JOB_NAME          TEXT NOT NULL DEFAULT 'N/A', /* Job name like JOB_STG2DM_GEO */
  STEP_NAME         TEXT NOT NULL DEFAULT 'N/A', /* ETL_START, ... , ETL_END/ETL_ERROR */
  STEP_DESCR        TEXT,                        /* Description of task or error message */
  UNIQUE (SID, JOB_NAME, STEP_NAME)
);
INSERT INTO UTL_JOB_STATUS (UTL_JOB_STATUS_ID) VALUES (-1);

DDL megtekintése/jelentése

CREATE VIEW IF NOT EXISTS UTL_JOB_STATUS_V
AS /* Content: Package Execution Log for last 3 Months. */
WITH SRC AS (
  SELECT LOG_D,
    LOG_DT,
    UTL_JOB_STATUS_ID,
    SID,
	CASE WHEN INSTR(JOB_NAME, 'FTP') THEN 'TRANSFER' /* file transfer */
	     WHEN INSTR(JOB_NAME, 'STG') THEN 'STAGE' /* stage */
	     WHEN INSTR(JOB_NAME, 'CLS') THEN 'CLEANSING' /* cleansing */
	     WHEN INSTR(JOB_NAME, 'DIM') THEN 'DIMENSION' /* dimension */
	     WHEN INSTR(JOB_NAME, 'FCT') THEN 'FACT' /* fact */
		 WHEN INSTR(JOB_NAME, 'ETL') THEN 'STAGE-MART' /* data mart */
	     WHEN INSTR(JOB_NAME, 'RPT') THEN 'REPORT' /* report */
	     ELSE 'N/A' END AS LAYER,
	CASE WHEN INSTR(JOB_NAME, 'ACCESS') THEN 'ACCESS LOG' /* source */
	     WHEN INSTR(JOB_NAME, 'MASTER') THEN 'MASTER DATA' /* source */
	     WHEN INSTR(JOB_NAME, 'AD-HOC') THEN 'AD-HOC' /* source */
	     ELSE 'N/A' END AS SOURCE,
    JOB_NAME,
    STEP_NAME,
    CASE WHEN STEP_NAME='ETL_START' THEN 1 ELSE 0 END AS START_FLAG,
    CASE WHEN STEP_NAME='ETL_END' THEN 1 ELSE 0 END AS END_FLAG,
    CASE WHEN STEP_NAME='ETL_ERROR' THEN 1 ELSE 0 END AS ERROR_FLAG,
    STEP_NAME || ' : ' || STEP_DESCR AS STEP_LOG,
	SUBSTR( SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), 1, INSTR(SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), '***')-2 ) AS AFFECTED_ROWS
  FROM UTL_JOB_STATUS
  WHERE datetime(LOG_D, 'unixepoch') >= date('now', 'start of month', '-3 month')
)
SELECT JB.SID,
  JB.MIN_LOG_DT AS START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS LOG_DT,
  JB.SOURCE,
  JB.LAYER,
  JB.JOB_NAME,
  CASE
  WHEN JB.ERROR_FLAG = 1 THEN 'ERROR'
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 AND strftime('%s','now') - JB.MIN_LOG_DT > 0.5*60*60 THEN 'HANGS' /* half an hour */
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 THEN 'RUNNING'
  ELSE 'OK'
  END AS STATUS,
  ERR.STEP_LOG     AS STEP_LOG,
  JB.CNT           AS STEP_CNT,
  JB.AFFECTED_ROWS AS AFFECTED_ROWS,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS JOB_START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MAX_LOG_DT, 'unixepoch')) AS JOB_END_DT,
  JB.MAX_LOG_DT - JB.MIN_LOG_DT AS JOB_DURATION_SEC
FROM
  ( SELECT SID, SOURCE, LAYER, JOB_NAME,
           MAX(UTL_JOB_STATUS_ID) AS UTL_JOB_STATUS_ID,
           MAX(START_FLAG)       AS START_FLAG,
           MAX(END_FLAG)         AS END_FLAG,
           MAX(ERROR_FLAG)       AS ERROR_FLAG,
           MIN(LOG_DT)           AS MIN_LOG_DT,
           MAX(LOG_DT)           AS MAX_LOG_DT,
           SUM(1)                AS CNT,
           SUM(IFNULL(AFFECTED_ROWS, 0)) AS AFFECTED_ROWS
    FROM SRC
    GROUP BY SID, SOURCE, LAYER, JOB_NAME
  ) JB,
  ( SELECT UTL_JOB_STATUS_ID, SID, JOB_NAME, STEP_LOG
    FROM SRC
    WHERE 1 = 1
  ) ERR
WHERE 1 = 1
  AND JB.SID = ERR.SID
  AND JB.JOB_NAME = ERR.JOB_NAME
  AND JB.UTL_JOB_STATUS_ID = ERR.UTL_JOB_STATUS_ID
ORDER BY JB.MIN_LOG_DT DESC, JB.SID DESC, JB.SOURCE;

SQL Annak ellenőrzése, hogy lehetséges-e új munkamenetszám beszerzése

SELECT SUM (
  CASE WHEN start_job.JOB_NAME IS NOT NULL AND end_job.JOB_NAME IS NULL /* existed job finished */
	    AND NOT ( 'y' = 'n' ) /* force restart PARAMETER */
       THEN 1 ELSE 0
  END ) AS IS_RUNNING
  FROM
    ( SELECT 1 AS dummy FROM UTL_JOB_STATUS WHERE sid = -1) d_job
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID, 1 AS dummy
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG' /* job name PARAMETER */
	    AND STEP_NAME = 'ETL_START'
      GROUP BY JOB_NAME, SID
    ) start_job /* starts */
  ON d_job.dummy = start_job.dummy
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG'  /* job name PARAMETER */
	    AND STEP_NAME in ('ETL_END', 'ETL_ERROR') /* stop status */
      GROUP BY JOB_NAME, SID
    ) end_job /* ends */
  ON start_job.JOB_NAME = end_job.JOB_NAME
     AND start_job.SID = end_job.SID

A táblázat jellemzői:

  • az adatfeldolgozási eljárás kezdetét és végét az ETL_START és az ETL_END lépéseknek kell követniük
  • hiba esetén létre kell hozni az ETL_ERROR lépést a leírásával együtt
  • a feldolgozott adatok mennyiségét például csillagokkal kell kiemelni
  • ugyanaz az eljárás indítható egyszerre a force_restart=y paraméterrel, enélkül a munkamenetszám csak a befejezett eljáráshoz kerül kiadásra
  • normál módban nem futtathatja párhuzamosan ugyanazt az adatfeldolgozási eljárást

A táblázattal való munkavégzéshez szükséges műveletek a következők:

  • a futó ETL eljárás munkamenetszámának lekérése
  • naplóbejegyzés beszúrása a táblázatba
  • egy ETL-eljárás utolsó sikeres rekordjának lekérése

Az olyan adatbázisokban, mint az Oracle vagy a Postgres, ezek a műveletek beépített függvényként valósíthatók meg. Az sqlite külső mechanizmust igényel, és ebben az esetben azt prototípusa PHP-ben.

Teljesítmény

Így az adatfeldolgozó eszközök hibaüzenetei megafontos szerepet játszanak. De nehéz őket optimálisnak nevezni a probléma okának gyors megtalálásához. Ha az eljárások száma megközelíti a százat, akkor a folyamatfelügyelet összetett projektté válik.

A cikk a probléma lehetséges megoldására mutat példát prototípus formájában. A teljes kis tároló prototípus elérhető a gitlabban SQLite PHP ETL segédprogramok.

Forrás: will.com

Hozzászólás