DBA: компетентно организиране на синхронизации и импортиране

За сложна обработка на големи масиви от данни (различни ETL процеси: импортиране, конвертиране и синхронизиране с външен източник) често има нужда временно „запомняне“ и незабавно бързо обработване нещо обемно.

Типична задача от този вид обикновено звучи по следния начин: "Точно тук счетоводен отдел разтоварен от клиентската банка последните получени плащания, трябва бързо да ги качите на уебсайта и да ги свържете към вашите сметки.“

Но когато обемът на това „нещо“ започне да се измерва в стотици мегабайти и услугата трябва да продължи да работи с базата данни 24x7, възникват много странични ефекти, които ще съсипят живота ви.
DBA: компетентно организиране на синхронизации и импортиране
За да се справите с тях в PostgreSQL (и не само в него), можете да използвате някои оптимизации, които ще ви позволят да обработвате всичко по-бързо и с по-малко потребление на ресурси.

1. Къде да изпратим?

Първо, нека решим къде можем да качим данните, които искаме да „обработим“.

1.1. Временни таблици (TEMPORARY TABLE)

По принцип за PostgreSQL временните таблици са същите като всички останали. Следователно, суеверия като „Всичко там се съхранява само в паметта и може да свърши“. Но има и няколко съществени разлики.

Ваше собствено „пространство от имена“ за всяка връзка към базата данни

Ако две връзки се опитат да се свържат едновременно CREATE TABLE x, тогава някой определено ще получи грешка при неуникалност обекти на база данни.

Но ако и двамата се опитат да изпълнят CREATE TEMPORARY TABLE x, тогава и двамата ще го направят нормално и всички ще получат вашето копие маси. И между тях няма да има нищо общо.

"Самоунищожаване" при прекъсване на връзката

Когато връзката е затворена, всички временни таблици се изтриват автоматично, така че ръчно DROP TABLE x няма смисъл освен...

Ако работите чрез pgbouncer в режим на транзакция, тогава базата данни продължава да вярва, че тази връзка все още е активна и в нея тази временна таблица все още съществува.

Следователно опитът да го създадете отново, от различна връзка към pgbouncer, ще доведе до грешка. Но това може да бъде заобиколено с помощта на CREATE TEMPORARY TABLE IF NOT EXISTS x.

Вярно е, че е по-добре да не правите това така или иначе, защото тогава можете „внезапно“ да намерите там данните, останали от „предишния собственик“. Вместо това е много по-добре да прочетете ръководството и да видите, че при създаване на таблица е възможно да се добави ON COMMIT DROP - тоест, когато транзакцията приключи, таблицата ще бъде автоматично изтрита.

Нерепликация

Тъй като принадлежат само на конкретна връзка, временните таблици не се репликират. Но това елиминира необходимостта от двойно записване на данни в heap + WAL, така че INSERT/UPDATE/DELETE в него е много по-бързо.

Но тъй като временната таблица все още е „почти обикновена“ таблица, тя също не може да бъде създадена на реплика. Поне засега, въпреки че съответният пач се разпространява отдавна.

1.2. НЕРЕГИСТРАНА ТАБЛИЦА

Но какво трябва да направите, например, ако имате някакъв вид тромав ETL процес, който не може да бъде внедрен в рамките на една транзакция, но все пак имате pgbouncer в режим на транзакция? ..

Или потокът от данни е толкова голям, че Няма достатъчно честотна лента на една връзка от база данни (четете, един процес на процесор)?..

Или се провеждат някакви операции асинхронно в различни връзки?..

Тук има само един вариант - временно създайте невременна таблица. Игра на думи, да. Това е:

  • създадох „мои собствени“ таблици с максимално произволни имена, за да не се пресичат с никого
  • Екстракт: попълни ги с данни от външен източник
  • Transform: преобразувани, попълнени полета за свързване на ключове
  • Натоварване: изсипва готови данни в целеви таблици
  • изтри „моите“ таблици

И сега - муха в мехлема. Всъщност, всички записи в PostgreSQL се случват два пъти - първи в WAL, след това в телата на таблицата/индекса. Всичко това се прави, за да поддържа ACID и да коригира видимостта на данните между тях COMMITорехов и ROLLBACK„нулеви транзакции.

Но ние нямаме нужда от това! Имаме целия процес Или беше напълно успешен, или не.. Няма значение колко междинни транзакции ще има - ние не се интересуваме от „продължаване на процеса от средата“, особено когато не е ясно къде е бил.

За да направят това, разработчиците на PostgreSQL във версия 9.1 въведоха такова нещо като UNLOGGED таблици:

С тази индикация таблицата се създава като нерегистрирана. Данните, записани в нерегистрирани таблици, не преминават през журнала за предварителен запис (вижте Глава 29), което кара такива таблици да работят много по-бързо от обикновено. Те обаче не са имунизирани срещу провал; в случай на повреда на сървъра или аварийно изключване, нерегистрирана таблица автоматично съкратено. Освен това, съдържанието на нерегистрираната таблица не се репликира към подчинени сървъри. Всички индекси, създадени върху нерегистрирана таблица, автоматично стават нерегистрирани.

Накратко, ще бъде много по-бързо, но ако сървърът на базата данни „падне“, ще бъде неприятно. Но колко често се случва това и вашият ETL процес знае ли как да коригира това правилно „от средата“ след „съживяване“ на базата данни?..

Ако не и случаят по-горе е подобен на вашия, използвайте UNLOGGED, но никога не активирайте този атрибут на реални таблици, данните от който са ви скъпи.

1.3. ON COMMIT { DELETE ROWS | ИЗПУСКАЙТЕ}

Тази конструкция ви позволява да укажете автоматично поведение при завършване на транзакция при създаване на таблица.

Около ON COMMIT DROP Вече писах по-горе, генерира DROP TABLE, но със ON COMMIT DELETE ROWS ситуацията е по-интересна - тук се генерира TRUNCATE TABLE.

Тъй като цялата инфраструктура за съхраняване на мета-описанието на временна таблица е точно същата като тази на обикновена таблица, тогава Постоянното създаване и изтриване на временни таблици води до силно "подуване" на системните таблици pg_class, pg_attribute, pg_attrdef, pg_depend,…

Сега си представете, че имате работник на директна връзка с базата данни, който отваря нова транзакция всяка секунда, създава, попълва, обработва и изтрива временна таблица... Ще има излишък от боклук, натрупан в системните таблици и това ще доведе до допълнителни спирачки за всяка операция.

Като цяло, не правете това! В този случай е много по-ефективно CREATE TEMPORARY TABLE x ... ON COMMIT DELETE ROWS извадете го от цикъла на транзакцията - тогава до началото на всяка нова транзакция масите вече са ще съществува (запазване на обаждане CREATE), Но ще бъде празен, благодарение на TRUNCATE (също запазихме неговото повикване) при завършване на предишната транзакция.

1.4. ХАРЕСВАНЕ...ВКЛЮЧИТЕЛНО...

Споменах в началото, че един от типичните случаи на използване на временни таблици е различни видове импортиране - и разработчикът уморено копира и поставя списъка с полета на целевата таблица в декларацията на своята временна...

Но мързелът е двигателят на прогреса! Ето защо създайте нова таблица „въз основа на извадка“ може да бъде много по-просто:

CREATE TEMPORARY TABLE import_table(
  LIKE target_table
);

Тъй като след това можете да генерирате много данни в тази таблица, търсенето в нея никога няма да бъде бързо. Но има традиционно решение за това - индекси! И да, временната таблица също може да има индекси.

Тъй като често необходимите индекси съвпадат с индексите на целевата таблица, можете просто да пишете LIKE target_table INCLUDING INDEXES.

Ако имате нужда и от DEFAULT-стойности (например за попълване на стойностите на първичния ключ), можете да използвате LIKE target_table INCLUDING DEFAULTS. Или просто - LIKE target_table INCLUDING ALL — копира настройки по подразбиране, индекси, ограничения,...

Но тук трябва да разберете, че ако сте създали незабавно импортиране на таблица с индекси, тогава зареждането на данните ще отнеме повече времеотколкото ако първо попълните всичко и едва след това навиете индексите - вижте как прави това като пример pg_dump.

Като цяло, RTFM!

2. Как се пише?

Само да кажа - използвайте го COPY- поток вместо „пакет“ INSERT, ускорение на моменти. Можете дори директно от предварително генериран файл.

3. Как да обработваме?

И така, нека нашето въведение изглежда така:

  • имате таблица с клиентски данни, съхранени във вашата база данни 1 милион записи
  • всеки ден клиент ви изпраща нов пълен "изображение"
  • от опит знаеш, че от време на време не се променят повече от 10K записа

Класически пример за такава ситуация е База KLADR — общо има много адреси, но във всяко седмично качване има много малко промени (преименуване на населени места, обединяване на улици, поява на нови къщи) дори в национален мащаб.

3.1. Пълен алгоритъм за синхронизация

За простота, нека кажем, че дори не е необходимо да преструктурирате данните - просто приведете таблицата в желаната форма, тоест:

  • премахнете всичко, което вече не съществува
  • актуализация всичко, което вече съществува и трябва да се актуализира
  • вложка всичко, което още не се е случило

