Küçük bir veri ambarında ETL süreçlerini izleme

Birçoğu, verileri ayıklamak, dönüştürmek ve ilişkisel veritabanlarına yüklemek için prosedürler oluşturmak için özel araçlar kullanır. Çalışma araçlarının süreci günlüğe kaydedilir, hatalar giderilir.

Bir hata durumunda, günlük, aracın görevi tamamlayamadığı ve hangi modüllerin (genellikle java) nerede durduğu bilgilerini içerir. Son satırlarda, bir veritabanı hatası, örneğin bir tablo benzersiz anahtarı ihlali bulabilirsiniz.

ETL hata bilgisinin nasıl bir rol oynadığı sorusunu cevaplamak için, son iki yılda meydana gelen tüm sorunları oldukça geniş bir depoda sınıflandırdım.

Küçük bir veri ambarında ETL süreçlerini izleme

Veritabanı hataları, yeterli alan olmaması, bağlantının kesilmesi, oturumun askıya alınması vb.

Mantıksal hatalar, tablo anahtarlarının ihlali, geçersiz nesneler, nesnelere erişim eksikliği vb.
Zamanlayıcı zamanında başlamayabilir, donabilir, vb.

Basit hataların düzeltilmesi uzun sürmez. İyi bir ETL, bunların çoğunu kendi başına halledebilir.

Karmaşık hatalar, veri kaynaklarını keşfetmek için verilerle çalışmak için prosedürleri keşfetmeyi ve test etmeyi gerekli kılar. Genellikle değişiklik testi ve devreye alma ihtiyacına yol açar.

Bu nedenle, tüm sorunların yarısı veritabanıyla ilgilidir. Tüm hataların %48'i basit hatalardır.
Tüm sorunların üçte biri, depolama mantığını veya modelini değiştirmekle ilgilidir ve bu hataların yarısından fazlası karmaşıktır.

Ve tüm sorunların dörtte birinden azı, %18'i basit hatalardan oluşan görev zamanlayıcıyla ilgilidir.

Genel olarak, meydana gelen tüm hataların %22'si karmaşıktır ve bunların düzeltilmesi en fazla dikkat ve zamanı gerektirir. Haftada bir kez oluyorlar. Oysa basit hatalar neredeyse her gün oluyor.

Açıktır ki, ETL süreçlerinin izlenmesi, hata konumu günlükte mümkün olduğunca doğru bir şekilde belirtildiğinde ve sorunun kaynağını bulmak için minimum süre gerektiğinde etkili olacaktır.

Etkili izleme

ETL izleme sürecinde ne görmek istedim?

Küçük bir veri ambarında ETL süreçlerini izleme
Başla - işe başladığı zaman,
Kaynak - veri kaynağı,
Katman - hangi düzeyde depolama yükleniyor,
ETL İş Adı - birçok küçük adımdan oluşan yükleme prosedürü,
Adım Numarası - gerçekleştirilen adımın numarası,
Etkilenen Satırlar - halihazırda işlenmiş olan veri miktarı,
Süre sn - ne kadar sürer,
Durum - her şey yolunda olsun ya da olmasın: TAMAM, HATA, ÇALIŞIYOR, TAKILIYOR
Mesaj - Son başarılı mesaj veya hata açıklaması.

Kayıtların durumuna göre e-posta gönderebilirsiniz. Diğer üyelere mektup. Hata yoksa, harf gerekli değildir.

Böylece herhangi bir hata durumunda olayın meydana geldiği yer açıkça belirtilir.

Bazen izleme aracının kendisi çalışmaz. Bu durumda, raporun oluşturulduğu temelinde doğrudan veritabanında bir görünüm (görünüm) çağırmak mümkündür.

ETL izleme tablosu

ETL süreçlerinin izlenmesini gerçekleştirmek için bir tablo ve bir görünüm yeterlidir.

Bunu yapmak için geri dönebilirsiniz. senin küçük depon ve sqlite veritabanında prototip oluşturun.

DDL tabloları

