DBA: грамотно організовуємо синхронізації та імпорти

При складній обробці великих наборів даних (різні ETL-процеси: імпорти, конвертації та синхронізації із зовнішнім джерелом) часто виникає необхідність тимчасово «запам'ятати», і відразу швидко обробити щось об'ємне.

Типове завдання подібного роду звучить зазвичай приблизно так: "Ось тут бухгалтерія вивантажила із клієнт-банку останні оплати, що їх надійшли, треба їх швиденько вкачати на сайт і прив'язати до рахунків»

Але коли обсяг цього "чогось" починає вимірюватися сотнями мегабайт, а сервіс при цьому повинен продовжувати працювати з базою в режимі 24×7, виникає безліч side-ефектів, які псуватимуть вам життя.
DBA: грамотно організовуємо синхронізації та імпорти
Щоб впоратися з ними в PostgreSQL (та й не тільки в ньому), можна використовувати деякі можливості для оптимізації, які дозволять обробити все швидше та з меншою витратою ресурсів.

1. Куди вантажити?

Спочатку давайте визначимося, куди ми можемо залити дані, які хочемо «відпроцесувати».

1.1. Тимчасові таблиці (TEMPORARY TABLE)

В принципі, для PostgreSQL тимчасові – це такі ж таблиці, як будь-які інші. Тому невірні забобони типу «Там усе зберігається тільки в пам'яті, а вона може скінчитися». Але є кілька істотних відмінностей.

Свій "неймспейс" для кожного підключення до БД

Якщо два підключення спробують одночасно виконати CREATE TABLE x, то хтось обов'язково отримає помилку неунікальності об'єктів БД

А от якщо обидва спробують виконати CREATE TEMPORARY TABLE x, то обидва нормально це зроблять, і кожен отримає свій екземпляр таблиці. І нічого спільного між ними не буде.

«Самознищення» при disconnect

При закритті підключення всі тимчасові таблиці автоматично видаляються, тому «вручну» виконувати DROP TABLE x сенсу немає жодного, крім…

Якщо ви працюєте через pgbouncer у transaction mode, то база продовжує вважати, що це з'єднання все ще активно, і в ньому ця тимчасова таблиця як і раніше існує.

Тому спроба створити її повторно, вже з іншого підключення до pgbouncer, призведе до помилки. Але це можна обійти, скориставшись CREATE TEMPORARY TABLE IF NOT EXISTS x.

Правда, краще так все-таки не робити, тому що потім можна «раптово» виявити там дані, що залишилися від «попереднього власника». Натомість набагато краще прочитати-таки мануал, і побачити, що при створенні таблиці є можливість дописати ON COMMIT DROP — тобто після завершення транзакції таблиця буде автоматично видалена.

Не-реплікація

З огляду на належність лише певному з'єднанню, часові таблиці не реплікуються. Зате це позбавляє необхідності подвійного запису даних в heap + WAL, тому INSERT/UPDATE/DELETE у ній значно швидше.

Але оскільки тимчасова — це все ж таки «майже звичайна» таблиця, то й на репліці її створити не можна теж. Принаймні поки що, хоча відповідний патч вже давно ходить.

1.2. Нежурнальовані таблиці (UNLOGGED TABLE)

Але що робити, наприклад, якщо ви маєте якийсь громіздкий ETL-процес, який не вдається реалізувати в рамках однієї транзакції, а у вас таки pgbouncer у transaction mode? ..

Або потік даних настільки великий, що недостатньо пропускну здатність одного з'єднання з БД (читай, одного процесу на CPU)?

Або частина операцій ідуть асинхронно у різних коннектах?

Тут варіант тільки один. тимчасово створювати не-часову таблицю. Каламбур, ага. Тобто:

  • створив «свої» таблиці з максимально-випадковими іменами, щоб ні з ким не перетнутися
  • Витяг: залив у них дані із зовнішнього джерела
  • Перетворення: перетворив, заповнив ключові зв'язувальні поля
  • Навантаження: перелив готові дані до цільових таблиць
  • видалив «свої» таблиці

А тепер – ложка дьогтю. По суті, весь запис у PostgreSQL відбувається двічі - спочатку у WAL, потім вже тіла таблиці/індексів. Все це зроблено для підтримки ACID та коректної видимості даних між COMMITСкинутими і ROLLBACKСтягнутими транзакціями.

Але нам цього не потрібно! У нас весь процес або цілком успішно пройшов, чи ні. Не має значення, скільки в ньому буде проміжних транзакцій — нам не цікаво «продовжувати процес із середини», особливо коли незрозуміло, де вона була.

