DBA: organizează cu competență sincronizările și importurile

Pentru procesarea complexă a seturilor mari de date (diferite procese ETL: importuri, conversii și sincronizare cu o sursă externă) adesea este nevoie temporar „amintiți-vă” și procesați imediat rapid ceva voluminos.

O sarcină tipică de acest fel sună de obicei cam așa: "Chiar aici departament contabilitate descărcat de la banca client ultimele plăți primite, trebuie să le încărcați rapid pe site și să le conectați la conturile dvs."

Dar când volumul acestui „ceva” începe să se măsoare în sute de megaocteți, iar serviciul trebuie să continue să funcționeze cu baza de date 24x7, apar multe efecte secundare care îți vor ruina viața.
DBA: organizează cu competență sincronizările și importurile
Pentru a le face față în PostgreSQL (și nu numai în el), puteți folosi câteva optimizări care vă vor permite să procesați totul mai rapid și cu un consum mai mic de resurse.

1. Unde să expediați?

Mai întâi, să decidem unde putem încărca datele pe care dorim să le „procesăm”.

1.1. Tabele temporare (TABUL TEMPORAR)

În principiu, pentru PostgreSQL tabelele temporare sunt la fel ca oricare altele. Prin urmare, superstiții ca „Totul acolo este stocat doar în memorie și se poate termina”. Dar există și câteva diferențe semnificative.

Propriul „spațiu de nume” pentru fiecare conexiune la baza de date

Dacă două conexiuni încearcă să se conecteze în același timp CREATE TABLE x, atunci cineva va primi cu siguranță eroare de non-unicitate obiectele bazei de date.

Dar dacă amândoi încearcă să execute CREATE TEMPORARY TABLE x, atunci ambii o vor face normal și toată lumea va primi copia ta Mese. Și nu va fi nimic în comun între ei.

„Se autodistruge” la deconectare

Când conexiunea este închisă, toate tabelele temporare sunt șterse automat, deci manual DROP TABLE x nu are rost decât...

Dacă lucrezi pgbouncer în modul tranzacție, atunci baza de date continuă să creadă că această conexiune este încă activă și în ea încă există acest tabel temporar.

Prin urmare, încercarea de a-l crea din nou, de la o conexiune diferită la pgbouncer, va avea ca rezultat o eroare. Dar acest lucru poate fi ocolit prin folosire CREATE TEMPORARY TABLE IF NOT EXISTS x.

Adevărat, este mai bine să nu faceți acest lucru oricum, pentru că atunci puteți găsi „deodată” acolo datele rămase de la „proprietarul anterior”. În schimb, este mult mai bine să citiți manualul și să vedeți că atunci când creați un tabel este posibil să adăugați ON COMMIT DROP - adică la finalizarea tranzacției, tabelul va fi șters automat.

Nereplicare

Deoarece aparțin doar unei anumite conexiuni, tabelele temporare nu sunt replicate. Dar acest lucru elimină necesitatea înregistrării duble a datelor în heap + WAL, așa că INSERT/UPDATE/DELETE în el este mult mai rapid.

Dar, deoarece un tabel temporar este încă un tabel „aproape obișnuit”, nu poate fi creat nici pe o replică. Cel puțin deocamdată, deși plasturele corespunzător circulă de mult timp.

1.2. TABEL NELOGAT

Dar ce ar trebui să faceți, de exemplu, dacă aveți un fel de proces ETL greoi care nu poate fi implementat într-o singură tranzacție, dar încă aveți pgbouncer în modul tranzacție? ..

Sau fluxul de date este atât de mare încât Nu există suficientă lățime de bandă pe o singură conexiune dintr-o bază de date (citește, un proces per CPU)?...

Sau se fac unele operațiuni asincron in diferite legaturi?...