CREATE TABLE UTL_JOB_STATUS (
/* Table for logging of job execution log. Important that the job has the steps ETL_START and ETL_END or ETL_ERROR */
  UTL_JOB_STATUS_ID INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
  SID               INTEGER NOT NULL DEFAULT -1, /* Session Identificator. Unique for every Run of job */
  LOG_DT            INTEGER NOT NULL DEFAULT 0,  /* Date time */
  LOG_D             INTEGER NOT NULL DEFAULT 0,  /* Date */
  JOB_NAME          TEXT NOT NULL DEFAULT 'N/A', /* Job name like JOB_STG2DM_GEO */
  STEP_NAME         TEXT NOT NULL DEFAULT 'N/A', /* ETL_START, ... , ETL_END/ETL_ERROR */
  STEP_DESCR        TEXT,                        /* Description of task or error message */
  UNIQUE (SID, JOB_NAME, STEP_NAME)
);
INSERT INTO UTL_JOB_STATUS (UTL_JOB_STATUS_ID) VALUES (-1);

DDL'yi Görüntüle/Raporla

CREATE VIEW IF NOT EXISTS UTL_JOB_STATUS_V
AS /* Content: Package Execution Log for last 3 Months. */
WITH SRC AS (
  SELECT LOG_D,
    LOG_DT,
    UTL_JOB_STATUS_ID,
    SID,
	CASE WHEN INSTR(JOB_NAME, 'FTP') THEN 'TRANSFER' /* file transfer */
	     WHEN INSTR(JOB_NAME, 'STG') THEN 'STAGE' /* stage */
	     WHEN INSTR(JOB_NAME, 'CLS') THEN 'CLEANSING' /* cleansing */
	     WHEN INSTR(JOB_NAME, 'DIM') THEN 'DIMENSION' /* dimension */
	     WHEN INSTR(JOB_NAME, 'FCT') THEN 'FACT' /* fact */
		 WHEN INSTR(JOB_NAME, 'ETL') THEN 'STAGE-MART' /* data mart */
	     WHEN INSTR(JOB_NAME, 'RPT') THEN 'REPORT' /* report */
	     ELSE 'N/A' END AS LAYER,
	CASE WHEN INSTR(JOB_NAME, 'ACCESS') THEN 'ACCESS LOG' /* source */
	     WHEN INSTR(JOB_NAME, 'MASTER') THEN 'MASTER DATA' /* source */
	     WHEN INSTR(JOB_NAME, 'AD-HOC') THEN 'AD-HOC' /* source */
	     ELSE 'N/A' END AS SOURCE,
    JOB_NAME,
    STEP_NAME,
    CASE WHEN STEP_NAME='ETL_START' THEN 1 ELSE 0 END AS START_FLAG,
    CASE WHEN STEP_NAME='ETL_END' THEN 1 ELSE 0 END AS END_FLAG,
    CASE WHEN STEP_NAME='ETL_ERROR' THEN 1 ELSE 0 END AS ERROR_FLAG,
    STEP_NAME || ' : ' || STEP_DESCR AS STEP_LOG,
	SUBSTR( SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), 1, INSTR(SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), '***')-2 ) AS AFFECTED_ROWS
  FROM UTL_JOB_STATUS
  WHERE datetime(LOG_D, 'unixepoch') >= date('now', 'start of month', '-3 month')
)
SELECT JB.SID,
  JB.MIN_LOG_DT AS START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS LOG_DT,
  JB.SOURCE,
  JB.LAYER,
  JB.JOB_NAME,
  CASE
  WHEN JB.ERROR_FLAG = 1 THEN 'ERROR'
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 AND strftime('%s','now') - JB.MIN_LOG_DT > 0.5*60*60 THEN 'HANGS' /* half an hour */
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 THEN 'RUNNING'
  ELSE 'OK'
  END AS STATUS,
  ERR.STEP_LOG     AS STEP_LOG,
  JB.CNT           AS STEP_CNT,
  JB.AFFECTED_ROWS AS AFFECTED_ROWS,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS JOB_START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MAX_LOG_DT, 'unixepoch')) AS JOB_END_DT,
  JB.MAX_LOG_DT - JB.MIN_LOG_DT AS JOB_DURATION_SEC
