DBA: kompetentnie organizuj synchronizacje i importy

Do złożonego przetwarzania dużych zbiorów danych (różnych Procesy ETL: import, konwersja i synchronizacja z zewnętrznym źródłem) często zachodzi taka potrzeba tymczasowo „zapamiętać” i natychmiast szybko przetworzyć coś obszernego.

Typowe zadanie tego rodzaju zwykle brzmi mniej więcej tak: "Tutaj dział księgowości wyładowany z banku klienta ostatnie otrzymane płatności, musisz szybko załadować je na stronę i połączyć ze swoimi kontami.”

Kiedy jednak objętość tego „czegoś” zaczyna mierzyć się w setkach megabajtów, a usługa musi nadal pracować z bazą danych 24 godziny na dobę, 7 dni w tygodniu, pojawia się wiele skutków ubocznych, które zrujnują Ci życie.
DBA: kompetentnie organizuj synchronizacje i importy
Aby sobie z nimi poradzić w PostgreSQL (i nie tylko w nim), można zastosować pewne optymalizacje, które pozwolą Ci przetwarzać wszystko szybciej i przy mniejszym zużyciu zasobów.

1. Gdzie wysłać?

Najpierw zdecydujmy, gdzie możemy przesłać dane, które chcemy „przetworzyć”.

1.1. Tabele tymczasowe (TABELA TYMCZASOWA)

W zasadzie dla PostgreSQL tabele tymczasowe są takie same jak inne. Dlatego przesądy takie jak „Wszystko tam jest zapisane tylko w pamięci i może się to skończyć”. Ale jest też kilka znaczących różnic.

Własna „przestrzeń nazw” dla każdego połączenia z bazą danych

Jeśli dwa połączenia próbują połączyć się w tym samym czasie CREATE TABLE x, to ktoś na pewno sięgnie błąd niepowtarzalności obiekty bazy danych.

Ale jeśli obaj spróbują wykonać CREATE TEMPORARY TABLE x, wtedy obaj zrobią to normalnie i wszyscy dostaną twoja kopia stoły. I nie będzie między nimi nic wspólnego.

„Samozniszczenie” podczas rozłączania

Po zamknięciu połączenia wszystkie tabele tymczasowe są automatycznie usuwane, więc ręcznie DROP TABLE x nie ma sensu, chyba że...

Jeśli przepracujesz pgbouncer w trybie transakcyjnym, wówczas baza danych nadal uważa, że ​​to połączenie jest nadal aktywne i w niej nadal istnieje ta tabela tymczasowa.

Dlatego próba utworzenia go ponownie, z innego połączenia do pgbouncer, zakończy się błędem. Można to jednak obejść, stosując CREATE TEMPORARY TABLE IF NOT EXISTS x.

To prawda, że ​​​​i tak lepiej tego nie robić, bo wtedy „nagle” możesz znaleźć tam dane pozostałe od „poprzedniego właściciela”. Zamiast tego znacznie lepiej jest przeczytać instrukcję i przekonać się, że podczas tworzenia tabeli można ją dodać ON COMMIT DROP - czyli po zakończeniu transakcji tabela zostanie automatycznie usunięta.

Brak replikacji

Ponieważ należą one tylko do określonego połączenia, tabele tymczasowe nie są replikowane. Ale eliminuje to potrzebę podwójnej rejestracji danych na stercie + WAL, więc WSTAWIANIE/AKTUALIZACJA/USUWANIE do niego jest znacznie szybsze.

Ponieważ jednak tabela tymczasowa jest nadal „prawie zwyczajną” tabelą, nie można jej również utworzyć na replice. Przynajmniej na razie, choć odpowiedni patch krąży już od dłuższego czasu.

1.2. NIEZALOGOWANA TABELA

Co jednak zrobić, jeśli np. masz jakiś uciążliwy proces ETL, którego nie da się zrealizować w ramach jednej transakcji, a mimo to masz pgbouncer w trybie transakcyjnym? ..

Lub przepływ danych jest tak duży, że Jedno połączenie nie ma wystarczającej przepustowości z bazy danych (odczyt, jeden proces na procesor)?..

Albo jakieś operacje są w toku asynchronicznie w różnych połączeniach?..