Există o singură opțiune aici - creați temporar un tabel non-temporar. Joc de cuvinte, da. Acesta este:

  • am creat „propriile mele” tabele cu nume maxim aleatorii pentru a nu se intersecta cu nimeni
  • Extrage: le-a umplut cu date dintr-o sursă externă
  • Transforma: convertit, completat în câmpurile cheie de legătură
  • A incarca: a turnat date gata în tabelele țintă
  • tabelele „mei” au fost șterse

Și acum - o muscă în unguent. De fapt, toate scrierile în PostgreSQL au loc de două ori - primul în WAL, apoi în corpurile tabelului/indexului. Toate acestea sunt făcute pentru a sprijini ACID și a corecta vizibilitatea datelor între COMMIT'nucoasa si ROLLBACK„tranzacții nule.

Dar nu avem nevoie de asta! Avem tot procesul Ori a fost complet de succes, ori nu a fost.. Nu contează câte tranzacții intermediare vor fi - nu ne interesează „continuarea procesului de la mijloc”, mai ales când nu este clar unde a fost.

Pentru a face acest lucru, dezvoltatorii PostgreSQL, încă din versiunea 9.1, au introdus așa ceva ca tabele NELOGGATE:

Cu această indicație, tabelul este creat ca nelogat. Datele scrise în tabelele neînregistrate nu trec prin jurnalul de scriere anticipată (vezi Capitolul 29), determinând astfel de tabele să lucrează mult mai repede decât de obicei. Cu toate acestea, ei nu sunt imuni la eșec; în caz de defecțiune a serverului sau închidere de urgență, un tabel neînregistrat trunchiată automat. În plus, conținutul tabelului neînregistrat nereplicat la serverele sclave. Orice index creat pe un tabel neînregistrat devin automat neînregistrat.

Pe scurt, va fi mult mai rapid, dar dacă serverul de baze de date „cade”, va fi neplăcut. Dar cât de des se întâmplă acest lucru și procesul dumneavoastră ETL știe cum să corecteze acest lucru corect „de la mijloc” după „revitalizarea” bazei de date?...

Dacă nu, iar cazul de mai sus este similar cu al tău, folosește UNLOGGEDdar niciodată nu activați acest atribut pe tabele reale, datele din care vă sunt dragi.

1.3. ON COMMIT { ȘTERGERE RÂNDURI | CĂDERE BRUSCA}

Acest construct vă permite să specificați comportamentul automat atunci când o tranzacție este finalizată la crearea unui tabel.

despre ON COMMIT DROP Am scris deja mai sus, generează DROP TABLE, dar cu ON COMMIT DELETE ROWS situația este mai interesantă – se generează aici TRUNCATE TABLE.

Întrucât întreaga infrastructură pentru stocarea meta-descripției unui tabel temporar este exact aceeași cu cea a unui tabel obișnuit, atunci Crearea și ștergerea constantă a tabelelor temporare duce la „umflarea” severă a tabelelor de sistem pg_class, pg_attribute, pg_attrdef, pg_depend,...

Acum imaginați-vă că aveți un lucrător cu o conexiune directă la baza de date, care deschide o nouă tranzacție în fiecare secundă, creează, completează, procesează și șterge un tabel temporar... Va exista un exces de gunoi acumulat în tabelele de sistem și aceasta va cauza frânări suplimentare pentru fiecare operațiune.

În general, nu face asta! În acest caz, este mult mai eficient CREATE TEMPORARY TABLE x ... ON COMMIT DELETE ROWS scoateți-l din ciclul tranzacției - apoi, la începutul fiecărei tranzacții noi, tabelele sunt deja va exista (salvați un apel CREATE), dar va fi goală, mulțumită TRUNCATE (am salvat și apelul acestuia) la finalizarea tranzacției anterioare.

1.4. LIKE...INCLUSIV...

Am menționat la început că unul dintre cazurile de utilizare tipice pentru tabelele temporare este diferitele tipuri de importuri - iar dezvoltatorul copie și lipește obosit lista de câmpuri din tabelul țintă în declarația sa temporară...

