ETL-processen monitoren in een klein datawarehouse

Velen gebruiken gespecialiseerde tools om procedures te creëren voor het extraheren, transformeren en laden van gegevens in relationele databases. Het proces van werkende tools wordt gelogd, fouten worden verholpen.

In het geval van een fout bevat het logboek informatie dat de tool de taak niet heeft voltooid en welke modules (vaak java) waar zijn gestopt. In de laatste regels kunt u een databasefout vinden, bijvoorbeeld een overtreding van een unieke tabelsleutel.

Om de vraag te beantwoorden welke rol ETL-foutinformatie speelt, heb ik alle problemen die zich de afgelopen twee jaar hebben voorgedaan in een vrij grote repository ondergebracht.

ETL-processen monitoren in een klein datawarehouse

Databasefouten zijn onder meer onvoldoende ruimte, verbroken verbinding, vastgelopen sessie, enz.

Logische fouten zijn onder meer schending van tabelsleutels, ongeldige objecten, geen toegang tot objecten, enz.
De planner start mogelijk niet op tijd, loopt vast, etc.

Eenvoudige fouten hebben niet veel tijd nodig om te herstellen. Een goede ETL kan de meeste zelf aan.

Complexe bugs maken het noodzakelijk om procedures voor het werken met gegevens te ontdekken en te testen, om gegevensbronnen te verkennen. Leidt vaak tot de noodzaak van het testen en implementeren van wijzigingen.

De helft van alle problemen heeft dus te maken met de database. 48% van alle fouten zijn simpele fouten.
Een derde van alle problemen houdt verband met het wijzigen van de opslaglogica of het opslagmodel, meer dan de helft van deze fouten is complex.

En minder dan een kwart van alle problemen heeft te maken met de taakplanner, waarvan 18% simpele fouten zijn.

Over het algemeen is 22% van alle fouten die optreden complex en vereist het corrigeren ervan de meeste aandacht en tijd. Ze gebeuren ongeveer een keer per week. Terwijl simpele fouten bijna elke dag voorkomen.

Vanzelfsprekend is het monitoren van ETL-processen effectief wanneer de foutlocatie zo nauwkeurig mogelijk in het logboek wordt aangegeven en er zo min mogelijk tijd nodig is om de oorzaak van het probleem te vinden.

Effectieve bewaking

Wat wilde ik zien in het ETL-bewakingsproces?

ETL-processen monitoren in een klein datawarehouse
Begin bij - toen hij begon te werken,
Bron - gegevensbron,
Laag - welk opslagniveau wordt geladen,
ETL-taaknaam - uploadprocedure, die uit veel kleine stappen bestaat,
Stapnummer - het nummer van de stap die wordt uitgevoerd,
Betrokken rijen - hoeveel gegevens er al zijn verwerkt,
Duur sec - hoe lang het duurt,
Status - of alles in orde is of niet: OK, ERROR, RUNNING, HANGS
Bericht - Laatste succesvolle bericht of foutbeschrijving.

Op basis van de status van de boekingen kunt u een e-mail sturen. brief aan andere leden. Als er geen fouten zijn, is de brief niet nodig.

Zo wordt bij een fout duidelijk aangegeven waar het incident zich heeft voorgedaan.

Soms komt het voor dat de monitoringtool zelf niet werkt. In dit geval is het mogelijk om direct in de database een view (view) aan te roepen, op basis waarvan het rapport is opgebouwd.

ETL-bewakingstabel

Om monitoring van ETL-processen te implementeren, zijn één tabel en één weergave voldoende.

Om dit te doen, kunt u terugkeren naar uw kleine opslag en maak een prototype in de sqlite-database.

DDL-tabellen