Tutaj jest tylko jedna opcja – tymczasowo utwórz nietymczasową tabelę. Pun, tak. To jest:

  • stworzyłem „własne” tabele o maksymalnie losowych nazwach, aby z nikim się nie krzyżować
  • Wyciąg: wypełnij je danymi z zewnętrznego źródła
  • Przekształcać: przekonwertowany, wypełnione kluczowe pola łączące
  • Załadować: wlał gotowe dane do tabel docelowych
  • usunięto „moje” tabele

A teraz - mucha w maści. W rzeczywistości, wszystkie zapisy w PostgreSQL zdarzają się dwa razy - pierwszy w WAL, a następnie do treści tabeli/indeksu. Wszystko to ma na celu obsługę ACID i poprawną widoczność danych pomiędzy COMMIT„zwariowany i ROLLBACKtransakcje zerowe.

Ale nie potrzebujemy tego! Mamy cały proces Albo było to całkowicie udane, albo nie.. Nie ma znaczenia, ile będzie transakcji pośrednich – nie interesuje nas „kontynuowanie procesu od środka”, zwłaszcza gdy nie jest jasne, gdzie to było.

Aby to zrobić, programiści PostgreSQL, już w wersji 9.1, wprowadzili coś takiego jak NIEZALOGOWANE tabele:

Przy tym wskazaniu tabela jest tworzona jako niezalogowana. Dane zapisane w niezalogowanych tabelach nie przechodzą przez dziennik zapisu z wyprzedzeniem (patrz rozdział 29), co powoduje, że takie tabele pracować znacznie szybciej niż zwykle. Jednak nie są odporni na niepowodzenia; w przypadku awarii serwera lub awaryjnego wyłączenia, niezalogowana tabela automatycznie obcinane. Dodatkowo zawartość tabeli niezalogowanej nie replikowane do serwerów podrzędnych. Wszelkie indeksy utworzone w niezalogowanej tabeli zostaną automatycznie wyrejestrowane.

Krótko mówiąc, będzie znacznie szybciej, ale jeśli serwer bazy danych „upadnie”, będzie to nieprzyjemne. Ale jak często się to zdarza i czy Twój proces ETL wie, jak to poprawnie skorygować „od środka” po „rewitalizacji” bazy danych?..

Jeśli nie, a powyższy przypadek jest podobny do Twojego, użyj UNLOGGEDale nigdy nie włączaj tego atrybutu na prawdziwych stołach, dane z których są Ci bliskie.

1.3. ON COMMIT { USUŃ WIERSZA | UPUSZCZAĆ}

Ta konstrukcja pozwala określić automatyczne zachowanie po zakończeniu transakcji podczas tworzenia tabeli.

Про ON COMMIT DROP Już pisałem powyżej, generuje DROP TABLE, ale z ON COMMIT DELETE ROWS sytuacja jest bardziej interesująca - jest generowana tutaj TRUNCATE TABLE.

Ponieważ cała infrastruktura do przechowywania metaopisu tabeli tymczasowej jest dokładnie taka sama, jak w przypadku zwykłej tabeli Ciągłe tworzenie i usuwanie tabel tymczasowych prowadzi do poważnego „pęcznienia” tabel systemowych pg_class, pg_attribute, pg_attrdef, pg_dependent,…

Teraz wyobraź sobie, że masz pracownika podłączonego bezpośrednio do bazy danych, który co sekundę otwiera nową transakcję, tworzy, wypełnia, przetwarza i usuwa tabelę tymczasową... W tabelach systemowych zgromadzi się nadmiar śmieci, a spowoduje to dodatkowe hamulce przy każdej operacji.

Ogólnie rzecz biorąc, nie rób tego! W tym przypadku jest to o wiele skuteczniejsze CREATE TEMPORARY TABLE x ... ON COMMIT DELETE ROWS wyjmij go z cyklu transakcyjnego - wtedy na początku każdej nowej transakcji tabele już są będzie istnieć (zapisz połączenie CREATE), ale będzie pusty, dzięki TRUNCATE (zapisaliśmy także jego połączenie) przy finalizacji poprzedniej transakcji.

1.4. JAK... W TYM...

