DBA: kompetentně organizujte synchronizace a importy

Pro komplexní zpracování velkých souborů dat (různé ETL procesy: importy, konverze a synchronizace s externím zdrojem) je často potřeba dočasně „zapamatovat“ a okamžitě rychle zpracovat něco objemného.

Typický úkol tohoto druhu obvykle zní nějak takto: "Právě tady účetní oddělení vyloženo z banky klienta poslední přijaté platby, musíte je rychle nahrát na web a propojit je se svými účty“

Když se ale objem tohoto „něčeho“ začne měřit ve stovkách megabajtů a služba musí pokračovat v práci s databází 24x7, objeví se mnoho vedlejších efektů, které vám zničí život.
DBA: kompetentně organizujte synchronizace a importy
Abyste si s nimi v PostgreSQL (a nejen v něm) poradili, můžete využít některé optimalizace, které vám umožní zpracovat vše rychleji a s menší spotřebou zdrojů.

1. Kam odeslat?

Nejprve se rozhodneme, kam můžeme nahrát data, která chceme „zpracovat“.

1.1. Dočasné stoly (TEMPORARY TABLE)

V zásadě jsou dočasné tabulky pro PostgreSQL stejné jako jakékoli jiné. Proto pověry jako "Všechno tam je uloženo pouze v paměti a může to skončit". Existuje ale také několik podstatných rozdílů.

Váš vlastní „namespace“ pro každé připojení k databázi

Pokud se dvě připojení pokusí připojit současně CREATE TABLE x, pak se určitě někdo dostane chyba nejedinečnosti databázové objekty.

Ale pokud se oba pokusí provést CREATE TEMPORARY TABLE x, pak to oba udělají normálně a všichni to dostanou vaši kopii tabulky. A nebude mezi nimi nic společného.

"Sebedestrukce" při odpojení

Po ukončení připojení se všechny dočasné tabulky automaticky smažou, takže ručně DROP TABLE x nemá to smysl kromě...

Pokud pracujete přes pgbouncer v transakčním režimu, pak databáze nadále věří, že toto připojení je stále aktivní a v něm tato dočasná tabulka stále existuje.

Proto pokus o vytvoření znovu, z jiného připojení k pgbouncer, bude mít za následek chybu. To se ale dá obejít používáním CREATE TEMPORARY TABLE IF NOT EXISTS x.

Je pravda, že je lepší to stejně nedělat, protože pak tam můžete „najednou“ najít data zbývající od „předchozího vlastníka“. Místo toho je mnohem lepší přečíst si manuál a vidět, že při vytváření tabulky je možné přidat ON COMMIT DROP - to znamená, že po dokončení transakce bude tabulka automaticky smazána.

Nereplikace

Protože patří pouze ke konkrétnímu připojení, dočasné tabulky se nereplikují. Ale to eliminuje potřebu dvojitého záznamu dat v haldě + WAL, takže INSERT/UPDATE/DELETE do něj je mnohem rychlejší.

Ale protože dočasná tabulka je stále „téměř obyčejná“ tabulka, nelze ji vytvořit ani na replice. Alespoň prozatím, i když odpovídající patch koluje již delší dobu.

1.2. NEPŘIHLÁŠENÁ TABULKA

Co ale dělat, když například máte nějaký těžkopádný ETL proces, který nelze implementovat v rámci jedné transakce, ale přesto máte pgbouncer v transakčním režimu? ..

Nebo je datový tok tak velký, že Na jednom připojení není dostatečná šířka pásma z databáze (čtení, jeden proces na CPU)?..

Nebo probíhají nějaké operace asynchronně v různých spojeních?...

Tady je jen jedna možnost - dočasně vytvořit nedočasnou tabulku. Hříčka, ano. to je:

  • vytvořil „vlastní“ tabulky s maximálně náhodnými názvy, aby se s nikým nekřížily
  • Výpis: naplnil je daty z externího zdroje
  • Změnit: převedeno, vyplněná klíčová propojovací pole
  • Zatížení: nalije připravená data do cílových tabulek
  • smazal „moje“ tabulky

A teď - moucha v háji. Ve skutečnosti, všechny zápisy v PostgreSQL se dějí dvakrát - první ve WALa poté do těl tabulky/indexu. To vše je děláno pro podporu ACID a správnou viditelnost dat mezi nimi COMMIT'ořechový a ROLLBACK'nulové transakce.

Ale tohle my nepotřebujeme! Máme celý proces Buď se to úplně povedlo, nebo ne.. Nezáleží na tom, kolik mezitransakcí bude – nemáme zájem o „pokračování procesu od středu“, zvláště když není jasné, kde to bylo.

Za tímto účelem vývojáři PostgreSQL ve verzi 9.1 představili něco jako UNLOGGED tabulky:

S tímto označením je tabulka vytvořena jako nepřihlášená. Data zapsaná do nezaprotokolovaných tabulek neprocházejí protokolem zápisu napřed (viz kapitola 29), což způsobuje, že pracovat mnohem rychleji než obvykle. Nejsou však imunní vůči selhání; v případě selhání serveru nebo nouzového vypnutí odhlášená tabulka automaticky zkrácena. Navíc obsah nepřihlášené tabulky nereplikované na podřízené servery. Všechny indexy vytvořené v nepřihlášené tabulce se automaticky odhlásí.

Stručně řečeno, bude to mnohem rychlejší, ale pokud databázový server „spadne“, bude to nepříjemné. Ale jak často se to stává a ví váš ETL proces, jak to správně opravit „od středu“ po „revitalizaci“ databáze?...

Pokud ne a výše uvedený případ je podobný vašemu, použijte UNLOGGEDale nikdy nepovolujte tento atribut na skutečných tabulkách, z nichž jsou vám údaje drahé.

1.3. ON COMMIT { DELETE ROWS | POKLES}

Tato konstrukce umožňuje určit automatické chování po dokončení transakce při vytváření tabulky.

O ON COMMIT DROP Už jsem psal výše, generuje DROP TABLE, ale s ON COMMIT DELETE ROWS situace je zajímavější - generuje se zde TRUNCATE TABLE.

Protože celá infrastruktura pro ukládání meta-popisu dočasné tabulky je přesně stejná jako u běžné tabulky, pak Neustálé vytváření a mazání dočasných tabulek vede k vážnému „nabobtnání“ systémových tabulek pg_class, pg_attribute, pg_attrdef, pg_depend,…

Nyní si představte, že máte pracovníka na přímém připojení k databázi, který každou sekundu otevře novou transakci, vytvoří, vyplní, zpracuje a odstraní dočasnou tabulku... V systémových tabulkách se bude hromadit přebytek odpadu a to způsobí další brzdy pro každou operaci.

Obecně to nedělejte! V tomto případě je to mnohem efektivnější CREATE TEMPORARY TABLE x ... ON COMMIT DELETE ROWS vyřaďte to z transakčního cyklu - pak na začátku každé nové transakce už jsou tabulky bude existovat (uložit hovor CREATE), ale bude prázdný, díky TRUNCATE (jeho volání jsme také uložili) při dokončení předchozí transakce.

1.4. JAKO...VČETNĚ...

Na začátku jsem zmínil, že jedním z typických případů použití pro dočasné tabulky jsou různé druhy importů - a vývojář unaveně kopíruje a vkládá seznam polí cílové tabulky do deklarace své dočasné...

Ale lenost je motorem pokroku! Proto vytvořit novou tabulku „na základě vzorku“ může to být mnohem jednodušší:

CREATE TEMPORARY TABLE import_table(
  LIKE target_table
);

Vzhledem k tomu, že pak můžete do této tabulky vygenerovat spoustu dat, nebude prohledávání v ní nikdy rychlé. Na to ale existuje tradiční řešení – indexy! A ano, dočasná tabulka může mít také indexy.

Protože se požadované indexy často shodují s indexy cílové tabulky, můžete jednoduše zapisovat LIKE target_table INCLUDING INDEXES.

Pokud také potřebujete DEFAULT-hodnoty (například k vyplnění hodnot primárního klíče), můžete použít LIKE target_table INCLUDING DEFAULTS. Nebo prostě - LIKE target_table INCLUDING ALL — kopíruje výchozí hodnoty, indexy, omezení,...

Ale tady musíte pochopit, že pokud jste vytvořili importujte tabulku okamžitě s indexy, pak bude načítání dat trvat délenež když nejprve vše zaplníte a teprve potom srolujete indexy - podívejte se, jak to dělá jako příklad pg_dump.

Obecně lze říci, RTFM!

2. Jak psát?

Dovolte mi říct - použijte to COPY- flow místo "pack" INSERT, občas zrychlení. Můžete dokonce přímo z předem vygenerovaného souboru.

3. Jak zpracovat?

Nechme tedy naše intro vypadat nějak takto:

  • máte ve své databázi uloženou tabulku s klientskými daty 1 milion záznamů
  • každý den vám klient pošle nový úplný "obrázek"
  • ze zkušenosti to čas od času víte nezmění se více než 10 XNUMX záznamů

Klasickým příkladem takové situace je základna KLADR — adres je celkem hodně, ale v každém týdenním uploadu je jen velmi málo změn (přejmenování sídel, kombinování ulic, vzhled nových domů) i v celostátním měřítku.

3.1. Plný synchronizační algoritmus