CREATE TABLE UTL_JOB_STATUS (
/* Table for logging of job execution log. Important that the job has the steps ETL_START and ETL_END or ETL_ERROR */
  UTL_JOB_STATUS_ID INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
  SID               INTEGER NOT NULL DEFAULT -1, /* Session Identificator. Unique for every Run of job */
  LOG_DT            INTEGER NOT NULL DEFAULT 0,  /* Date time */
  LOG_D             INTEGER NOT NULL DEFAULT 0,  /* Date */
  JOB_NAME          TEXT NOT NULL DEFAULT 'N/A', /* Job name like JOB_STG2DM_GEO */
  STEP_NAME         TEXT NOT NULL DEFAULT 'N/A', /* ETL_START, ... , ETL_END/ETL_ERROR */
  STEP_DESCR        TEXT,                        /* Description of task or error message */
  UNIQUE (SID, JOB_NAME, STEP_NAME)
);
INSERT INTO UTL_JOB_STATUS (UTL_JOB_STATUS_ID) VALUES (-1);

DDL bekijken/rapporteren

CREATE VIEW IF NOT EXISTS UTL_JOB_STATUS_V
AS /* Content: Package Execution Log for last 3 Months. */
WITH SRC AS (
  SELECT LOG_D,
    LOG_DT,
    UTL_JOB_STATUS_ID,
    SID,
	CASE WHEN INSTR(JOB_NAME, 'FTP') THEN 'TRANSFER' /* file transfer */
	     WHEN INSTR(JOB_NAME, 'STG') THEN 'STAGE' /* stage */
	     WHEN INSTR(JOB_NAME, 'CLS') THEN 'CLEANSING' /* cleansing */
	     WHEN INSTR(JOB_NAME, 'DIM') THEN 'DIMENSION' /* dimension */
	     WHEN INSTR(JOB_NAME, 'FCT') THEN 'FACT' /* fact */
		 WHEN INSTR(JOB_NAME, 'ETL') THEN 'STAGE-MART' /* data mart */
	     WHEN INSTR(JOB_NAME, 'RPT') THEN 'REPORT' /* report */
	     ELSE 'N/A' END AS LAYER,
	CASE WHEN INSTR(JOB_NAME, 'ACCESS') THEN 'ACCESS LOG' /* source */
	     WHEN INSTR(JOB_NAME, 'MASTER') THEN 'MASTER DATA' /* source */
	     WHEN INSTR(JOB_NAME, 'AD-HOC') THEN 'AD-HOC' /* source */
	     ELSE 'N/A' END AS SOURCE,
    JOB_NAME,
    STEP_NAME,
    CASE WHEN STEP_NAME='ETL_START' THEN 1 ELSE 0 END AS START_FLAG,
    CASE WHEN STEP_NAME='ETL_END' THEN 1 ELSE 0 END AS END_FLAG,
    CASE WHEN STEP_NAME='ETL_ERROR' THEN 1 ELSE 0 END AS ERROR_FLAG,
    STEP_NAME || ' : ' || STEP_DESCR AS STEP_LOG,
	SUBSTR( SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), 1, INSTR(SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), '***')-2 ) AS AFFECTED_ROWS
  FROM UTL_JOB_STATUS
  WHERE datetime(LOG_D, 'unixepoch') >= date('now', 'start of month', '-3 month')
)
SELECT JB.SID,
  JB.MIN_LOG_DT AS START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS LOG_DT,
  JB.SOURCE,
  JB.LAYER,
  JB.JOB_NAME,
  CASE
  WHEN JB.ERROR_FLAG = 1 THEN 'ERROR'
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 AND strftime('%s','now') - JB.MIN_LOG_DT > 0.5*60*60 THEN 'HANGS' /* half an hour */
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 THEN 'RUNNING'
  ELSE 'OK'
  END AS STATUS,
  ERR.STEP_LOG     AS STEP_LOG,
  JB.CNT           AS STEP_CNT,
  JB.AFFECTED_ROWS AS AFFECTED_ROWS,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS JOB_START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MAX_LOG_DT, 'unixepoch')) AS JOB_END_DT,
  JB.MAX_LOG_DT - JB.MIN_LOG_DT AS JOB_DURATION_SEC
