DBA: пісьменна арганізоўваем сінхранізацыі і імпарты

Пры складанай апрацоўцы вялікіх набораў дадзеных (розныя ETL-працэсы: імпарты, канвертацыі і сінхранізацыі з вонкавай крыніцай) часта ўзнікае неабходнасць часова «запомніць», і адразу хутка апрацаваць нешта аб'ёмнае.

Тыпавая задача падобнага роду гучыць звычайна прыкладна так: «Вось тут бухгалтэрыя выгрузіла з кліент-банка апошнія паступілі аплаты, трэба іх хуценька ўкачаць на сайт і прывязаць да рахункаў»

Але калі аб'ём гэтага чагосьці пачынае вымярацца сотнямі мегабайт, а сэрвіс пры гэтым павінен працягваць працаваць з базай у рэжыме 24×7, узнікае мноства side-эфектаў, якія будуць псаваць вам жыццё.
DBA: пісьменна арганізоўваем сінхранізацыі і імпарты
Каб зладзіцца з імі ў PostgreSQL (ды і не толькі ў ім), можна выкарыстаць некаторыя магчымасці для аптымізацый, якія дазволяць апрацаваць усё хутчэй і з малодшым выдаткам рэсурсаў.

1. Куды грузіць?

Спачатку давайце вызначымся, куды мы можам заліць дадзеныя, якія мы жадаем "адпрацэсіць".

1.1. Часовыя табліцы (TEMPORARY TABLE)

У прынцыпе, для PostgreSQL часовыя - гэта такія ж табліцы, як і любыя іншыя. Таму няслушныя забабоны тыпу "там усё захоўваецца толькі ў памяці, а яна можа скончыцца". Але ёсць і некалькі істотных адрозненняў.

Свой «неймспейс» для кожнага падключэння да БД

Калі два падключэнні паспрабуюць адначасова выканаць CREATE TABLE x, то хтосьці абавязкова атрымае памылку неўнікальнасці аб'ектаў БД.

А вось калі абодва паспрабуюць выканаць CREATE TEMPORARY TABLE x, то абодва нармальна гэта зробяць, і кожны атрымае свой экзэмпляр табліцы. І нічога агульнага паміж імі не будзе.

«Самазнішчэнне» пры disconnect

Пры зачыненні падлучэння ўсе часовыя табліцы аўтаматычна выдаляюцца, таму "уручную" выконваць DROP TABLE x сэнсу няма ніякага, акрамя…

Калі вы працуеце праз pgbouncer у transaction mode, то база працягвае лічыць, што гэта злучэнне ўсё яшчэ актыўна, і ў ім вось гэтая часавая табліца па-ранейшаму існуе.

Таму спроба стварыць яе паўторна, ужо з іншага падлучэння да pgbouncer, прывядзе да памылкі. Але гэта можна абыйсці, скарыстаўшыся CREATE TEMPORARY TABLE IF NOT EXISTS x.

Праўда, лепш так усёткі не рабіць, таму што затым можна "раптоўна" выявіць тамака, якія засталіся ад "папярэдняга ўладальніка" дадзеныя. Замест гэтага значна лепш прачытаць-ткі мануал, і ўбачыць, што пры стварэнні табліцы ёсць магчымасць дапісаць ON COMMIT DROP - гэта значыць пры завяршэнні транзакцыі табліца будзе аўтаматычна выдаленая.

Не-рэплікацыя

У сілу прыналежнасці толькі вызначанаму злучэнню, часавыя табліцы не рэпліцыруюцца. Затое гэта пазбаўляе ад неабходнасці падвойнага запісу дадзеных у heap + WAL, таму INSERT/UPDATE/DELETE у яе істотна хутчэй.

Але паколькі часавая - гэта ўсёткі "амаль звычайная" табліца, то і на рэпліцы яе стварыць нельга таксама. Прынамсі, пакуль, хаця адпаведны патч ужо даўно ходзіць.

1.2. Нежурналюемыя табліцы (UNLOGGED TABLE)

Але што рабіць, напрыклад, калі ў вас ёсць нейкі грувасткі ETL-працэс, які не ўдаецца рэалізаваць у рамках адной транзакцыі, а ў вас такі pgbouncer у transaction mode? ..