Dar lenea este motorul progresului! De aceea creați un nou tabel „pe baza eșantionului” poate fi mult mai simplu:

CREATE TEMPORARY TABLE import_table(
  LIKE target_table
);

Deoarece apoi puteți genera o mulțime de date în acest tabel, căutarea prin el nu va fi niciodată rapidă. Dar există o soluție tradițională pentru aceasta - indici! Si da, un tabel temporar poate avea și indecși.

Deoarece, adesea, indecșii necesari coincid cu indecșii tabelului țintă, puteți scrie pur și simplu LIKE target_table INCLUDING INDEXES.

Daca si tu ai nevoie DEFAULT-valori (de exemplu, pentru a completa valorile cheii primare), puteți utiliza LIKE target_table INCLUDING DEFAULTS. Sau pur și simplu - LIKE target_table INCLUDING ALL — copiază valorile implicite, indexurile, constrângerile,...

Dar aici trebuie să înțelegi că dacă ai creat importați imediat tabelul cu indecși, apoi încărcarea datelor va dura mai multdecât dacă mai întâi umpleți totul și abia apoi rulați indecșii - uitați-vă cum face acest lucru ca exemplu pg_dump.

În general, RTFM!

2. Cum se scrie?

Lasă-mă să spun doar - folosește-l COPY-flow în loc de „pachet” INSERT, accelerare uneori. Puteți chiar și direct dintr-un fișier pre-generat.

3. Cum se procesează?

Deci, să lăsăm introducerea noastră să arate cam așa:

  • aveți un tabel cu datele clienților stocate în baza de date 1 milion de înregistrări
  • in fiecare zi un client iti trimite unul nou „imagine” completă
  • din experiență știi că din când în când nu se modifică mai mult de 10 de înregistrări

Un exemplu clasic al unei astfel de situații este Baza KLADR — sunt o mulțime de adrese în total, dar în fiecare încărcare săptămânală sunt foarte puține modificări (denumirea așezărilor, combinarea străzilor, aspectul de case noi) chiar și la scară națională.

3.1. Algoritm de sincronizare completă

Pentru simplitate, să presupunem că nici măcar nu este nevoie să restructurați datele - doar aduceți tabelul în forma dorită, adică:

  • elimina tot ceea ce nu mai există
  • actualizare tot ceea ce exista deja și trebuie actualizat
  • insera tot ce nu s-a întâmplat încă

De ce ar trebui să se facă operațiunile în această ordine? Pentru că așa va crește dimensiunea mesei minim (ține minte MVCC!).

DELETE FROM dst

Nu, bineînțeles că te descurci cu doar două operații:

  • elimina (DELETE) totul în general
  • insera toate din noua imagine

Dar, în același timp, datorită MVCC, Dimensiunea mesei va crește exact de două ori! Obținerea de +1 milion de imagini ale înregistrărilor din tabel datorită unei actualizări de 10K este atât de redundantă...

TRUNCATE dst

Un dezvoltator mai experimentat știe că întreaga tabletă poate fi curățată destul de ieftin:

  • curat (TRUNCATE) întregul tabel
  • insera toate din noua imagine

Metoda este eficientă, uneori destul de aplicabil, dar există o problemă... Vom adăuga 1M de înregistrări pentru o lungă perioadă de timp, așa că nu ne putem permite să lăsăm tabelul gol pentru tot acest timp (cum se va întâmpla fără a-l împacheta într-o singură tranzacție).

Care înseamnă:

  • începem tranzacție de lungă durată
  • TRUNCATE impune Acces Exclusiv-blocarea
  • facem inserarea pentru o lungă perioadă de timp, și toți ceilalți în acest moment nici măcar nu pot SELECT

Ceva nu merge bine...

ALTER TABLE… RENUMIRE… / DROP TABLE…