Защо операциите трябва да се извършват в този ред? Защото така размерът на масата ще нарасне минимално (помнете за MVCC!).

ИЗТРИВАНЕ ОТ dst

Не, разбира се, можете да минете само с две операции:

  • премахнете (DELETE) всичко като цяло
  • вложка всичко от новото изображение

Но в същото време, благодарение на MVCC, Размерът на масата ще се увеличи точно два пъти! Получаването на +1M изображения на записи в таблицата поради 10K актуализация е толкова излишно...

ТРЪНЦИРАНЕ дст

По-опитен разработчик знае, че целият таблет може да бъде почистен доста евтино:

  • чист (TRUNCATE) цялата таблица
  • вложка всичко от новото изображение

Методът е ефективен, понякога доста приложимо, но има проблем... Ще добавяме 1 милион записа за дълго време, така че не можем да си позволим да оставим таблицата празна през цялото това време (както ще се случи, без да я обгърнем в една транзакция).

Което означава:

  • ние започваме дългосрочна сделка
  • TRUNCATE налага Изключителен достъп- блокиране
  • ние правим вмъкването за дълго време, а всички останали по това време дори не може SELECT

Нещо не върви...

ПРОМЯНА НА ТАБЛИЦА… ПРЕИМЕНУВАНЕ… / ПУСКА НА ТАБЛИЦА…

Алтернатива е да попълните всичко в отделна нова таблица и след това просто да я преименувате на мястото на старата. Няколко гадни дребни неща:

  • все още също Изключителен достъп, макар и значително по-малко време
  • всички планове/статистики за заявки за тази таблица се нулират, трябва да стартирате ANALYZE
  • всички външни ключове са повредени (FK) към масата

Имаше WIP пач от Саймън Ригс, който предлагаше да се направи ALTER-операция за подмяна на тялото на таблицата на ниво файл, без да се пипат статистики и FK, но не събра кворум.

ИЗТРИВАНЕ, АКТУАЛИЗИРАНЕ, ВМЪКВАНЕ

И така, ние се спираме на опцията без блокиране на три операции. Почти три... Как да направите това най-ефективно?

-- все делаем в рамках транзакции, чтобы никто не видел "промежуточных" состояний
BEGIN;

-- создаем временную таблицу с импортируемыми данными
CREATE TEMPORARY TABLE tmp(
  LIKE dst INCLUDING INDEXES -- по образу и подобию, вместе с индексами
) ON COMMIT DROP; -- за рамками транзакции она нам не нужна

-- быстро-быстро вливаем новый образ через COPY
COPY tmp FROM STDIN;
-- ...
-- .

-- удаляем отсутствующие
DELETE FROM
  dst D
USING
  dst X
LEFT JOIN
  tmp Y
    USING(pk1, pk2) -- поля первичного ключа
WHERE
  (D.pk1, D.pk2) = (X.pk1, X.pk2) AND
  Y IS NOT DISTINCT FROM NULL; -- "антиджойн"

-- обновляем оставшиеся
UPDATE
  dst D
SET
  (f1, f2, f3) = (T.f1, T.f2, T.f3)
FROM
  tmp T
WHERE
  (D.pk1, D.pk2) = (T.pk1, T.pk2) AND
  (D.f1, D.f2, D.f3) IS DISTINCT FROM (T.f1, T.f2, T.f3); -- незачем обновлять совпадающие

-- вставляем отсутствующие
INSERT INTO
  dst
SELECT
  T.*
FROM
  tmp T
LEFT JOIN
  dst D
    USING(pk1, pk2)
WHERE
  D IS NOT DISTINCT FROM NULL;

COMMIT;

3.2. Постобработка на импортиране

В същия KLADR всички променени записи трябва допълнително да преминат през последваща обработка - нормализирани, подчертани ключови думи и намалени до необходимите структури. Но откъде знаеш - какво точно се променибез да усложнявате кода за синхронизация, в идеалния случай без изобщо да го докосвате?

Ако само вашият процес има достъп за запис по време на синхронизирането, тогава можете да използвате тригер, който ще събере всички промени за нас:

-- целевые таблицы
CREATE TABLE kladr(...);
CREATE TABLE kladr_house(...);

-- таблицы с историей изменений
CREATE TABLE kladr$log(
  ro kladr, -- тут лежат целые образы записей старой/новой
  rn kladr
);

CREATE TABLE kladr_house$log(
  ro kladr_house,
  rn kladr_house
);