Або струмень дадзеных настолькі вялікі, што недастаткова прапускной здольнасці аднаго злучэння з БД (чытай, аднаго працэсу на CPU)?..

Або частка аперацый ідуць асінхронна у розных канэктах?..

Тут варыянт толькі адзін - часова ствараць нечасовую табліцу. Каламбур, ага. Гэта значыць:

  • стварыў "свае" табліцы з максімальна-выпадковымі імёнамі, каб ні з кім не перасекчыся
  • Выманне: заліў у іх дадзеныя са знешняй крыніцы
  • Пераўтварэнне: пераўтварыў, запоўніў ключавыя злучальныя палі
  • Нагрузка: пераліў гатовыя дадзеныя ў мэтавыя табліцы
  • выдаліў "свае" табліцы

А цяпер - лыжка дзёгцю. Па сутнасці, уся запіс у PostgreSQL адбываецца двойчы - спачатку ў WAL, потым ужо ў целы табліцы/індэксаў. Усё гэта зроблена для падтрымкі ACID і карэктнай бачнасці дадзеных паміж COMMITСцягнутымі і ROLLBACKСцягнутымі транзакцыямі.

Але нам гэтага не трэба! У нас увесь працэс ці цалкам паспяхова прайшоў, ці не. Усё роўна, колькі ў ім будзе прамежкавых транзакцый - нам не цікава "працягваць працэс з сярэдзіны", асабліва калі незразумела, дзе яна была.

Для гэтага распрацоўнікі PostgreSQL яшчэ ў версіі 9.1 укаранілі такую ​​штуку як нежурналюемыя (UNLOGGED) табліцы:

З гэтым указаннем табліца ствараецца як нежурналюемая. Дадзеныя, якія запісваюцца ў нежурналюемыя табліцы, не праходзяць праз часопіс перадзапісу (гл. Главу 29), у выніку чаго такія табліцы працуюць значна хутчэй за звычайных. Аднак, яны не абаронены ад збою; пры збоі або аварыйным адключэнні сервера нежурналюемая табліца аўтаматычна ўсекаецца. Акрамя таго, змесціва нежурналюемай табліцы не рэплікуецца на кіраваныя серверы. Любыя індэксы, якія ствараюцца для нежурналюемай табліцы, аўтаматычна становяцца нежурналяванымі.

карацей, будзе моцна хутчэй, Але калі сервер БД «упадзе» - будзе непрыемна. Але ці часта гэта адбываецца, і ці ўмее ваш ETL-працэс гэта карэктна дапрацоўваць "з сярэдзіны" пасля "ажыўлення" БД?

Калі ж няма, і кейс вышэй падобны на ваш – выкарыстоўвайце UNLOGGED, але ніколі не ўключайце гэты атрыбут на рэальных табліцах, дадзеныя з якіх вам дарогі.

1.3. ON COMMIT { DELETE ROWS | DROP }

Гэтая канструкцыя дазваляе пры стварэнні табліцы задаць аўтаматычныя паводзіны пры завяршэнні транзакцыі.

Пра ON COMMIT DROP я ўжо напісаў вышэй, ён генеруе DROP TABLE, а вось з ON COMMIT DELETE ROWS сітуацыя цікавейшая — тут генеруецца TRUNCATE TABLE.

Паколькі ўся інфраструктура захоўвання метаапісання часавай табліцы роўна такая ж, як і ў звычайнай, то сталае стварэнне-выдаленне часавых табліц прыводзіць да моцнага "набракання" сістэмных табліц pg_class, pg_attribute, pg_attrdef, pg_depend,…

Цяпер уявіце, што ў вас ёсць воркер на прамым злучэнні з БД, які кожную секунду адчыняе новую транзакцыю, стварае, напаўняе, апрацоўвае і выдаляе часавую табліцу… Смецце ў сістэмных табліцах назапасіцца ў лішку, а гэта лішнія тормазы пры кожнай аперацыі.

Увогуле, не трэба так! У гэтым выпадку значна больш эфектыўна CREATE TEMPORARY TABLE x ... ON COMMIT DELETE ROWS вынесці за цыкл транзакцый - тады да пачатку кожнай новай транзакцыі табліцы ўжо будзе існаваць (эканомім выклік CREATE), Але будзе пусты, дзякуючы TRUNCATE (яго выклік мы таксама зэканомілі) пры завяршэнні папярэдняй транзакцыі.

