Παρακολούθηση διαδικασιών ETL σε μια μικρή αποθήκη δεδομένων

Πολλοί χρησιμοποιούν εξειδικευμένα εργαλεία για να δημιουργήσουν διαδικασίες εξαγωγής, μετατροπής και φόρτωσης δεδομένων σε σχεσιακές βάσεις δεδομένων. Η διαδικασία των εργαλείων εργασίας καταγράφεται, τα σφάλματα διορθώνονται.

Σε περίπτωση σφάλματος, το αρχείο καταγραφής περιέχει πληροφορίες ότι το εργαλείο απέτυχε να ολοκληρώσει την εργασία και ποιες λειτουργικές μονάδες (συχνά η java) σταμάτησαν πού. Στις τελευταίες γραμμές, μπορείτε να βρείτε ένα σφάλμα βάσης δεδομένων, για παράδειγμα, μια παραβίαση μοναδικού κλειδιού πίνακα.

Για να απαντήσω στο ερώτημα τι ρόλο παίζουν οι πληροφορίες σφαλμάτων ETL, έχω ταξινομήσει όλα τα προβλήματα που έχουν εμφανιστεί τα τελευταία δύο χρόνια σε ένα αρκετά μεγάλο χώρο αποθήκευσης.

Παρακολούθηση διαδικασιών ETL σε μια μικρή αποθήκη δεδομένων

Τα σφάλματα βάσης δεδομένων περιλαμβάνουν έλλειψη αρκετόυ χώρου, απώλεια σύνδεσης, ανάρτηση περιόδου λειτουργίας κ.λπ.

Τα λογικά σφάλματα περιλαμβάνουν όπως παραβίαση κλειδιών πίνακα, μη έγκυρα αντικείμενα, έλλειψη πρόσβασης σε αντικείμενα κ.λπ.
Ο προγραμματιστής μπορεί να μην ξεκινήσει στην ώρα του, μπορεί να παγώσει κ.λπ.

Τα απλά λάθη δεν χρειάζονται πολύ για να διορθωθούν. Ένα καλό ETL μπορεί να χειριστεί τα περισσότερα από μόνο του.

Τα σύνθετα σφάλματα καθιστούν απαραίτητο τον εντοπισμό και τη δοκιμή διαδικασιών για την εργασία με δεδομένα, την εξερεύνηση πηγών δεδομένων. Συχνά οδηγούν στην ανάγκη για δοκιμή αλλαγής και ανάπτυξη.

Έτσι, τα μισά από όλα τα προβλήματα σχετίζονται με τη βάση δεδομένων. Το 48% όλων των λαθών είναι απλά λάθη.
Το ένα τρίτο όλων των προβλημάτων σχετίζονται με την αλλαγή της λογικής ή του μοντέλου αποθήκευσης, περισσότερα από τα μισά από αυτά τα σφάλματα είναι πολύπλοκα.

Και λιγότερο από το ένα τέταρτο όλων των προβλημάτων σχετίζονται με τον προγραμματιστή εργασιών, το 18% των οποίων είναι απλά σφάλματα.

Γενικά, το 22% όλων των σφαλμάτων που συμβαίνουν είναι πολύπλοκα και η διόρθωση τους απαιτεί τη μεγαλύτερη προσοχή και χρόνο. Συμβαίνουν περίπου μία φορά την εβδομάδα. Ενώ απλά λάθη συμβαίνουν σχεδόν κάθε μέρα.

Προφανώς, η παρακολούθηση των διαδικασιών ETL θα είναι αποτελεσματική όταν η θέση σφάλματος υποδεικνύεται στο αρχείο καταγραφής όσο το δυνατόν ακριβέστερα και απαιτείται ο ελάχιστος χρόνος για να βρεθεί η πηγή του προβλήματος.

Αποτελεσματική παρακολούθηση

Τι ήθελα να δω στη διαδικασία παρακολούθησης ETL;

Παρακολούθηση διαδικασιών ETL σε μια μικρή αποθήκη δεδομένων
Ξεκινήστε από - όταν άρχισε να δουλεύει,
Πηγή - πηγή δεδομένων,
Επίπεδο - ποιο επίπεδο αποθήκευσης φορτώνεται,
ETL Job Name - διαδικασία μεταφόρτωσης, η οποία αποτελείται από πολλά μικρά βήματα,
Αριθμός βήματος - ο αριθμός του βήματος που εκτελείται,
Σειρές που επηρεάζονται - πόσα δεδομένα έχουν ήδη υποστεί επεξεργασία,
Διάρκεια sec - πόσος χρόνος χρειάζεται,
Κατάσταση - είτε όλα είναι καλά είτε όχι: OK, ERROR, RUNNING, HANGS
Μήνυμα - Τελευταίο επιτυχημένο μήνυμα ή περιγραφή σφάλματος.

Με βάση την κατάσταση των εγγραφών, μπορείτε να στείλετε ένα email. επιστολή προς άλλα μέλη. Εάν δεν υπάρχουν σφάλματα, τότε το γράμμα δεν είναι απαραίτητο.

Έτσι, σε περίπτωση λάθους, επισημαίνεται με σαφήνεια η τοποθεσία του συμβάντος.