FROM
  ( SELECT SID, SOURCE, LAYER, JOB_NAME,
           MAX(UTL_JOB_STATUS_ID) AS UTL_JOB_STATUS_ID,
           MAX(START_FLAG)       AS START_FLAG,
           MAX(END_FLAG)         AS END_FLAG,
           MAX(ERROR_FLAG)       AS ERROR_FLAG,
           MIN(LOG_DT)           AS MIN_LOG_DT,
           MAX(LOG_DT)           AS MAX_LOG_DT,
           SUM(1)                AS CNT,
           SUM(IFNULL(AFFECTED_ROWS, 0)) AS AFFECTED_ROWS
    FROM SRC
    GROUP BY SID, SOURCE, LAYER, JOB_NAME
  ) JB,
  ( SELECT UTL_JOB_STATUS_ID, SID, JOB_NAME, STEP_LOG
    FROM SRC
    WHERE 1 = 1
  ) ERR
WHERE 1 = 1
  AND JB.SID = ERR.SID
  AND JB.JOB_NAME = ERR.JOB_NAME
  AND JB.UTL_JOB_STATUS_ID = ERR.UTL_JOB_STATUS_ID
ORDER BY JB.MIN_LOG_DT DESC, JB.SID DESC, JB.SOURCE;

SQL Controleren of het mogelijk is om een ​​nieuw sessienummer te krijgen

SELECT SUM (
  CASE WHEN start_job.JOB_NAME IS NOT NULL AND end_job.JOB_NAME IS NULL /* existed job finished */
	    AND NOT ( 'y' = 'n' ) /* force restart PARAMETER */
       THEN 1 ELSE 0
  END ) AS IS_RUNNING
  FROM
    ( SELECT 1 AS dummy FROM UTL_JOB_STATUS WHERE sid = -1) d_job
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID, 1 AS dummy
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG' /* job name PARAMETER */
	    AND STEP_NAME = 'ETL_START'
      GROUP BY JOB_NAME, SID
    ) start_job /* starts */
  ON d_job.dummy = start_job.dummy
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG'  /* job name PARAMETER */
	    AND STEP_NAME in ('ETL_END', 'ETL_ERROR') /* stop status */
      GROUP BY JOB_NAME, SID
    ) end_job /* ends */
  ON start_job.JOB_NAME = end_job.JOB_NAME
     AND start_job.SID = end_job.SID

Tabel kenmerken:

  • het begin en einde van de gegevensverwerkingsprocedure moeten worden gevolgd door de stappen ETL_START en ETL_END
  • in geval van een fout moet de ETL_ERROR-stap met zijn beschrijving worden gemaakt
  • de hoeveelheid verwerkte gegevens moet worden gemarkeerd, bijvoorbeeld met sterretjes
  • dezelfde procedure kan tegelijkertijd worden gestart met de parameter force_restart=y, zonder dat wordt het sessienummer alleen toegekend aan de voltooide procedure
  • in de normale modus kunt u niet dezelfde gegevensverwerkingsprocedure parallel uitvoeren

De noodzakelijke bewerkingen voor het werken met een tabel zijn als volgt:

  • het sessienummer ophalen van de lopende ETL-procedure
  • loginvoer in de tabel invoegen
  • het verkrijgen van de laatste succesvolle registratie van een ETL-procedure

In databases zoals Oracle of Postgres kunnen deze bewerkingen worden geïmplementeerd als ingebouwde functies. sqlite vereist een extern mechanisme, en in dit geval het geprototypeerd in PHP.

Uitgang

Foutmeldingen in gegevensverwerkingstools spelen dus een megabelangrijke rol. Maar het is moeilijk om ze optimaal te noemen om snel de oorzaak van het probleem te vinden. Als het aantal procedures de honderd nadert, wordt procesbewaking een complex project.

Het artikel geeft een voorbeeld van een mogelijke oplossing voor het probleem in de vorm van een prototype. Het hele kleine repository-prototype is beschikbaar in gitlab SQLite PHP ETL-hulpprogramma's.

Bron: www.habr.com

Voeg een reactie