Шағын деректер қоймасындағы ETL процестерін бақылау

Көбісі деректерді алу, түрлендіру және реляциялық дерекқорларға жүктеу процедураларын жасау үшін арнайы құралдарды пайдаланады. Жұмыс құралдарының процесі журналға жазылады, қателер түзетіледі.

Қате болған жағдайда, журналда құрал тапсырманы орындай алмағаны және қай модульдер (көбінесе java) қайда тоқтағаны туралы ақпаратты қамтиды. Соңғы жолдарда дерекқор қатесін табуға болады, мысалы, кестенің бірегей кілтінің бұзылуы.

ETL қате туралы ақпарат қандай рөл атқарады деген сұраққа жауап беру үшін мен соңғы екі жылда орын алған барлық мәселелерді үлкен репозиторийде жіктедім.

Шағын деректер қоймасындағы ETL процестерін бақылау

Деректер базасының қателеріне орын жеткіліксіздігі, қосылымның жоғалуы, сеанстың тоқтауы және т.б. жатады.

Логикалық қателерге кесте кілттерінің бұзылуы, жарамсыз объектілер, объектілерге қол жеткізудің болмауы және т.б. жатады.
Жоспарлаушы уақытында іске қосылмауы мүмкін, ол қатып қалуы мүмкін және т.б.

Қарапайым қателерді түзету көп уақытты қажет етпейді. Жақсы ETL олардың көпшілігін өздігінен шеше алады.

Күрделі қателер деректермен жұмыс істеу процедураларын табуды және тексеруді, деректер көздерін зерттеуді қажет етеді. Көбінесе өзгерістерді тестілеу және орналастыру қажеттілігіне әкеледі.

Сонымен, барлық мәселелердің жартысы мәліметтер базасына қатысты. Барлық қателердің 48% - қарапайым қателер.
Барлық мәселелердің үштен бірі сақтау логикасын немесе үлгісін өзгертуге байланысты, бұл қателердің жартысынан көбі күрделі.

Және барлық мәселелердің төрттен бірінен азы тапсырмаларды жоспарлаушыға қатысты, оның 18% -ы қарапайым қателер.

Жалпы алғанда, барлық орын алатын қателердің 22% күрделі және оларды түзету ең көп көңіл бөлуді және уақытты қажет етеді. Олар шамамен аптасына бір рет болады. Ал қарапайым қателер күн сайын дерлік болады.

Әлбетте, ETL процестерінің мониторингі қатенің орны журналда мүмкіндігінше дәл көрсетілгенде және мәселенің көзін табу үшін ең аз уақыт қажет болғанда тиімді болады.

Тиімді мониторинг

Мен ETL бақылау процесінде не көргім келді?

Шағын деректер қоймасындағы ETL процестерін бақылау
Бастау - ол жұмысты бастаған кезде,
Дереккөз - деректер көзі,
Қабат - сақтаудың қандай деңгейі жүктелуде,
ETL Job Name - көптеген шағын қадамдардан тұратын жүктеп салу процедурасы,
Қадам нөмірі - орындалатын қадамның нөмірі,
Зардап шеккен жолдар - қанша деректер өңделген,
Ұзақтығы сек – қанша уақыт алады,
Күй - бәрі жақсы ма, жоқ па: ЖАРАЙДЫ, ҚАТЕ, ЖҮРГІЗУ, АСЫЛДЫ
Хабарлама - Соңғы сәтті хабар немесе қате сипаттамасы.

Жазбалардың күйіне қарай электрондық поштаны жіберуге болады. басқа мүшелерге хат. Егер қателер болмаса, онда әріп қажет емес.

Осылайша, қателік орын алған жағдайда оқиғаның орны нақты көрсетіледі.

Кейде бақылау құралының өзі жұмыс істемейді. Бұл жағдайда есеп құрастырылатын мәліметтер базасында тікелей көріністі (көрініс) шақыруға болады.

ETL мониторинг кестесі

ETL процестерінің мониторингін жүзеге асыру үшін бір кесте және бір көрініс жеткілікті.

Мұны істеу үшін сіз қайтып орала аласыз сіздің шағын қоймаңыз және sqlite дерекқорында прототипті жасаңыз.