Pro zjednodušení řekněme, že ani nepotřebujete restrukturalizovat data – stačí převést tabulku do požadované podoby, tedy:

  • odstranit všechno, co už neexistuje
  • obnovit vše, co již existuje a je třeba aktualizovat
  • vložit všechno, co se ještě nestalo

Proč by měly být operace prováděny v tomto pořadí? Protože takto se velikost tabulky zvětší minimálně (pamatujte na MVCC!).

VYMAZAT Z dst

Ne, samozřejmě si vystačíte pouze se dvěma operacemi:

  • odstranit (DELETE) vše obecně
  • vložit vše z nového obrázku

Ale zároveň, díky MVCC, Velikost stolu se zvětší přesně dvakrát! Získání +1 milionu obrázků záznamů v tabulce díky aktualizaci 10 XNUMX je tak nadbytečné...

TRUNCATE dst

Zkušenější vývojář ví, že celý tablet lze vyčistit poměrně levně:

  • jasné (TRUNCATE) celou tabulku
  • vložit vše z nového obrázku

Metoda je účinná, někdy docela použitelné, ale je tu problém... Dlouhou dobu budeme přidávat 1M záznamů, takže si nemůžeme dovolit nechat tabulku po celou dobu prázdnou (jak se to stane bez zabalení do jediné transakce).

Což znamená:

  • začínáme dlouhotrvající transakce
  • TRUNCATE ukládá Exkluzivní přístup-blokování
  • vkládání děláme dlouho a všichni ostatní v tuto chvíli ani nemůže SELECT

Něco se nedaří...

ALTER TABLE… PŘEJMENOVAT… / DROP TABLE…

Alternativou je vyplnit vše do samostatné nové tabulky a poté ji jednoduše přejmenovat na místo staré. Pár nepříjemných maličkostí:

  • stále taky Exkluzivní přístup, i když podstatně kratší dobu
  • všechny plány dotazů/statistiky pro tuto tabulku jsou resetovány, je potřeba spustit ANALYZE
  • všechny cizí klíče jsou rozbité (FK) ke stolu

Byl tam patch WIP od Simona Riggse, který navrhoval vytvoření ALTER-operace, která nahradí tělo tabulky na úrovni souboru, aniž by se dotkla statistik a FK, ale neshromáždila kvorum.

SMAZAT, AKTUALIZOVAT, VLOŽIT

Takže jsme se rozhodli pro neblokovací možnost tří operací. Téměř tři... Jak to udělat nejúčinněji?

-- все делаем в рамках транзакции, чтобы никто не видел "промежуточных" состояний
BEGIN;

-- создаем временную таблицу с импортируемыми данными
CREATE TEMPORARY TABLE tmp(
  LIKE dst INCLUDING INDEXES -- по образу и подобию, вместе с индексами
) ON COMMIT DROP; -- за рамками транзакции она нам не нужна

-- быстро-быстро вливаем новый образ через COPY
COPY tmp FROM STDIN;
-- ...
-- .

-- удаляем отсутствующие
DELETE FROM
  dst D
USING
  dst X
LEFT JOIN
  tmp Y
    USING(pk1, pk2) -- поля первичного ключа
WHERE
  (D.pk1, D.pk2) = (X.pk1, X.pk2) AND
  Y IS NOT DISTINCT FROM NULL; -- "антиджойн"

-- обновляем оставшиеся
UPDATE
  dst D
SET
  (f1, f2, f3) = (T.f1, T.f2, T.f3)
FROM
  tmp T
WHERE
  (D.pk1, D.pk2) = (T.pk1, T.pk2) AND
  (D.f1, D.f2, D.f3) IS DISTINCT FROM (T.f1, T.f2, T.f3); -- незачем обновлять совпадающие

-- вставляем отсутствующие
INSERT INTO
  dst
SELECT
  T.*
FROM
  tmp T
LEFT JOIN
  dst D
    USING(pk1, pk2)
WHERE
  D IS NOT DISTINCT FROM NULL;

COMMIT;

3.2. Importujte následné zpracování

Ve stejném KLADR je nutné všechny změněné záznamy dodatečně proběhnout následným zpracováním – normalizovat, zvýraznit klíčová slova a zredukovat na požadované struktury. Ale jak víš- co přesně se změnilobez zkomplikování synchronizačního kódu, ideálně bez dotyku?

Pokud má v době synchronizace přístup pro zápis pouze váš proces, můžete použít spouštěč, který za nás shromáždí všechny změny:

-- целевые таблицы
CREATE TABLE kladr(...);
CREATE TABLE kladr_house(...);

-- таблицы с историей изменений
CREATE TABLE kladr$log(
  ro kladr, -- тут лежат целые образы записей старой/новой
  rn kladr
);

CREATE TABLE kladr_house$log(
  ro kladr_house,
  rn kladr_house
);