Μερικές φορές συμβαίνει ότι το ίδιο το εργαλείο παρακολούθησης δεν λειτουργεί. Σε αυτήν την περίπτωση, είναι δυνατή η κλήση μιας προβολής (προβολή) απευθείας στη βάση δεδομένων, βάσει της οποίας δημιουργείται η αναφορά.

Πίνακας παρακολούθησης ETL

Για την υλοποίηση της παρακολούθησης των διαδικασιών ETL, αρκεί ένας πίνακας και μία προβολή.

Για να το κάνετε αυτό, μπορείτε να επιστρέψετε στο ο μικρός σου αποθηκευτικός χώρος και δημιουργήστε πρωτότυπο στη βάση δεδομένων sqlite.

πίνακες DDL

CREATE TABLE UTL_JOB_STATUS (
/* Table for logging of job execution log. Important that the job has the steps ETL_START and ETL_END or ETL_ERROR */
  UTL_JOB_STATUS_ID INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
  SID               INTEGER NOT NULL DEFAULT -1, /* Session Identificator. Unique for every Run of job */
  LOG_DT            INTEGER NOT NULL DEFAULT 0,  /* Date time */
  LOG_D             INTEGER NOT NULL DEFAULT 0,  /* Date */
  JOB_NAME          TEXT NOT NULL DEFAULT 'N/A', /* Job name like JOB_STG2DM_GEO */
  STEP_NAME         TEXT NOT NULL DEFAULT 'N/A', /* ETL_START, ... , ETL_END/ETL_ERROR */
  STEP_DESCR        TEXT,                        /* Description of task or error message */
  UNIQUE (SID, JOB_NAME, STEP_NAME)
);
INSERT INTO UTL_JOB_STATUS (UTL_JOB_STATUS_ID) VALUES (-1);

Προβολή/Αναφορά DDL

CREATE VIEW IF NOT EXISTS UTL_JOB_STATUS_V
AS /* Content: Package Execution Log for last 3 Months. */
WITH SRC AS (
  SELECT LOG_D,
    LOG_DT,
    UTL_JOB_STATUS_ID,
    SID,
	CASE WHEN INSTR(JOB_NAME, 'FTP') THEN 'TRANSFER' /* file transfer */
	     WHEN INSTR(JOB_NAME, 'STG') THEN 'STAGE' /* stage */
	     WHEN INSTR(JOB_NAME, 'CLS') THEN 'CLEANSING' /* cleansing */
	     WHEN INSTR(JOB_NAME, 'DIM') THEN 'DIMENSION' /* dimension */
	     WHEN INSTR(JOB_NAME, 'FCT') THEN 'FACT' /* fact */
		 WHEN INSTR(JOB_NAME, 'ETL') THEN 'STAGE-MART' /* data mart */
	     WHEN INSTR(JOB_NAME, 'RPT') THEN 'REPORT' /* report */
	     ELSE 'N/A' END AS LAYER,
	CASE WHEN INSTR(JOB_NAME, 'ACCESS') THEN 'ACCESS LOG' /* source */
	     WHEN INSTR(JOB_NAME, 'MASTER') THEN 'MASTER DATA' /* source */
	     WHEN INSTR(JOB_NAME, 'AD-HOC') THEN 'AD-HOC' /* source */
	     ELSE 'N/A' END AS SOURCE,
    JOB_NAME,
    STEP_NAME,
    CASE WHEN STEP_NAME='ETL_START' THEN 1 ELSE 0 END AS START_FLAG,
    CASE WHEN STEP_NAME='ETL_END' THEN 1 ELSE 0 END AS END_FLAG,
    CASE WHEN STEP_NAME='ETL_ERROR' THEN 1 ELSE 0 END AS ERROR_FLAG,
    STEP_NAME || ' : ' || STEP_DESCR AS STEP_LOG,
	SUBSTR( SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), 1, INSTR(SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), '***')-2 ) AS AFFECTED_ROWS
  FROM UTL_JOB_STATUS
  WHERE datetime(LOG_D, 'unixepoch') >= date('now', 'start of month', '-3 month')
)
SELECT JB.SID,
  JB.MIN_LOG_DT AS START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS LOG_DT,
  JB.SOURCE,
  JB.LAYER,
  JB.JOB_NAME,
  CASE
  WHEN JB.ERROR_FLAG = 1 THEN 'ERROR'
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 AND strftime('%s','now') - JB.MIN_LOG_DT > 0.5*60*60 THEN 'HANGS' /* half an hour */
  WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 THEN 'RUNNING'
  ELSE 'OK'
  END AS STATUS,
  ERR.STEP_LOG     AS STEP_LOG,
  JB.CNT           AS STEP_CNT,
  JB.AFFECTED_ROWS AS AFFECTED_ROWS,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS JOB_START_DT,
  strftime('%d.%m.%Y %H:%M', datetime(JB.MAX_LOG_DT, 'unixepoch')) AS JOB_END_DT,
  JB.MAX_LOG_DT - JB.MIN_LOG_DT AS JOB_DURATION_SEC