Для цього розробники PostgreSQL ще у версії 9.1 впровадили таку штуку як нежурнальовані (UNLOGGED) таблиці:

З цим зазначенням таблиця створюється як нежурналируемая. Дані, що записуються в таблиці, що не журналюються, не проходять через журнал передзапису (див. Главу 29), в результаті чого такі таблиці працюють набагато швидше за звичайні. Однак вони не захищені від збою; при збої або аварійному відключенні сервера таблиця, що не журналюється. автоматично усікається. Крім того, вміст таблиці, що не журналується. не реплікується на керовані сервери. Будь-які індекси, що створюються для таблиці, що не журналюється, автоматично стають нежурналируемыми.

коротше, буде сильно швидше, але якщо сервер БД "впаде" - буде неприємно. Але чи часто це відбувається, і чи вміє ваш ETL-процес це коректно доопрацьовувати з середини після пожвавлення БД?

Якщо ж ні, і кейс вище схожий на ваш — використовуйте UNLOGGED, але ніколи не включайте цей атрибут на реальних таблицях, дані з яких вам дорогі.

1.3. ON COMMIT { DELETE ROWS | DROP }

Ця конструкція дозволяє при створенні таблиці встановити автоматичну поведінку при завершенні транзакції.

Про ON COMMIT DROP я вже написав вище, він генерує DROP TABLE, а ось з ON COMMIT DELETE ROWS ситуація цікавіша — тут генерується TRUNCATE TABLE.

Оскільки вся інфраструктура зберігання метаопису тимчасової таблиці така сама, як і у звичайної, то постійне створення-видалення тимчасових таблиць призводить до сильного «розбухання» системних таблиць pg_class, pg_attribute, pg_attrdef, pg_depend, ...

Тепер уявіть, що у вас є воркер на прямому з'єднанні з БД, який кожну секунду відкриває нову транзакцію, створює, наповнює, обробляє та видаляє тимчасову таблицю… Сміття в системних таблицях накопичиться надміру, а це зайві гальма при кожній операції.

Загалом не треба так! У цьому випадку набагато ефективніше CREATE TEMPORARY TABLE x ... ON COMMIT DELETE ROWS винести за цикл транзакцій — тоді на початок кожної нової транзакції таблиці вже існуватиме (Економимо виклик CREATE), Але буде порожньою, дякую TRUNCATE (його виклик ми теж заощадили) під час завершення попередньої транзакції.

1.4. LIKE… INCLUDING …

Я згадав на початку, що один з типових use case для тимчасових таблиць - це різного роду імпорти - і розробник стомлено копіпастит список полів цільової таблиці в оголошення своєї тимчасової таблиці.

Але ліньки - двигун прогресу! Тому створити нову таблицю «за зразком» можна набагато простіше:

CREATE TEMPORARY TABLE import_table(
  LIKE target_table
);

Оскільки потім нагенерувати в цю таблицю можна дуже багато даних, то пошуки по ній стануть жодного разу не швидкими. Але проти цього є традиційне рішення – індекси! І, так, у тимчасової таблиці також можуть бути індекси.

Оскільки часто потрібні індекси збігаються з індексами цільової таблиці, то можна просто написати LIKE target_table INCLUDING INDEXES.

Якщо вам потрібні ще й DEFAULT-значення (наприклад, для заповнення значень первинного ключа), можна скористатися LIKE target_table INCLUDING DEFAULTS. Ну чи просто - LIKE target_table INCLUDING ALL - Скопіює дефолти, індекси, констрейнти, ...

Але тут уже треба розуміти, що якщо ви створювали імпорт-таблицю відразу з індексами, то заливатиметься дані довше, ніж якщо спочатку все залити, а вже потім накотити індекси - подивіться як приклад, як це робить pg_dump.

Загалом, RTFM!

2. Як писати?

Скажу просто – використовуйте COPY-Потік замість «пачки» INSERT, прискорення в рази. Можна навіть прямо із попередньо сформованого файлу.

3. Як обробляти?

Отже, нехай наша вступна виглядає приблизно так:

  • у вас у базі зберігається табличка з клієнтськими даними на 1M записів
  • кожен день клієнт надсилає вам новий повний «образ»
  • з досвіду ви знаєте, що від разу до разу змінюється не більше 10K записів