Wspomniałem na początku, że jednym z typowych przypadków użycia tabel tymczasowych są różnego rodzaju importy - a programista ze zmęczeniem kopiuje i wkleja listę pól tabeli docelowej do deklaracji swojej tymczasowej tabeli...

Ale lenistwo jest motorem postępu! Dlatego utwórz nową tabelę „na podstawie próbki” może być znacznie prościej:

CREATE TEMPORARY TABLE import_table(
  LIKE target_table
);

Ponieważ możesz następnie wygenerować wiele danych w tej tabeli, przeszukiwanie jej nigdy nie będzie szybkie. Istnieje jednak tradycyjne rozwiązanie tego problemu — indeksy! I tak, tabela tymczasowa może również zawierać indeksy.

Ponieważ często wymagane indeksy pokrywają się z indeksami tabeli docelowej, możesz po prostu pisać LIKE target_table INCLUDING INDEXES.

Jeśli również potrzebujesz DEFAULT-wartości (na przykład do wypełnienia wartości klucza podstawowego), których możesz użyć LIKE target_table INCLUDING DEFAULTS. Lub po prostu - LIKE target_table INCLUDING ALL — kopiuje wartości domyślne, indeksy, ograniczenia,...

Ale tutaj musisz to zrozumieć, jeśli stworzyłeś natychmiast zaimportuj tabelę z indeksami, wówczas ładowanie danych będzie trwało dłużejniż gdybyś najpierw wszystko wypełnił, a dopiero potem zwinął indeksy - spójrz, jak to robi jako przykład pg_dump.

Ogólnie, RTFM!

2. Jak pisać?

Powiem jedno – korzystaj COPY-przepływ zamiast „pakowania” INSERT, momentami przyspieszenie. Można nawet bezpośrednio z wstępnie wygenerowanego pliku.

3. Jak przetwarzać?

Załóżmy więc, że nasze wprowadzenie będzie wyglądać mniej więcej tak:

  • masz tabelę z danymi klienta przechowywanymi w bazie danych Rekordy 1M
  • każdego dnia klient wysyła Ci nowy pełny „obraz”
  • z doświadczenia wiesz, że od czasu do czasu nie zmieniono więcej niż 10 tys. rekordów

Klasycznym przykładem takiej sytuacji jest Baza KLADR — adresów jest w sumie sporo, ale w każdym cotygodniowym przesłaniu zmian (zmiana nazw osiedli, łączenie ulic, pojawienie się nowych domów) jest bardzo niewiele, nawet w skali kraju.

3.1. Algorytm pełnej synchronizacji

Dla uproszczenia załóżmy, że nie trzeba nawet restrukturyzować danych – wystarczy doprowadzić tabelę do pożądanej postaci, czyli:

  • usunąć wszystko, czego już nie ma
  • odśwież wszystko, co już istniało i wymaga aktualizacji
  • wstawić wszystko, co jeszcze się nie wydarzyło

Dlaczego operacje należy wykonywać w tej kolejności? Ponieważ w ten sposób rozmiar stołu będzie minimalnie rosnąć (pamiętaj o MVCC!).

USUŃ Z dst

Nie, oczywiście, że możesz to zrobić, wykonując tylko dwie operacje:

  • usunąć (DELETE) ogólnie wszystko
  • wstawić wszystko z nowego obrazu

Ale jednocześnie, dzięki MVCC, Rozmiar stołu wzrośnie dokładnie dwukrotnie! Uzyskanie +1 miliona obrazów rekordów w tabeli dzięki aktualizacji 10 tys. to taka nadmiarowość…

SKRÓĆ dst

Bardziej doświadczony programista wie, że cały tablet można wyczyścić dość tanim kosztem:

  • jasne (TRUNCATE) cały stół
  • wstawić wszystko z nowego obrazu

Metoda jest skuteczna, czasami całkiem stosowne, ale jest problem... 1M rekordów będziemy dodawać przez długi czas, więc nie możemy pozwolić sobie na pozostawienie tabeli pustej przez cały ten czas (co stanie się bez zawinięcia jej w jedną transakcję).

Co znaczy:

  • zaczynamy długoterminowa transakcja
  • TRUNCATE narzuca Dostęp na wyłączność-bloking
  • robimy wstawianie przez długi czas, a wszyscy inni w tym czasie nawet nie mogę SELECT