-- общая функция логирования изменений
CREATE OR REPLACE FUNCTION diff$log() RETURNS trigger AS $$
DECLARE
  dst varchar = TG_TABLE_NAME || '$log';
  stmt text = '';
BEGIN
  -- проверяем необходимость логгирования при обновлении записи
  IF TG_OP = 'UPDATE' THEN
    IF NEW IS NOT DISTINCT FROM OLD THEN
      RETURN NEW;
    END IF;
  END IF;
  -- создаем запись лога
  stmt = 'INSERT INTO ' || dst::text || '(ro,rn)VALUES(';
  CASE TG_OP
    WHEN 'INSERT' THEN
      EXECUTE stmt || 'NULL,$1)' USING NEW;
    WHEN 'UPDATE' THEN
      EXECUTE stmt || '$1,$2)' USING OLD, NEW;
    WHEN 'DELETE' THEN
      EXECUTE stmt || '$1,NULL)' USING OLD;
  END CASE;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Nyní můžeme použít spouštěče před zahájením synchronizace (nebo je povolit pomocí ALTER TABLE ... ENABLE TRIGGER ...):

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr_house
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

A pak v klidu vytáhneme všechny potřebné změny z log tabulek a spustíme je přes další handlery.

3.3. Import propojených sad

Výše jsme uvažovali o případech, kdy jsou datové struktury zdroje a cíle stejné. Co když ale upload z externího systému má formát odlišný od struktury úložiště v naší databázi?

Vezměme si jako příklad úložiště klientů a jejich účtů, klasickou možnost „mnoho na jednoho“:

CREATE TABLE client(
  client_id
    serial
      PRIMARY KEY
, inn
    varchar
      UNIQUE
, name
    varchar
);

CREATE TABLE invoice(
  invoice_id
    serial
      PRIMARY KEY
, client_id
    integer
      REFERENCES client(client_id)
, number
    varchar
, dt
    date
, sum
    numeric(32,2)
);

Ale stahování z externího zdroje k nám přichází ve formě „vše v jednom“:

CREATE TEMPORARY TABLE invoice_import(
  client_inn
    varchar
, client_name
    varchar
, invoice_number
    varchar
, invoice_dt
    date
, invoice_sum
    numeric(32,2)
);

Je zřejmé, že zákaznická data lze v této verzi duplikovat a hlavním záznamem je „účet“:

0123456789;Вася;A-01;2020-03-16;1000.00
9876543210;Петя;A-02;2020-03-16;666.00
0123456789;Вася;B-03;2020-03-16;9999.00

Pro model jednoduše vložíme naše testovací data, ale pamatujte - COPY Efektivnější!

INSERT INTO invoice_import
VALUES
  ('0123456789', 'Вася', 'A-01', '2020-03-16', 1000.00)
, ('9876543210', 'Петя', 'A-02', '2020-03-16', 666.00)
, ('0123456789', 'Вася', 'B-03', '2020-03-16', 9999.00);

Nejprve vyzdvihněme ty „škrty“, ke kterým se naše „fakta“ vztahují. V našem případě se faktury týkají zákazníků:

CREATE TEMPORARY TABLE client_import AS
SELECT DISTINCT ON(client_inn)
-- можно просто SELECT DISTINCT, если данные заведомо непротиворечивы
  client_inn inn
, client_name "name"
FROM
  invoice_import;

Abychom mohli správně přiřadit účty k ID zákazníků, musíme tyto identifikátory nejprve zjistit nebo vygenerovat. Přidejme pod ně pole:

ALTER TABLE invoice_import ADD COLUMN client_id integer;
ALTER TABLE client_import ADD COLUMN client_id integer;

Použijme metodu synchronizace tabulky popsanou výše s malou úpravou – v cílové tabulce nebudeme nic aktualizovat ani mazat, protože importujeme klienty „pouze“:

-- проставляем в таблице импорта ID уже существующих записей
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  client D
WHERE
  T.inn = D.inn; -- unique key

-- вставляем отсутствовавшие записи и проставляем их ID
WITH ins AS (
  INSERT INTO client(
    inn
  , name
  )
  SELECT
    inn
  , name
  FROM
    client_import
  WHERE
    client_id IS NULL -- если ID не проставился
  RETURNING *
)
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  ins D
WHERE
  T.inn = D.inn; -- unique key

-- проставляем ID клиентов у записей счетов
UPDATE
  invoice_import T
SET
  client_id = D.client_id
FROM
  client_import D
WHERE
  T.client_inn = D.inn; -- прикладной ключ

Ve skutečnosti je všechno uvnitř invoice_import Nyní máme vyplněné kontaktní pole client_id, kterým vložíme fakturu.

Zdroj: www.habr.com

Přidat komentář