FROM
  ( SELECT SID, SOURCE, LAYER, JOB_NAME,
           MAX(UTL_JOB_STATUS_ID) AS UTL_JOB_STATUS_ID,
           MAX(START_FLAG)       AS START_FLAG,
           MAX(END_FLAG)         AS END_FLAG,
           MAX(ERROR_FLAG)       AS ERROR_FLAG,
           MIN(LOG_DT)           AS MIN_LOG_DT,
           MAX(LOG_DT)           AS MAX_LOG_DT,
           SUM(1)                AS CNT,
           SUM(IFNULL(AFFECTED_ROWS, 0)) AS AFFECTED_ROWS
    FROM SRC
    GROUP BY SID, SOURCE, LAYER, JOB_NAME
  ) JB,
  ( SELECT UTL_JOB_STATUS_ID, SID, JOB_NAME, STEP_LOG
    FROM SRC
    WHERE 1 = 1
  ) ERR
WHERE 1 = 1
  AND JB.SID = ERR.SID
  AND JB.JOB_NAME = ERR.JOB_NAME
  AND JB.UTL_JOB_STATUS_ID = ERR.UTL_JOB_STATUS_ID
ORDER BY JB.MIN_LOG_DT DESC, JB.SID DESC, JB.SOURCE;

SQL Έλεγχος εάν είναι δυνατή η λήψη νέου αριθμού συνεδρίας

SELECT SUM (
  CASE WHEN start_job.JOB_NAME IS NOT NULL AND end_job.JOB_NAME IS NULL /* existed job finished */
	    AND NOT ( 'y' = 'n' ) /* force restart PARAMETER */
       THEN 1 ELSE 0
  END ) AS IS_RUNNING
  FROM
    ( SELECT 1 AS dummy FROM UTL_JOB_STATUS WHERE sid = -1) d_job
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID, 1 AS dummy
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG' /* job name PARAMETER */
	    AND STEP_NAME = 'ETL_START'
      GROUP BY JOB_NAME, SID
    ) start_job /* starts */
  ON d_job.dummy = start_job.dummy
  LEFT OUTER JOIN
    ( SELECT JOB_NAME, SID
      FROM UTL_JOB_STATUS
      WHERE JOB_NAME = 'RPT_ACCESS_LOG'  /* job name PARAMETER */
	    AND STEP_NAME in ('ETL_END', 'ETL_ERROR') /* stop status */
      GROUP BY JOB_NAME, SID
    ) end_job /* ends */
  ON start_job.JOB_NAME = end_job.JOB_NAME
     AND start_job.SID = end_job.SID

Χαρακτηριστικά πίνακα:

  • η έναρξη και το τέλος της διαδικασίας επεξεργασίας δεδομένων πρέπει να ακολουθούνται από τα βήματα ETL_START και ETL_END
  • Σε περίπτωση σφάλματος, θα πρέπει να δημιουργηθεί το βήμα ETL_ERROR με την περιγραφή του
  • ο όγκος των επεξεργασμένων δεδομένων πρέπει να επισημαίνεται, για παράδειγμα, με αστερίσκους
  • η ίδια διαδικασία μπορεί να ξεκινήσει ταυτόχρονα με την παράμετρο force_restart=y, χωρίς αυτήν ο αριθμός συνεδρίας εκδίδεται μόνο στην ολοκληρωμένη διαδικασία
  • σε κανονική λειτουργία, δεν μπορείτε να εκτελέσετε την ίδια διαδικασία επεξεργασίας δεδομένων παράλληλα

Οι απαραίτητες λειτουργίες για την εργασία με έναν πίνακα είναι οι εξής:

  • λήψη του αριθμού συνεδρίας της τρέχουσας διαδικασίας ETL
  • εισαγάγετε την καταχώριση αρχείου καταγραφής στον πίνακα
  • λήψη του τελευταίου επιτυχημένου ρεκόρ μιας διαδικασίας ETL

Σε βάσεις δεδομένων όπως η Oracle ή η Postgres, αυτές οι λειτουργίες μπορούν να υλοποιηθούν ως ενσωματωμένες συναρτήσεις. Το sqlite απαιτεί έναν εξωτερικό μηχανισμό, και σε αυτήν την περίπτωση πρωτότυπο σε PHP.

Παραγωγή

Έτσι, τα μηνύματα λάθους στα εργαλεία επεξεργασίας δεδομένων παίζουν πολύ σημαντικό ρόλο. Αλλά είναι δύσκολο να τα ονομάσουμε βέλτιστα για την ταχεία εύρεση της αιτίας του προβλήματος. Όταν ο αριθμός των διαδικασιών πλησιάζει τις εκατό, τότε η παρακολούθηση της διαδικασίας μετατρέπεται σε ένα πολύπλοκο έργο.

Το άρθρο παρέχει ένα παράδειγμα πιθανής λύσης στο πρόβλημα με τη μορφή πρωτοτύπου. Ολόκληρο το πρωτότυπο του μικρού αποθετηρίου είναι διαθέσιμο στο gitlab SQLite PHP ETL Utilities.

Πηγή: www.habr.com

Προσθέστε ένα σχόλιο