Coś idzie nie tak...

ZMIEŃ TABELĘ… ZMIEŃ NAZWĘ… / USUŃ TABELĘ…

Alternatywą jest wypełnienie wszystkiego w osobnej nowej tabeli, a następnie po prostu zmiana jej nazwy w miejsce starej. Kilka paskudnych drobiazgów:

  • nadal też Dostęp na wyłączność, choć znacznie krócej
  • wszystkie plany zapytań/statystyki dla tej tabeli zostaną zresetowane, trzeba uruchomić ANALIZĘ
  • wszystkie klucze obce są zepsute (FK) do stołu

Była łatka WIP od Simona Riggsa, która sugerowała wykonanie ALTER-operacja zamiany treści tabeli na poziomie pliku, bez dotykania statystyk i FK, ale nie zebrała kworum.

USUŃ, AKTUALIZUJ, WSTAW

Decydujemy się więc na nieblokującą opcję trzech operacji. Prawie trzy... Jak to zrobić najskuteczniej?

-- все делаем в рамках транзакции, чтобы никто не видел "промежуточных" состояний
BEGIN;

-- создаем временную таблицу с импортируемыми данными
CREATE TEMPORARY TABLE tmp(
  LIKE dst INCLUDING INDEXES -- по образу и подобию, вместе с индексами
) ON COMMIT DROP; -- за рамками транзакции она нам не нужна

-- быстро-быстро вливаем новый образ через COPY
COPY tmp FROM STDIN;
-- ...
-- .

-- удаляем отсутствующие
DELETE FROM
  dst D
USING
  dst X
LEFT JOIN
  tmp Y
    USING(pk1, pk2) -- поля первичного ключа
WHERE
  (D.pk1, D.pk2) = (X.pk1, X.pk2) AND
  Y IS NOT DISTINCT FROM NULL; -- "антиджойн"

-- обновляем оставшиеся
UPDATE
  dst D
SET
  (f1, f2, f3) = (T.f1, T.f2, T.f3)
FROM
  tmp T
WHERE
  (D.pk1, D.pk2) = (T.pk1, T.pk2) AND
  (D.f1, D.f2, D.f3) IS DISTINCT FROM (T.f1, T.f2, T.f3); -- незачем обновлять совпадающие

-- вставляем отсутствующие
INSERT INTO
  dst
SELECT
  T.*
FROM
  tmp T
LEFT JOIN
  dst D
    USING(pk1, pk2)
WHERE
  D IS NOT DISTINCT FROM NULL;

COMMIT;

3.2. Importuj obróbkę końcową

W tym samym KLADR-ie wszystkie zmienione rekordy muszą zostać dodatkowo poddane postprocessingowi - znormalizowane, wyróżnione słowa kluczowe i zredukowane do wymaganych struktur. Ale skąd wiesz - co dokładnie się zmieniłobez komplikowania kodu synchronizacji, najlepiej bez dotykania go w ogóle?

Jeśli w momencie synchronizacji tylko Twój proces ma prawo do zapisu, możesz skorzystać z wyzwalacza, który zbierze za nas wszystkie zmiany:

-- целевые таблицы
CREATE TABLE kladr(...);
CREATE TABLE kladr_house(...);

-- таблицы с историей изменений
CREATE TABLE kladr$log(
  ro kladr, -- тут лежат целые образы записей старой/новой
  rn kladr
);

CREATE TABLE kladr_house$log(
  ro kladr_house,
  rn kladr_house
);

-- общая функция логирования изменений
CREATE OR REPLACE FUNCTION diff$log() RETURNS trigger AS $$
DECLARE
  dst varchar = TG_TABLE_NAME || '$log';
  stmt text = '';
BEGIN
  -- проверяем необходимость логгирования при обновлении записи
  IF TG_OP = 'UPDATE' THEN
    IF NEW IS NOT DISTINCT FROM OLD THEN
      RETURN NEW;
    END IF;
  END IF;
  -- создаем запись лога
  stmt = 'INSERT INTO ' || dst::text || '(ro,rn)VALUES(';
  CASE TG_OP
    WHEN 'INSERT' THEN
      EXECUTE stmt || 'NULL,$1)' USING NEW;
    WHEN 'UPDATE' THEN
      EXECUTE stmt || '$1,$2)' USING OLD, NEW;
    WHEN 'DELETE' THEN
      EXECUTE stmt || '$1,NULL)' USING OLD;
  END CASE;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Teraz możemy zastosować wyzwalacze przed rozpoczęciem synchronizacji (lub włączyć je poprzez ALTER TABLE ... ENABLE TRIGGER ...):

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr_house
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