1.4. LIKE… INCLUDING …

Я згадаў у пачатку, што адзін з тыповых use case для часовых табліц - гэта рознага роду імпарты - і распрацоўшчык стомлена копіпастыт спіс палёў мэтавай табліцы ў аб'яву сваёй часовай…

Але лянота - рухавік прагрэсу! Таму стварыць новую табліцу "па ўзоры" можна значна прасцей:

CREATE TEMPORARY TABLE import_table(
  LIKE target_table
);

Паколькі нагенераваць потым у гэтую табліцу можна вельмі шмат дадзеных, то пошукі па ёй стануць ні разу не хуткімі. Але супраць гэтага ёсць традыцыйнае рашэнне - індэксы! І, так, у часавай табліцы таксама могуць быць індэксы.

Паколькі, часцяком, патрэбныя азначнікі супадаюць з азначнікамі мэтавай табліцы, то можна проста напісаць LIKE target_table INCLUDING INDEXES.

Калі вам патрэбны яшчэ і DEFAULT-значэнні (напрыклад, для запаўнення значэнняў першаснага ключа), можна скарыстацца LIKE target_table INCLUDING DEFAULTS. Ну ці проста - LIKE target_table INCLUDING ALL - Скапіюе дэфолты, індэксы, канстрэйнты, ...

Але тут ужо трэба разумець, што калі вы стваралі імпарт-табліцу адразу з індэксамі, то залівацца дадзеныя будуць даўжэй, чым калі спачатку ўсё заліць, а ўжо потым накаціць індэксы - паглядзіце ў якасці прыкладу, як гэта робіць pg_dump.

Увогуле, RTFM!

2. Як пісаць?

Скажу проста - выкарыстоўвайце COPY-струмень замест «пачкі» INSERT, паскарэнне ў разы. Можна нават прама з папярэдне сфармаванага файла.

3. Як апрацоўваць?

Такім чынам, хай наша ўступная выглядае прыкладна так:

  • у вас у базе захоўваецца таблічка з кліенцкімі дадзенымі на 1M запісаў
  • кожны дзень кліент дасылае вам новы поўны "вобраз"
  • па досведзе вы ведаеце, што ад разу да разу змяняецца не больш за 10K запісаў

Класічным прыкладам падобнай сітуацыі з'яўляецца база КЛАДР — усяго адрасоў шмат, але ў кожнай тыднёвай выгрузцы змен (перайменаванняў населеных пунктаў, аб'яднанняў вуліц, з'яўленняў новых дамоў) зусім няшмат нават у маштабе ўсёй краіны.

3.1. Алгарытм поўнай сінхранізацыі

Для прастаты дапусцім, што вам нават рэструктураваць дадзеныя не трэба - проста прывесці табліцу ў патрэбны выгляд, гэта значыць:

  • выдаліць усё, чаго ўжо няма
  • абнавіць усё, што ўжо было, і трэба абнаўляць
  • ўставіць усё, чаго яшчэ не было

Чаму менавіта ў такім парадку трэба рабіць аперацыі? Бо менавіта так памер табліцы вырасце мінімальна (памятай пра MVCC!).

DELETE FROM dst

Не, вядома можна абысціся ўсяго дзвюма аперацыямі:

  • выдаліць (DELETE) наогул усё
  • ўставіць усё з новай выявы

Але пры гэтым, дзякуючы MVCC, памер табліцы павялічыцца роўна ў два разы! Атрымаць +1M вобразаў запісаў у табліцы з-за абнаўлення 10K – так сабе надмернасць…

TRUNCATE dst

Больш дасведчаны распрацоўшчык ведае, што ўсю таблічку цалкам можна дастаткова танна зачысціць:

  • ачысціць (TRUNCATE) табліцу цалкам
  • ўставіць усё з новай выявы

Метад дзейсны, часам цалкам ужывальны, Але ёсць няўдача… Уліваць 1M запісаў мы будзем да-о-олга, таму пакінуць табліцу пусты на ўвесь гэты час (як адбудзецца без паварочвання ў адзіную транзакцыю) не можам сабе дазволіць.