O alternativă este să completați totul într-un tabel nou separat și apoi pur și simplu să îl redenumiți în locul celui vechi. Câteva lucruri mici urâte:

  • totusi Acces Exclusiv, deși mult mai puțin timp
  • toate planurile/statisticile de interogare pentru acest tabel sunt resetate, trebuie să rulați ANALYZE
  • toate cheile externe sunt sparte (FK) la masă

A existat un patch WIP de la Simon Riggs care sugera realizarea ALTER-o operație de înlocuire a corpului tabelului la nivel de fișier, fără a atinge statistici și FK, dar nu a colectat cvorum.

ȘTERGE, ACTUALIZAȚI, INSERA

Deci, ne hotărâm pe opțiunea de neblocare a trei operațiuni. Aproape trei... Cum să faci asta cel mai eficient?

-- все делаем в рамках транзакции, чтобы никто не видел "промежуточных" состояний
BEGIN;

-- создаем временную таблицу с импортируемыми данными
CREATE TEMPORARY TABLE tmp(
  LIKE dst INCLUDING INDEXES -- по образу и подобию, вместе с индексами
) ON COMMIT DROP; -- за рамками транзакции она нам не нужна

-- быстро-быстро вливаем новый образ через COPY
COPY tmp FROM STDIN;
-- ...
-- .

-- удаляем отсутствующие
DELETE FROM
  dst D
USING
  dst X
LEFT JOIN
  tmp Y
    USING(pk1, pk2) -- поля первичного ключа
WHERE
  (D.pk1, D.pk2) = (X.pk1, X.pk2) AND
  Y IS NOT DISTINCT FROM NULL; -- "антиджойн"

-- обновляем оставшиеся
UPDATE
  dst D
SET
  (f1, f2, f3) = (T.f1, T.f2, T.f3)
FROM
  tmp T
WHERE
  (D.pk1, D.pk2) = (T.pk1, T.pk2) AND
  (D.f1, D.f2, D.f3) IS DISTINCT FROM (T.f1, T.f2, T.f3); -- незачем обновлять совпадающие

-- вставляем отсутствующие
INSERT INTO
  dst
SELECT
  T.*
FROM
  tmp T
LEFT JOIN
  dst D
    USING(pk1, pk2)
WHERE
  D IS NOT DISTINCT FROM NULL;

COMMIT;

3.2. Post-procesare de import

În același KLADR, toate înregistrările modificate trebuie să fie rulate suplimentar prin post-procesare - normalizate, cuvinte cheie evidențiate și reduse la structurile necesare. Dar de unde știi... ce s-a schimbat exactfără a complica codul de sincronizare, ideal fără a-l atinge deloc?

Dacă numai procesul dvs. are acces la scriere în momentul sincronizării, atunci puteți utiliza un declanșator care va colecta toate modificările pentru noi:

-- целевые таблицы
CREATE TABLE kladr(...);
CREATE TABLE kladr_house(...);

-- таблицы с историей изменений
CREATE TABLE kladr$log(
  ro kladr, -- тут лежат целые образы записей старой/новой
  rn kladr
);

CREATE TABLE kladr_house$log(
  ro kladr_house,
  rn kladr_house
);

-- общая функция логирования изменений
CREATE OR REPLACE FUNCTION diff$log() RETURNS trigger AS $$
DECLARE
  dst varchar = TG_TABLE_NAME || '$log';
  stmt text = '';
BEGIN
  -- проверяем необходимость логгирования при обновлении записи
  IF TG_OP = 'UPDATE' THEN
    IF NEW IS NOT DISTINCT FROM OLD THEN
      RETURN NEW;
    END IF;
  END IF;
  -- создаем запись лога
  stmt = 'INSERT INTO ' || dst::text || '(ro,rn)VALUES(';
  CASE TG_OP
    WHEN 'INSERT' THEN
      EXECUTE stmt || 'NULL,$1)' USING NEW;
    WHEN 'UPDATE' THEN
      EXECUTE stmt || '$1,$2)' USING OLD, NEW;
    WHEN 'DELETE' THEN
      EXECUTE stmt || '$1,NULL)' USING OLD;
  END CASE;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Acum putem aplica declanșatoare înainte de a începe sincronizarea (sau le putem activa prin ALTER TABLE ... ENABLE TRIGGER ...):

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr_house
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