Класичним прикладом такої ситуації є база КЛАДР — всього адрес багато, але в кожному тижневому вивантаженні змін (перейменувань населених пунктів, об'єднань вулиць, появ нових будинків) зовсім небагато навіть у масштабі всієї країни.

3.1. Алгоритм повної синхронізації

Для простоти припустимо, що вам навіть реструктурувати дані не потрібно - просто привести таблицю в потрібний вигляд, тобто:

  • видалити все, чого вже немає
  • оновити все, що вже було, і треба оновлювати
  • вставити все, чого ще не було

Чому саме в такому порядку варто робити операції? Тому що саме так розмір таблиці зросте мінімально (пам'ятай про MVCC!).

DELETE FROM dst

Ні, звичайно можна обійтися лише двома операціями:

  • видалити (DELETE) взагалі все
  • вставити все з нового образу

Але при цьому завдяки MVCC, розмір таблиці збільшиться рівно вдвічі! Отримати +1M образів записів у таблиці через оновлення 10K - так собі надмірність.

TRUNCATE dst

Більш досвідчений розробник знає, що всю табличку цілком можна досить дешево зачистити:

  • очистити (TRUNCATE) таблицю повністю
  • вставити все з нового образу

Метод дієвий, іноді цілком застосуємо, Але є невдача ... Вливати 1M записів ми будемо до-о-олго, тому залишити таблицю порожньою на весь цей час (як станеться без обертання в єдину транзакцію) не можемо собі дозволити.

А значить:

  • у нас починається тривала транзакція
  • TRUNCATE накладає AccessExclusive-блокування
  • ми довго робимо вставку, а решта в цей час не можуть навіть SELECT

Щось недобре виходить...

ALTER TABLE ... RENAME ... / DROP TABLE ...

Як варіант залити все в окрему нову таблицю, а потім просто перейменувати на місце старої. Пара неприємних дрібниць:

  • таки теж AccessExclusive, хоч і суттєво менше за часом
  • скидаються всі плани запитів/статистика цієї таблиці, треба ганяти ANALYZE
  • ламаються усі зовнішні ключі (FK) на таблицю

Був WIP-патч від Simon Riggs, який пропонував зробити ALTER-Операцію для заміни тіла таблиці на файловому рівні, не чіпаючи статистику і FK, але не зібрав кворуму.

DELETE, UPDATE, INSERT

Отже, зупиняємось на неблокуючому варіанті із трьох операцій. Майже трьох… Як це зробити найефективніше?

-- все делаем в рамках транзакции, чтобы никто не видел "промежуточных" состояний
BEGIN;

-- создаем временную таблицу с импортируемыми данными
CREATE TEMPORARY TABLE tmp(
  LIKE dst INCLUDING INDEXES -- по образу и подобию, вместе с индексами
) ON COMMIT DROP; -- за рамками транзакции она нам не нужна

-- быстро-быстро вливаем новый образ через COPY
COPY tmp FROM STDIN;
-- ...
-- .

-- удаляем отсутствующие
DELETE FROM
  dst D
USING
  dst X
LEFT JOIN
  tmp Y
    USING(pk1, pk2) -- поля первичного ключа
WHERE
  (D.pk1, D.pk2) = (X.pk1, X.pk2) AND
  Y IS NOT DISTINCT FROM NULL; -- "антиджойн"

-- обновляем оставшиеся
UPDATE
  dst D
SET
  (f1, f2, f3) = (T.f1, T.f2, T.f3)
FROM
  tmp T
WHERE
  (D.pk1, D.pk2) = (T.pk1, T.pk2) AND
  (D.f1, D.f2, D.f3) IS DISTINCT FROM (T.f1, T.f2, T.f3); -- незачем обновлять совпадающие

-- вставляем отсутствующие
INSERT INTO
  dst
SELECT
  T.*
FROM
  tmp T
LEFT JOIN
  dst D
    USING(pk1, pk2)
WHERE
  D IS NOT DISTINCT FROM NULL;

COMMIT;

3.2. Постобробка імпорту

У тому ж КЛАДРі всі записи, що змінилися, необхідно додатково прогнати через постобробку — нормалізувати, виділити ключові слова, привести до потрібних структур. Але як дізнатися що саме змінювалося, не ускладнюючи при цьому код синхронізації, в ідеалі взагалі не чіпаючи його?

Якщо доступ на запис у момент синхронізації є тільки у вашого процесу, то можна скористатися тригером, який збере для нас усі зміни:

-- целевые таблицы
CREATE TABLE kladr(...);
CREATE TABLE kladr_house(...);

-- таблицы с историей изменений
CREATE TABLE kladr$log(
  ro kladr, -- тут лежат целые образы записей старой/новой
  rn kladr
);

CREATE TABLE kladr_house$log(
  ro kladr_house,
  rn kladr_house
);

-- общая функция логирования изменений
CREATE OR REPLACE FUNCTION diff$log() RETURNS trigger AS $$
DECLARE
  dst varchar = TG_TABLE_NAME || '$log';
  stmt text = '';
BEGIN
  -- проверяем необходимость логгирования при обновлении записи
  IF TG_OP = 'UPDATE' THEN
    IF NEW IS NOT DISTINCT FROM OLD THEN
      RETURN NEW;
    END IF;
  END IF;
  -- создаем запись лога
  stmt = 'INSERT INTO ' || dst::text || '(ro,rn)VALUES(';
  CASE TG_OP
    WHEN 'INSERT' THEN
      EXECUTE stmt || 'NULL,$1)' USING NEW;
    WHEN 'UPDATE' THEN
      EXECUTE stmt || '$1,$2)' USING OLD, NEW;
    WHEN 'DELETE' THEN
      EXECUTE stmt || '$1,NULL)' USING OLD;
  END CASE;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Тепер ми можемо перед початком синхронізації тригери накласти (або увімкнути через ALTER TABLE ... ENABLE TRIGGER ...):

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr_house
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

А потім спокійно з log-таблиць витягуємо всі потрібні нам зміни і проганяємо по додаткових обробників.

3.3. Імпорт пов'язаних наборів

Вище ми розглядали випадки, коли структури даних джерела та приймача збігаються. Але що робити, якщо вивантаження із зовнішньої системи має формат відмінний від структури зберігання у нас у базі?

Візьмемо як приклад зберігання клієнтів та рахунків за ними, класичний варіант «багато хто до одного»:

CREATE TABLE client(
  client_id
    serial
      PRIMARY KEY
, inn
    varchar
      UNIQUE
, name
    varchar
);

CREATE TABLE invoice(
  invoice_id
    serial
      PRIMARY KEY
, client_id
    integer
      REFERENCES client(client_id)
, number
    varchar
, dt
    date
, sum
    numeric(32,2)
);

А ось вивантаження із зовнішнього джерела приходить нам у вигляді «все в одному»:

CREATE TEMPORARY TABLE invoice_import(
  client_inn
    varchar
, client_name
    varchar
, invoice_number
    varchar
, invoice_dt
    date
, invoice_sum
    numeric(32,2)
);

Очевидно, що дані щодо клієнтів можуть дублюватися в такому варіанті, а основним записом є «рахунок»:

0123456789;Вася;A-01;2020-03-16;1000.00
9876543210;Петя;A-02;2020-03-16;666.00
0123456789;Вася;B-03;2020-03-16;9999.00

Для моделі просто вставимо наші тестові дані, але пам'ятаємо. COPY ефективніше!

INSERT INTO invoice_import
VALUES
  ('0123456789', 'Вася', 'A-01', '2020-03-16', 1000.00)
, ('9876543210', 'Петя', 'A-02', '2020-03-16', 666.00)
, ('0123456789', 'Вася', 'B-03', '2020-03-16', 9999.00);

Спочатку виділимо ті «розрізи», куди наші «факти» посилаються. У нашому випадку рахунки посилаються на клієнтів:

CREATE TEMPORARY TABLE client_import AS
SELECT DISTINCT ON(client_inn)
-- можно просто SELECT DISTINCT, если данные заведомо непротиворечивы
  client_inn inn
, client_name "name"
FROM
  invoice_import;

Щоб рахунки правильно зв'язати з ID клієнтів, нам ці ідентифікатори треба спочатку дізнатися чи згенерувати. Додамо під них поля:

ALTER TABLE invoice_import ADD COLUMN client_id integer;
ALTER TABLE client_import ADD COLUMN client_id integer;

Скористаємося описаним вище способом синхронізації таблиць з невеликою поправкою — не будемо нічого оновлювати та видаляти в цільовій таблиці, адже імпорт клієнтів у нас «append-only»:

-- проставляем в таблице импорта ID уже существующих записей
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  client D
WHERE
  T.inn = D.inn; -- unique key

-- вставляем отсутствовавшие записи и проставляем их ID
WITH ins AS (
  INSERT INTO client(
    inn
  , name
  )
  SELECT
    inn
  , name
  FROM
    client_import
  WHERE
    client_id IS NULL -- если ID не проставился
  RETURNING *
)
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  ins D
WHERE
  T.inn = D.inn; -- unique key

-- проставляем ID клиентов у записей счетов
UPDATE
  invoice_import T
SET
  client_id = D.client_id
FROM
  client_import D
WHERE
  T.client_inn = D.inn; -- прикладной ключ

Власне, все - у invoice_import тепер у нас заповнено поле зв'язку client_idз яким ми і вставимо рахунок.

Джерело: habr.com

Додати коментар або відгук