А значыць:

  • у нас пачынаецца працяглая транзакцыя
  • TRUNCATE накладвае AccessExclusive-блакіроўку
  • мы доўга робім устаўку, а ўсе астатнія ў гэты час не могуць нават SELECT

Нешта нядобра атрымліваецца...

ALTER TABLE… RENAME… / DROP TABLE …

Як варыянт - заліць усё ў асобную новую табліцу, а потым проста пераназваць на месца старой. Пара непрыемных дробязяў:

  • такі ж AccessExclusive, хоць і істотна менш па часе
  • скідаюцца ўсе планы запытаў/статыстыка гэтай табліцы, трэба ганяць ANALYZE
  • ламаюцца ўсе вонкавыя ключы (FK) на табліцу

Быў WIP-патч ад Simon Riggs, які прапанаваў зрабіць ALTER-аперацыю для падмены цела табліцы на файлавым узроўні, не чапаючы статыстыку і FK, але не сабраў кворуму.

DELETE, UPDATE, INSERT

Такім чынам, спыняемся на неблакіруючым варыянце з трох аперацый. Амаль тры... Як гэта зрабіць найбольш эфектыўна?

-- все делаем в рамках транзакции, чтобы никто не видел "промежуточных" состояний
BEGIN;

-- создаем временную таблицу с импортируемыми данными
CREATE TEMPORARY TABLE tmp(
  LIKE dst INCLUDING INDEXES -- по образу и подобию, вместе с индексами
) ON COMMIT DROP; -- за рамками транзакции она нам не нужна

-- быстро-быстро вливаем новый образ через COPY
COPY tmp FROM STDIN;
-- ...
-- .

-- удаляем отсутствующие
DELETE FROM
  dst D
USING
  dst X
LEFT JOIN
  tmp Y
    USING(pk1, pk2) -- поля первичного ключа
WHERE
  (D.pk1, D.pk2) = (X.pk1, X.pk2) AND
  Y IS NOT DISTINCT FROM NULL; -- "антиджойн"

-- обновляем оставшиеся
UPDATE
  dst D
SET
  (f1, f2, f3) = (T.f1, T.f2, T.f3)
FROM
  tmp T
WHERE
  (D.pk1, D.pk2) = (T.pk1, T.pk2) AND
  (D.f1, D.f2, D.f3) IS DISTINCT FROM (T.f1, T.f2, T.f3); -- незачем обновлять совпадающие

-- вставляем отсутствующие
INSERT INTO
  dst
SELECT
  T.*
FROM
  tmp T
LEFT JOIN
  dst D
    USING(pk1, pk2)
WHERE
  D IS NOT DISTINCT FROM NULL;

COMMIT;

3.2. Постабпрацоўка імпарту

У тым жа самым КЛАДРы ўсе якія змяніліся запісы неабходна дадаткова прагнаць праз постапрацоўку - нармалізаваць, вылучыць ключавыя словы, прывесці да патрэбных структур. Але як даведацца што менавіта змянялася, не ўскладняючы пры гэтым код сінхранізацыі, у ідэале, увогуле не чапаючы яго?

Калі доступ на запіс у момант сінхранізацыі ёсць толькі ў вашага працэсу, то можна скарыстацца трыгерам, які збярэ для нас усе змены:

-- целевые таблицы
CREATE TABLE kladr(...);
CREATE TABLE kladr_house(...);

-- таблицы с историей изменений
CREATE TABLE kladr$log(
  ro kladr, -- тут лежат целые образы записей старой/новой
  rn kladr
);

CREATE TABLE kladr_house$log(
  ro kladr_house,
  rn kladr_house
);

-- общая функция логирования изменений
CREATE OR REPLACE FUNCTION diff$log() RETURNS trigger AS $$
DECLARE
  dst varchar = TG_TABLE_NAME || '$log';
  stmt text = '';
BEGIN
  -- проверяем необходимость логгирования при обновлении записи
  IF TG_OP = 'UPDATE' THEN
    IF NEW IS NOT DISTINCT FROM OLD THEN
      RETURN NEW;
    END IF;
  END IF;
  -- создаем запись лога
  stmt = 'INSERT INTO ' || dst::text || '(ro,rn)VALUES(';
  CASE TG_OP
    WHEN 'INSERT' THEN
      EXECUTE stmt || 'NULL,$1)' USING NEW;
    WHEN 'UPDATE' THEN
      EXECUTE stmt || '$1,$2)' USING OLD, NEW;
    WHEN 'DELETE' THEN
      EXECUTE stmt || '$1,NULL)' USING OLD;
  END CASE;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Цяпер мы можам перад пачаткам сінхранізацыі трыгеры накласці (або ўключыць праз ALTER TABLE ... ENABLE TRIGGER ...):

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr_house
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