-- общая функция логирования изменений
CREATE OR REPLACE FUNCTION diff$log() RETURNS trigger AS $$
DECLARE
  dst varchar = TG_TABLE_NAME || '$log';
  stmt text = '';
BEGIN
  -- проверяем необходимость логгирования при обновлении записи
  IF TG_OP = 'UPDATE' THEN
    IF NEW IS NOT DISTINCT FROM OLD THEN
      RETURN NEW;
    END IF;
  END IF;
  -- создаем запись лога
  stmt = 'INSERT INTO ' || dst::text || '(ro,rn)VALUES(';
  CASE TG_OP
    WHEN 'INSERT' THEN
      EXECUTE stmt || 'NULL,$1)' USING NEW;
    WHEN 'UPDATE' THEN
      EXECUTE stmt || '$1,$2)' USING OLD, NEW;
    WHEN 'DELETE' THEN
      EXECUTE stmt || '$1,NULL)' USING OLD;
  END CASE;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Сега можем да приложим тригери, преди да започнем синхронизация (или да ги активираме чрез ALTER TABLE ... ENABLE TRIGGER ...):

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr_house
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

И след това спокойно извличаме всички промени, от които се нуждаем, от журналните таблици и ги пускаме през допълнителни манипулатори.

3.3. Импортиране на свързани набори

По-горе разгледахме случаи, когато структурите от данни на източника и дестинацията са еднакви. Но какво ще стане, ако качването от външна система има формат, различен от структурата за съхранение в нашата база данни?

Да вземем за пример съхранението на клиенти и техните акаунти, класическата опция „много към едно“:

CREATE TABLE client(
  client_id
    serial
      PRIMARY KEY
, inn
    varchar
      UNIQUE
, name
    varchar
);

CREATE TABLE invoice(
  invoice_id
    serial
      PRIMARY KEY
, client_id
    integer
      REFERENCES client(client_id)
, number
    varchar
, dt
    date
, sum
    numeric(32,2)
);

Но изтеглянето от външен източник идва при нас под формата на „всичко в едно“:

CREATE TEMPORARY TABLE invoice_import(
  client_inn
    varchar
, client_name
    varchar
, invoice_number
    varchar
, invoice_dt
    date
, invoice_sum
    numeric(32,2)
);

Очевидно клиентските данни могат да бъдат дублирани в тази версия и основният запис е „акаунт“:

0123456789;Вася;A-01;2020-03-16;1000.00
9876543210;Петя;A-02;2020-03-16;666.00
0123456789;Вася;B-03;2020-03-16;9999.00

За модела просто ще вмъкнем нашите тестови данни, но помнете - COPY по-ефикасно!

INSERT INTO invoice_import
VALUES
  ('0123456789', 'Вася', 'A-01', '2020-03-16', 1000.00)
, ('9876543210', 'Петя', 'A-02', '2020-03-16', 666.00)
, ('0123456789', 'Вася', 'B-03', '2020-03-16', 9999.00);

Първо, нека подчертаем онези „съкращения“, за които се отнасят нашите „факти“. В нашия случай фактурите се отнасят за клиенти:

CREATE TEMPORARY TABLE client_import AS
SELECT DISTINCT ON(client_inn)
-- можно просто SELECT DISTINCT, если данные заведомо непротиворечивы
  client_inn inn
, client_name "name"
FROM
  invoice_import;

За да свържем правилно акаунти с клиентски идентификатори, първо трябва да открием или генерираме тези идентификатори. Нека добавим полета под тях:

ALTER TABLE invoice_import ADD COLUMN client_id integer;
ALTER TABLE client_import ADD COLUMN client_id integer;

Нека използваме описания по-горе метод за синхронизиране на таблици с малка поправка - няма да актуализираме или изтрием нищо в целевата таблица, защото импортираме клиенти „само за добавяне“:

-- проставляем в таблице импорта ID уже существующих записей
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  client D
WHERE
  T.inn = D.inn; -- unique key

-- вставляем отсутствовавшие записи и проставляем их ID
WITH ins AS (
  INSERT INTO client(
    inn
  , name
  )
  SELECT
    inn
  , name
  FROM
    client_import
  WHERE
    client_id IS NULL -- если ID не проставился
  RETURNING *
)
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  ins D
WHERE
  T.inn = D.inn; -- unique key

-- проставляем ID клиентов у записей счетов
UPDATE
  invoice_import T
SET
  client_id = D.client_id
FROM
  client_import D
WHERE
  T.client_inn = D.inn; -- прикладной ключ

Всъщност всичко е вътре invoice_import Сега имаме попълнено поле за контакт client_id, с който ще вмъкнем фактурата.

Източник: www.habr.com

Добавяне на нов коментар