DDL кестелері

CREATE TABLE UTL_JOB_STATUS (
/* Table for logging of job execution log. Important that the job has the steps ETL_START and ETL_END or ETL_ERROR */
  UTL_JOB_STATUS_ID INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
  SID               INTEGER NOT NULL DEFAULT -1, /* Session Identificator. Unique for every Run of job */
  LOG_DT            INTEGER NOT NULL DEFAULT 0,  /* Date time */
  LOG_D             INTEGER NOT NULL DEFAULT 0,  /* Date */
  JOB_NAME          TEXT NOT NULL DEFAULT 'N/A', /* Job name like JOB_STG2DM_GEO */
  STEP_NAME         TEXT NOT NULL DEFAULT 'N/A', /* ETL_START, ... , ETL_END/ETL_ERROR */
  STEP_DESCR        TEXT,                        /* Description of task or error message */
  UNIQUE (SID, JOB_NAME, STEP_NAME)
);
INSERT INTO UTL_JOB_STATUS (UTL_JOB_STATUS_ID) VALUES (-1);

DDL көру/есеп беру

CREATE VIEW IF NOT EXISTS UTL_JOB_STATUS_V
AS /* Content: Package Execution Log for last 3 Months. */
WITH SRC AS (
  SELECT LOG_D,
    LOG_DT,
    UTL_JOB_STATUS_ID,
    SID,
	CASE WHEN INSTR(JOB_NAME, 'FTP') THEN 'TRANSFER' /* file transfer */
	     WHEN INSTR(JOB_NAME, 'STG') THEN 'STAGE' /* stage */
	     WHEN INSTR(JOB_NAME, 'CLS') THEN 'CLEANSING' /* cleansing */
	     WHEN INSTR(JOB_NAME, 'DIM') THEN 'DIMENSION' /* dimension */
	     WHEN INSTR(JOB_NAME, 'FCT') THEN 'FACT' /* fact */
		 WHEN INSTR(JOB_NAME, 'ETL') THEN 'STAGE-MART' /* data mart */
	     WHEN INSTR(JOB_NAME, 'RPT') THEN 'REPORT' /* report */
	     ELSE 'N/A' END AS LAYER,
	CASE WHEN INSTR(JOB_NAME, 'ACCESS') THEN 'ACCESS LOG' /* source */
	     WHEN INSTR(JOB_NAME, 'MASTER') THEN 'MASTER DATA' /* source */
	     WHEN INSTR(JOB_NAME, 'AD-HOC') THEN 'AD-HOC' /* source */
	     ELSE 'N/A' END AS SOURCE,
    JOB_NAME,
    STEP_NAME,
    CASE WHEN STEP_NAME='ETL_START' THEN 1 ELSE 0 END AS START_FLAG,
    CASE WHEN STEP_NAME='ETL_END' THEN 1 ELSE 0 END AS END_FLAG,
    CASE WHEN STEP_NAME='ETL_ERROR' THEN 1 ELSE 0 END AS ERROR_FLAG,
    STEP_NAME || ' : ' || STEP_DESCR AS STEP_LOG,
	SUBSTR( SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), 1, INSTR(SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), '***')-2 ) AS AFFECTED_ROWS
  FROM UTL_JOB_STATUS
  WHERE datetime(LOG_D, 'unixepoch') >= date('now', 'start of month', '-3 month')
)
SELECT JB.SID,
  JB.MIN_LOG_DT AS START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS LOG_DT,
  JB.SOURCE,
  JB.LAYER,
  JB.JOB_NAME,
  CASE
  WHEN JB.ERROR_FLAG = 1 THEN 'ERROR'
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 AND strftime('%s','now') - JB.MIN_LOG_DT > 0.5*60*60 THEN 'HANGS' /* half an hour */
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 THEN 'RUNNING'
  ELSE 'OK'
  END AS STATUS,
  ERR.STEP_LOG     AS STEP_LOG,
  JB.CNT           AS STEP_CNT,
  JB.AFFECTED_ROWS AS AFFECTED_ROWS,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS JOB_START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MAX_LOG_DT, 'unixepoch')) AS JOB_END_DT,
  JB.MAX_LOG_DT - JB.MIN_LOG_DT AS JOB_DURATION_SEC