FROM
  ( SELECT SID, SOURCE, LAYER, JOB_NAME,
           MAX(UTL_JOB_STATUS_ID) AS UTL_JOB_STATUS_ID,
           MAX(START_FLAG)       AS START_FLAG,
           MAX(END_FLAG)         AS END_FLAG,
           MAX(ERROR_FLAG)       AS ERROR_FLAG,
           MIN(LOG_DT)           AS MIN_LOG_DT,
           MAX(LOG_DT)           AS MAX_LOG_DT,
           SUM(1)                AS CNT,
           SUM(IFNULL(AFFECTED_ROWS, 0)) AS AFFECTED_ROWS
    FROM SRC
    GROUP BY SID, SOURCE, LAYER, JOB_NAME
  ) JB,
  ( SELECT UTL_JOB_STATUS_ID, SID, JOB_NAME, STEP_LOG
    FROM SRC
    WHERE 1 = 1
  ) ERR
WHERE 1 = 1
  AND JB.SID = ERR.SID
  AND JB.JOB_NAME = ERR.JOB_NAME
  AND JB.UTL_JOB_STATUS_ID = ERR.UTL_JOB_STATUS_ID
ORDER BY JB.MIN_LOG_DT DESC, JB.SID DESC, JB.SOURCE;

Yeni bir oturum numarası almanın mümkün olup olmadığını kontrol eden SQL

SELECT SUM (
  CASE WHEN start_job.JOB_NAME IS NOT NULL AND end_job.JOB_NAME IS NULL /* existed job finished */
	    AND NOT ( 'y' = 'n' ) /* force restart PARAMETER */
       THEN 1 ELSE 0
  END ) AS IS_RUNNING
  FROM
    ( SELECT 1 AS dummy FROM UTL_JOB_STATUS WHERE sid = -1) d_job
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID, 1 AS dummy
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG' /* job name PARAMETER */
	    AND STEP_NAME = 'ETL_START'
      GROUP BY JOB_NAME, SID
    ) start_job /* starts */
  ON d_job.dummy = start_job.dummy
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG'  /* job name PARAMETER */
	    AND STEP_NAME in ('ETL_END', 'ETL_ERROR') /* stop status */
      GROUP BY JOB_NAME, SID
    ) end_job /* ends */
  ON start_job.JOB_NAME = end_job.JOB_NAME
     AND start_job.SID = end_job.SID

Tablo özellikleri:

  • veri işleme prosedürünün başlangıcını ve bitişini ETL_START ve ETL_END adımları takip etmelidir
  • hata olması durumunda açıklamasıyla birlikte ETL_ERROR adımı oluşturulmalıdır.
  • işlenen veri miktarı, örneğin yıldız işaretleri ile vurgulanmalıdır.
  • aynı prosedür force_restart=y parametresi ile aynı anda başlatılabilir, onsuz oturum numarası sadece tamamlanan prosedüre verilir
  • normal modda, aynı veri işleme prosedürünü paralel olarak çalıştıramazsınız

Bir tablo ile çalışmak için gerekli işlemler aşağıdaki gibidir:

  • çalışan ETL prosedürünün oturum numarasını alma
  • günlük girişini tabloya ekle
  • bir ETL prosedürünün son başarılı kaydını almak

Oracle veya Postgres gibi veritabanlarında bu işlemler yerleşik işlevler olarak uygulanabilir. sqlite harici bir mekanizma gerektirir ve bu durumda PHP'de prototiplendi.

Aviator apk

Bu nedenle, veri işleme araçlarındaki hata mesajları çok önemli bir rol oynar. Ancak, sorunun nedenini hızlı bir şekilde bulmak için onları optimal olarak adlandırmak zordur. Prosedür sayısı yüze yaklaştığında, süreç izleme karmaşık bir projeye dönüşür.

Makale, soruna bir prototip biçiminde olası bir çözüm örneği sunmaktadır. Tüm küçük depo prototipi gitlab'de mevcuttur SQLite PHP ETL Yardımcı Programları.

Kaynak: habr.com

Yorum ekle