А потым спакойна з log-табліц здабываем усе патрэбныя нам змены і праганяем па дадатковых апрацоўшчыкам.

3.3. Імпарт звязаных набораў

Вышэй мы разглядалі выпадкі, калі структуры дадзеных крыніцы і прымача супадаюць. Але што рабіць, калі выгрузка са знешняй сістэмы мае фармат выдатны ад структуры захоўвання ў нас у базе?

Возьмем у якасці прыкладу захоўванне кліентаў і рахункаў па іх, класічны варыянт "многія-да-аднаму":

CREATE TABLE client(
  client_id
    serial
      PRIMARY KEY
, inn
    varchar
      UNIQUE
, name
    varchar
);

CREATE TABLE invoice(
  invoice_id
    serial
      PRIMARY KEY
, client_id
    integer
      REFERENCES client(client_id)
, number
    varchar
, dt
    date
, sum
    numeric(32,2)
);

А вось выгрузка з вонкавай крыніцы прыходзіць нам у выглядзе «ўсё ў адным»:

CREATE TEMPORARY TABLE invoice_import(
  client_inn
    varchar
, client_name
    varchar
, invoice_number
    varchar
, invoice_dt
    date
, invoice_sum
    numeric(32,2)
);

Відавочна, што дадзеныя па кліентах могуць дублявацца ў такім варыянце, а асноўным запісам з'яўляецца «кошт»:

0123456789;Вася;A-01;2020-03-16;1000.00
9876543210;Петя;A-02;2020-03-16;666.00
0123456789;Вася;B-03;2020-03-16;9999.00

Для мадэлі проста ўставім нашы тэставыя дадзеныя, але памятаем. COPY больш эфектыўна!

INSERT INTO invoice_import
VALUES
  ('0123456789', 'Вася', 'A-01', '2020-03-16', 1000.00)
, ('9876543210', 'Петя', 'A-02', '2020-03-16', 666.00)
, ('0123456789', 'Вася', 'B-03', '2020-03-16', 9999.00);

Спачатку вылучым тыя "разрэзы", на якія нашы "факты" спасылаюцца. У нашым выпадку рахункі спасылаюцца на кліентаў:

CREATE TEMPORARY TABLE client_import AS
SELECT DISTINCT ON(client_inn)
-- можно просто SELECT DISTINCT, если данные заведомо непротиворечивы
  client_inn inn
, client_name "name"
FROM
  invoice_import;

Каб рахункі правільна звязаць з ID кліентаў, нам гэтыя ідэнтыфікатары трэба спачатку пазнаць або згенераваць. Дадамо пад іх палі:

ALTER TABLE invoice_import ADD COLUMN client_id integer;
ALTER TABLE client_import ADD COLUMN client_id integer;

Скарыстаемся апісаным вышэй спосабам сінхранізацыі табліц з невялікай папраўкай - не будзем нічога абнаўляць і выдаляць у мэтавай табліцы, бо імпарт кліентаў у нас "append-only":

-- проставляем в таблице импорта ID уже существующих записей
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  client D
WHERE
  T.inn = D.inn; -- unique key

-- вставляем отсутствовавшие записи и проставляем их ID
WITH ins AS (
  INSERT INTO client(
    inn
  , name
  )
  SELECT
    inn
  , name
  FROM
    client_import
  WHERE
    client_id IS NULL -- если ID не проставился
  RETURNING *
)
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  ins D
WHERE
  T.inn = D.inn; -- unique key

-- проставляем ID клиентов у записей счетов
UPDATE
  invoice_import T
SET
  client_id = D.client_id
FROM
  client_import D
WHERE
  T.client_inn = D.inn; -- прикладной ключ

Уласна, усё - у invoice_import зараз у нас запоўнена поле сувязі client_id, З якім мы і ўставім рахунак.

Крыніца: habr.com

Дадаць каментар