Następnie spokojnie wyodrębniamy wszystkie potrzebne zmiany z tabel dzienników i przepuszczamy je przez dodatkowe procedury obsługi.

3.3. Importowanie połączonych zestawów

Powyżej rozważyliśmy przypadki, gdy struktury danych źródła i miejsca docelowego są takie same. Co jednak, jeśli przesyłanie z systemu zewnętrznego ma format inny niż struktura przechowywania w naszej bazie danych?

Weźmy jako przykład przechowywanie klientów i ich kont, klasyczną opcję „wiele do jednego”:

CREATE TABLE client(
  client_id
    serial
      PRIMARY KEY
, inn
    varchar
      UNIQUE
, name
    varchar
);

CREATE TABLE invoice(
  invoice_id
    serial
      PRIMARY KEY
, client_id
    integer
      REFERENCES client(client_id)
, number
    varchar
, dt
    date
, sum
    numeric(32,2)
);

Ale pobieranie z zewnętrznego źródła przychodzi do nas w formie „wszystko w jednym”:

CREATE TEMPORARY TABLE invoice_import(
  client_inn
    varchar
, client_name
    varchar
, invoice_number
    varchar
, invoice_dt
    date
, invoice_sum
    numeric(32,2)
);

Oczywiście w tej wersji dane klienta można powielić, a głównym zapisem jest „konto”:

0123456789;Вася;A-01;2020-03-16;1000.00
9876543210;Петя;A-02;2020-03-16;666.00
0123456789;Вася;B-03;2020-03-16;9999.00

W przypadku modelu po prostu wstawimy nasze dane testowe, ale pamiętaj - COPY bardziej wydajny!

INSERT INTO invoice_import
VALUES
  ('0123456789', 'Вася', 'A-01', '2020-03-16', 1000.00)
, ('9876543210', 'Петя', 'A-02', '2020-03-16', 666.00)
, ('0123456789', 'Вася', 'B-03', '2020-03-16', 9999.00);

Na początek podkreślmy te „cięcia”, do których odnoszą się nasze „fakty”. W naszym przypadku faktury dotyczą klientów:

CREATE TEMPORARY TABLE client_import AS
SELECT DISTINCT ON(client_inn)
-- можно просто SELECT DISTINCT, если данные заведомо непротиворечивы
  client_inn inn
, client_name "name"
FROM
  invoice_import;

Aby poprawnie powiązać konta z identyfikatorami klientów, musimy najpierw poznać lub wygenerować te identyfikatory. Dodajmy pod nimi pola:

ALTER TABLE invoice_import ADD COLUMN client_id integer;
ALTER TABLE client_import ADD COLUMN client_id integer;

Skorzystajmy z opisanej powyżej metody synchronizacji tabel z małą poprawką - w tabeli docelowej nie będziemy niczego aktualizować ani usuwać, ponieważ importujemy klientów „tylko do dopisywania”:

-- проставляем в таблице импорта ID уже существующих записей
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  client D
WHERE
  T.inn = D.inn; -- unique key

-- вставляем отсутствовавшие записи и проставляем их ID
WITH ins AS (
  INSERT INTO client(
    inn
  , name
  )
  SELECT
    inn
  , name
  FROM
    client_import
  WHERE
    client_id IS NULL -- если ID не проставился
  RETURNING *
)
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  ins D
WHERE
  T.inn = D.inn; -- unique key

-- проставляем ID клиентов у записей счетов
UPDATE
  invoice_import T
SET
  client_id = D.client_id
FROM
  client_import D
WHERE
  T.client_inn = D.inn; -- прикладной ключ

Właściwie wszystko jest w środku invoice_import Mamy teraz wypełnione pole kontaktowe client_id, do którego wstawimy fakturę.

Źródło: www.habr.com

Dodaj komentarz