Și apoi extragem cu calm toate modificările de care avem nevoie din tabelele de jurnal și le rulăm prin handlere adiționale.

3.3. Importul seturi legate

Mai sus am luat în considerare cazurile în care structurile de date ale sursei și destinației sunt aceleași. Dar dacă încărcarea dintr-un sistem extern are un format diferit de structura de stocare din baza noastră de date?

Să luăm ca exemplu stocarea clienților și a conturilor acestora, opțiunea clasică „mai multe la unu”:

CREATE TABLE client(
  client_id
    serial
      PRIMARY KEY
, inn
    varchar
      UNIQUE
, name
    varchar
);

CREATE TABLE invoice(
  invoice_id
    serial
      PRIMARY KEY
, client_id
    integer
      REFERENCES client(client_id)
, number
    varchar
, dt
    date
, sum
    numeric(32,2)
);

Dar descărcarea dintr-o sursă externă vine la noi sub forma „toate într-unul”:

CREATE TEMPORARY TABLE invoice_import(
  client_inn
    varchar
, client_name
    varchar
, invoice_number
    varchar
, invoice_dt
    date
, invoice_sum
    numeric(32,2)
);

Evident, datele clienților pot fi duplicate în această versiune, iar înregistrarea principală este „contul”:

0123456789;Вася;A-01;2020-03-16;1000.00
9876543210;Петя;A-02;2020-03-16;666.00
0123456789;Вася;B-03;2020-03-16;9999.00

Pentru model, vom introduce pur și simplu datele noastre de testare, dar rețineți - COPY mai eficient!

INSERT INTO invoice_import
VALUES
  ('0123456789', 'Вася', 'A-01', '2020-03-16', 1000.00)
, ('9876543210', 'Петя', 'A-02', '2020-03-16', 666.00)
, ('0123456789', 'Вася', 'B-03', '2020-03-16', 9999.00);

Mai întâi, să evidențiem acele „tăieri” la care se referă „faptele” noastre. În cazul nostru, facturile se referă la clienți:

CREATE TEMPORARY TABLE client_import AS
SELECT DISTINCT ON(client_inn)
-- можно просто SELECT DISTINCT, если данные заведомо непротиворечивы
  client_inn inn
, client_name "name"
FROM
  invoice_import;

Pentru a asocia corect conturile cu ID-urile clienților, trebuie mai întâi să aflăm sau să generăm acești identificatori. Să adăugăm câmpuri sub ele:

ALTER TABLE invoice_import ADD COLUMN client_id integer;
ALTER TABLE client_import ADD COLUMN client_id integer;

Să folosim metoda de sincronizare a tabelului descrisă mai sus cu o mică modificare - nu vom actualiza și nu vom șterge nimic din tabelul țintă, deoarece importăm clienți „numai pentru adăugare”:

-- проставляем в таблице импорта ID уже существующих записей
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  client D
WHERE
  T.inn = D.inn; -- unique key

-- вставляем отсутствовавшие записи и проставляем их ID
WITH ins AS (
  INSERT INTO client(
    inn
  , name
  )
  SELECT
    inn
  , name
  FROM
    client_import
  WHERE
    client_id IS NULL -- если ID не проставился
  RETURNING *
)
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  ins D
WHERE
  T.inn = D.inn; -- unique key

-- проставляем ID клиентов у записей счетов
UPDATE
  invoice_import T
SET
  client_id = D.client_id
FROM
  client_import D
WHERE
  T.client_inn = D.inn; -- прикладной ключ

De fapt, totul este în invoice_import Acum avem câmpul de contact completat client_id, cu care vom introduce factura.

Sursa: www.habr.com

Adauga un comentariu