FROM
  ( SELECT SID, SOURCE, LAYER, JOB_NAME,
           MAX(UTL_JOB_STATUS_ID) AS UTL_JOB_STATUS_ID,
           MAX(START_FLAG)       AS START_FLAG,
           MAX(END_FLAG)         AS END_FLAG,
           MAX(ERROR_FLAG)       AS ERROR_FLAG,
           MIN(LOG_DT)           AS MIN_LOG_DT,
           MAX(LOG_DT)           AS MAX_LOG_DT,
           SUM(1)                AS CNT,
           SUM(IFNULL(AFFECTED_ROWS, 0)) AS AFFECTED_ROWS
    FROM SRC
    GROUP BY SID, SOURCE, LAYER, JOB_NAME
  ) JB,
  ( SELECT UTL_JOB_STATUS_ID, SID, JOB_NAME, STEP_LOG
    FROM SRC
    WHERE 1 = 1
  ) ERR
WHERE 1 = 1
  AND JB.SID = ERR.SID
  AND JB.JOB_NAME = ERR.JOB_NAME
  AND JB.UTL_JOB_STATUS_ID = ERR.UTL_JOB_STATUS_ID
ORDER BY JB.MIN_LOG_DT DESC, JB.SID DESC, JB.SOURCE;

SQL Жаңа сеанс нөмірін алу мүмкіндігін тексеру

SELECT SUM (
  CASE WHEN start_job.JOB_NAME IS NOT NULL AND end_job.JOB_NAME IS NULL /* existed job finished */
	    AND NOT ( 'y' = 'n' ) /* force restart PARAMETER */
       THEN 1 ELSE 0
  END ) AS IS_RUNNING
  FROM
    ( SELECT 1 AS dummy FROM UTL_JOB_STATUS WHERE sid = -1) d_job
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID, 1 AS dummy
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG' /* job name PARAMETER */
	    AND STEP_NAME = 'ETL_START'
      GROUP BY JOB_NAME, SID
    ) start_job /* starts */
  ON d_job.dummy = start_job.dummy
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG'  /* job name PARAMETER */
	    AND STEP_NAME in ('ETL_END', 'ETL_ERROR') /* stop status */
      GROUP BY JOB_NAME, SID
    ) end_job /* ends */
  ON start_job.JOB_NAME = end_job.JOB_NAME
     AND start_job.SID = end_job.SID

Кестенің ерекшеліктері:

  • деректерді өңдеу процедурасының басталуы мен аяқталуы ETL_START және ETL_END қадамдарымен орындалуы керек
  • қате болған жағдайда оның сипаттамасы бар ETL_ERROR қадамын жасау керек
  • өңделген деректердің көлемі, мысалы, жұлдызшалармен бөлектелуі керек
  • бірдей процедураны force_restart=y параметрімен бір уақытта бастауға болады, онсыз сеанс нөмірі аяқталған процедураға ғана беріледі
  • қалыпты режимде бірдей деректерді өңдеу процедурасын қатар орындай алмайсыз

Кестемен жұмыс істеуге қажетті операциялар келесідей:

  • іске қосылған ETL процедурасының сеанс нөмірін алу
  • кестеге журнал жазбасын енгізіңіз
  • ETL процедурасының соңғы сәтті жазбасын алу

Oracle немесе Postgres сияқты дерекқорларда бұл әрекеттерді кірістірілген функциялар ретінде іске асыруға болады. sqlite сыртқы механизмді қажет етеді және бұл жағдайда ол PHP тілінде прототиптелген.

қорытынды

Осылайша, деректерді өңдеу құралдарындағы қате туралы хабарлар мега-маңызды рөл атқарады. Бірақ мәселенің себебін тез табу үшін оларды оңтайлы деп айту қиын. Процедуралар саны жүзге жақындағанда, процесті бақылау күрделі жобаға айналады.

Мақалада прототип түріндегі мәселені шешудің мүмкін болатын мысалы келтірілген. Шағын репозиторийдің барлық прототипі gitlab жүйесінде қол жетімді SQLite PHP ETL утилиталары.

Ақпарат көзі: www.habr.com

пікір қалдыру