DBA: vakkundig synchronisaties en importen organiseren

Voor complexe verwerking van grote datasets (verschillende ETL-processen: importen, conversies en synchronisatie met een externe bron) is er vaak behoefte aan tijdelijk “onthouden” en onmiddellijk snel verwerken iets volumineus.

Een typische taak van deze soort klinkt meestal ongeveer zo: "Hier boekhoudafdeling gelost van de klantbank de laatst ontvangen betalingen moet u deze snel naar de website uploaden en aan uw accounts koppelen"

Maar wanneer het volume van dit ‘iets’ honderden megabytes begint te bedragen, en de dienst 24x7 met de database moet blijven werken, ontstaan ​​er veel bijwerkingen die je leven zullen ruïneren.
DBA: vakkundig synchronisaties en importen organiseren
Om ze in PostgreSQL (en niet alleen daarin) aan te pakken, kunt u enkele optimalisaties gebruiken waarmee u alles sneller en met minder hulpbronnenverbruik kunt verwerken.

1. Waar naartoe verzenden?

Laten we eerst beslissen waar we de gegevens kunnen uploaden die we willen 'verwerken'.

1.1. Tijdelijke tabellen (TIJDELIJKE TABEL)

In principe zijn tijdelijke tabellen voor PostgreSQL hetzelfde als alle andere. Daarom bijgeloof zoals “Alles daar wordt alleen in het geheugen opgeslagen, en het kan eindigen”. Maar er zijn ook een aantal significante verschillen.

Uw eigen “naamruimte” voor elke verbinding met de database

Als twee verbindingen tegelijkertijd proberen verbinding te maken CREATE TABLE x, dan zal iemand het zeker krijgen niet-uniciteitsfout database-objecten.

Maar als beide proberen uit te voeren CREATE TEMPORARY TABLE x, dan zullen beide het normaal doen, en iedereen zal het krijgen jouw exemplaar tafels. En er zal niets gemeenschappelijks tussen hen zijn.

"Zelfvernietiging" bij het verbreken van de verbinding

Wanneer de verbinding verbroken wordt, worden alle tijdelijke tabellen automatisch verwijderd, dus handmatig DROP TABLE x het heeft geen zin behalve...

Als je doorwerkt pgbouncer in transactiemodus, dan blijft de database geloven dat deze verbinding nog steeds actief is, en daarin bestaat deze tijdelijke tabel nog steeds.

Daarom zal een poging om het opnieuw te maken, vanuit een andere verbinding met pgbouncer, resulteren in een fout. Maar dit kan worden omzeild door gebruik te maken van CREATE TEMPORARY TABLE IF NOT EXISTS x.

Toegegeven, het is beter om dit toch niet te doen, want dan kun je daar “plotseling” de gegevens vinden die nog over zijn van de “vorige eigenaar”. In plaats daarvan is het veel beter om de handleiding te lezen en te zien dat het mogelijk is om bij het maken van een tabel iets toe te voegen ON COMMIT DROP - dat wil zeggen dat wanneer de transactie is voltooid, de tabel automatisch wordt verwijderd.

Niet-replicatie

Omdat ze alleen bij een specifieke verbinding horen, worden tijdelijke tabellen niet gerepliceerd. Maar dit elimineert de noodzaak van dubbele registratie van gegevens in heap + WAL, dus INSERT/UPDATE/DELETE erin is aanzienlijk sneller.

Maar aangezien een tijdelijke tabel nog steeds een ‘bijna gewone’ tabel is, kan deze ook niet op een replica worden gemaakt. In ieder geval voorlopig, hoewel de bijbehorende patch al lang in omloop is.

1.2. ONGELOGDE TABEL

Maar wat moet u bijvoorbeeld doen als u een omslachtig ETL-proces heeft dat niet binnen één transactie kan worden geïmplementeerd, maar u toch een pgbouncer in transactiemodus? ..

Of de datastroom is zo groot dat Er is niet genoeg bandbreedte op één verbinding vanuit een database (lees, één proces per CPU)?..

Of er zijn operaties aan de gang asynchroon in verschillende verbindingen?..

Er is hier maar één optie: maak tijdelijk een niet-tijdelijke tabel aan. Woordspeling, ja. Dat is:

  • creëerde “mijn eigen” tabellen met maximaal willekeurige namen om met niemand te kruisen
  • Extract: vulde ze met gegevens uit een externe bron
  • Transformeren: geconverteerd, belangrijke koppelingsvelden ingevuld
  • Laden: kant-en-klare gegevens in doeltabellen gegoten
  • verwijderde “mijn” tabellen

En nu - een vlieg in de zalf. In werkelijkheid, alle schrijfbewerkingen in PostgreSQL gebeuren tweemaal - eerst in WALen vervolgens in de tabel/indexlichamen. Dit alles wordt gedaan om ACID te ondersteunen en de zichtbaarheid van gegevens tussen COMMIT'nootachtig en ROLLBACK'nultransacties.

Maar dit hebben wij niet nodig! Wij hebben het hele proces Ofwel was het volledig succesvol, ofwel niet.. Het maakt niet uit hoeveel tussentijdse transacties er zullen zijn - we zijn niet geïnteresseerd in “het voortzetten van het proces vanuit het midden”, vooral als het niet duidelijk is waar het was.

Om dit te doen, introduceerden de PostgreSQL-ontwikkelaars in versie 9.1 zoiets als NIET-GELOGDE tabellen:

Met deze indicatie wordt de tabel aangemaakt als niet-gelogd. Gegevens die naar niet-gelogde tabellen worden geschreven, gaan niet door het vooruitschrijflogboek (zie hoofdstuk 29), waardoor dergelijke tabellen werk veel sneller dan normaal. Ze zijn echter niet immuun voor mislukkingen; in geval van serverstoring of noodstop, een niet-gelogde tabel automatisch ingekort. Bovendien de inhoud van de niet-ingelogde tabel niet gerepliceerd naar slave-servers. Alle indexen die in een niet-gelogde tabel zijn gemaakt, worden automatisch ontlogd.

Kortom, het zal veel sneller zijn, maar als de databaseserver “valt”, zal dat onaangenaam zijn. Maar hoe vaak gebeurt dit, en weet uw ETL-proces dit “vanuit het midden” correct te corrigeren na het “revitaliseren” van de database?

Als dit niet het geval is, en het bovenstaande geval lijkt op het uwe, gebruik dan UNLOGGEDmaar nooit schakel dit kenmerk niet in voor echte tabellen, waarvan de gegevens u dierbaar zijn.

1.3. ON COMMIT { VERWIJDER RIJEN | DRUPPEL}

Met deze constructie kunt u automatisch gedrag opgeven wanneer een transactie is voltooid bij het maken van een tabel.

Про ON COMMIT DROP Ik schreef hierboven al, het genereert DROP TABLE, maar met ON COMMIT DELETE ROWS de situatie is interessanter: deze wordt hier gegenereerd TRUNCATE TABLE.

Omdat de gehele infrastructuur voor het opslaan van de metabeschrijving van een tijdelijke tabel precies dezelfde is als die van een gewone tabel, Het voortdurend aanmaken en verwijderen van tijdelijke tabellen leidt tot een ernstige “uitbreiding” van systeemtabellen pg_class, pg_attribute, pg_attrdef, pg_depend,…

Stel je nu voor dat je een werker hebt die een directe verbinding heeft met de database, die elke seconde een nieuwe transactie opent, een tijdelijke tabel aanmaakt, vult, verwerkt en verwijdert... Er zal zich een overmaat aan afval ophopen in de systeemtabellen, en dit zal voor elke handeling extra remmen veroorzaken.

Doe dit in het algemeen niet! In dit geval is het veel effectiever CREATE TEMPORARY TABLE x ... ON COMMIT DELETE ROWS haal het uit de transactiecyclus - en aan het begin van elke nieuwe transactie zijn de tabellen al aanwezig zal bestaan (een oproep opslaan CREATE), Maar zal leeg zijn, dankzij TRUNCATE (we hebben ook de oproep opgeslagen) bij het voltooien van de vorige transactie.

1.4. ZOALS...INCLUSIEF...

Ik zei in het begin dat een van de typische gebruiksscenario's voor tijdelijke tabellen verschillende soorten import is - en de ontwikkelaar kopieert en plakt vermoeid de lijst met velden van de doeltabel in de declaratie van zijn tijdelijke ...

Maar luiheid is de motor van vooruitgang! Daarom maak een nieuwe tabel “gebaseerd op voorbeeld” het kan veel eenvoudiger:

CREATE TEMPORARY TABLE import_table(
  LIKE target_table
);

Omdat u vervolgens veel gegevens in deze tabel kunt genereren, zal het doorzoeken ervan nooit snel gaan. Maar er is een traditionele oplossing hiervoor: indexen! En ja, een tijdelijke tabel kan ook indexen hebben.

Omdat de vereiste indexen vaak samenvallen met de indexen van de doeltabel, kunt u eenvoudigweg schrijven LIKE target_table INCLUDING INDEXES.

Als je dat ook nodig hebt DEFAULT-waarden (bijvoorbeeld om de primaire sleutelwaarden in te vullen), kunt u gebruiken LIKE target_table INCLUDING DEFAULTS. Of gewoon - LIKE target_table INCLUDING ALL — kopieert standaardwaarden, indexen, beperkingen,...

Maar hier moet je begrijpen dat als je het hebt gemaakt importeer de tabel onmiddellijk met indexen, dan duurt het laden van de gegevens langerdan wanneer je eerst alles invult, en dan pas de indexen oprolt - kijk eens hoe het dit doet als voorbeeld pg_dump.

In het algemeen RTFM!

2. Hoe schrijf je?

Laat ik het maar zeggen: gebruik het COPY-flow in plaats van "pack" INSERT, versnelling soms. U kunt zelfs rechtstreeks vanuit een vooraf gegenereerd bestand.

3. Hoe verwerken?

Laten we onze intro er ongeveer zo uitzien:

  • u heeft een tabel met klantgegevens opgeslagen in uw database 1M-records
  • elke dag stuurt een klant je een nieuwe volledige "afbeelding"
  • uit ervaring weet je dat af en toe er worden niet meer dan 10 records gewijzigd

Een klassiek voorbeeld van een dergelijke situatie is KLADR-basis — er zijn in totaal veel adressen, maar bij elke wekelijkse upload zijn er zeer weinig veranderingen (hernoemen van nederzettingen, combineren van straten, uiterlijk van nieuwe huizen), zelfs op nationale schaal.

3.1. Volledig synchronisatie-algoritme

Laten we voor de eenvoud zeggen dat u de gegevens niet eens hoeft te herstructureren; breng de tabel gewoon in de gewenste vorm, dat wil zeggen:

  • verwijderen alles wat niet meer bestaat
  • -update alles wat al bestond en moet worden bijgewerkt
  • invoegen alles wat nog niet is gebeurd

Waarom moeten de bewerkingen in deze volgorde worden uitgevoerd? Omdat dit de manier is waarop de tafelgrootte minimaal zal groeien (denk aan MVCC!).

VERWIJDEREN UIT dst

Nee, natuurlijk kun je rondkomen met slechts twee operaties:

  • verwijderen (DELETE) alles in het algemeen
  • invoegen allemaal van de nieuwe afbeelding

Maar tegelijkertijd, dankzij MVCC, De grootte van de tafel wordt precies twee keer groter! Het verkrijgen van +1 miljoen afbeeldingen van records in de tabel vanwege een 10K-update is matig redundantie...

AFKNOPEN dst

Een meer ervaren ontwikkelaar weet dat de hele tablet vrij goedkoop kan worden schoongemaakt:

  • schoon (TRUNCATE) de hele tafel
  • invoegen allemaal van de nieuwe afbeelding

De methode is effectief, soms heel toepasselijk, maar er is een probleem... We zullen voor een lange tijd 1 miljoen records toevoegen, dus we kunnen het ons niet veroorloven om de tabel al die tijd leeg te laten (zoals zal gebeuren zonder deze in één enkele transactie te verpakken).

Wat betekent:

  • wij beginnen langlopende transactie
  • TRUNCATE oplegt Exclusief toegang-blokkeren
  • wij doen het inbrengen al heel lang, en alle anderen op dit moment kan niet eens SELECT

Er gaat iets niet goed...

WIJZIG TAFEL... HERNOEM... / DROP TAFEL...

Een alternatief is om alles in een aparte nieuwe tabel te vullen en deze vervolgens eenvoudigweg te hernoemen in plaats van de oude. Een paar vervelende kleine dingetjes:

  • nog steeds ook Exclusief toegang, hoewel aanzienlijk minder tijd
  • alle queryplannen/statistieken voor deze tabel worden opnieuw ingesteld, moet ANALYSEREN worden uitgevoerd
  • alle externe sleutels zijn kapot (FK) aan tafel

Er was een WIP-patch van Simon Riggs die suggereerde om te maken ALTER-een bewerking om de tabeltekst op bestandsniveau te vervangen, zonder de statistieken en FK aan te raken, maar er werd geen quorum verzameld.

VERWIJDEREN, UPDATEN, INVOEGEN

We kiezen dus voor de niet-blokkerende optie van drie operaties. Bijna drie... Hoe kun je dit het meest effectief doen?

-- все делаем в рамках транзакции, чтобы никто не видел "промежуточных" состояний
BEGIN;

-- создаем временную таблицу с импортируемыми данными
CREATE TEMPORARY TABLE tmp(
  LIKE dst INCLUDING INDEXES -- по образу и подобию, вместе с индексами
) ON COMMIT DROP; -- за рамками транзакции она нам не нужна

-- быстро-быстро вливаем новый образ через COPY
COPY tmp FROM STDIN;
-- ...
-- .

-- удаляем отсутствующие
DELETE FROM
  dst D
USING
  dst X
LEFT JOIN
  tmp Y
    USING(pk1, pk2) -- поля первичного ключа
WHERE
  (D.pk1, D.pk2) = (X.pk1, X.pk2) AND
  Y IS NOT DISTINCT FROM NULL; -- "антиджойн"

-- обновляем оставшиеся
UPDATE
  dst D
SET
  (f1, f2, f3) = (T.f1, T.f2, T.f3)
FROM
  tmp T
WHERE
  (D.pk1, D.pk2) = (T.pk1, T.pk2) AND
  (D.f1, D.f2, D.f3) IS DISTINCT FROM (T.f1, T.f2, T.f3); -- незачем обновлять совпадающие

-- вставляем отсутствующие
INSERT INTO
  dst
SELECT
  T.*
FROM
  tmp T
LEFT JOIN
  dst D
    USING(pk1, pk2)
WHERE
  D IS NOT DISTINCT FROM NULL;

COMMIT;

3.2. Nabewerking importeren

In dezelfde KLADR moeten alle gewijzigde records bovendien een nabewerking ondergaan - genormaliseerd, trefwoorden gemarkeerd en teruggebracht tot de vereiste structuren. Maar hoe weet je - wat is er precies veranderdzonder de synchronisatiecode ingewikkelder te maken, idealiter zonder deze überhaupt aan te raken?

Als alleen uw proces schrijftoegang heeft op het moment van synchronisatie, kunt u een trigger gebruiken die alle wijzigingen voor ons verzamelt:

-- целевые таблицы
CREATE TABLE kladr(...);
CREATE TABLE kladr_house(...);

-- таблицы с историей изменений
CREATE TABLE kladr$log(
  ro kladr, -- тут лежат целые образы записей старой/новой
  rn kladr
);

CREATE TABLE kladr_house$log(
  ro kladr_house,
  rn kladr_house
);

-- общая функция логирования изменений
CREATE OR REPLACE FUNCTION diff$log() RETURNS trigger AS $$
DECLARE
  dst varchar = TG_TABLE_NAME || '$log';
  stmt text = '';
BEGIN
  -- проверяем необходимость логгирования при обновлении записи
  IF TG_OP = 'UPDATE' THEN
    IF NEW IS NOT DISTINCT FROM OLD THEN
      RETURN NEW;
    END IF;
  END IF;
  -- создаем запись лога
  stmt = 'INSERT INTO ' || dst::text || '(ro,rn)VALUES(';
  CASE TG_OP
    WHEN 'INSERT' THEN
      EXECUTE stmt || 'NULL,$1)' USING NEW;
    WHEN 'UPDATE' THEN
      EXECUTE stmt || '$1,$2)' USING OLD, NEW;
    WHEN 'DELETE' THEN
      EXECUTE stmt || '$1,NULL)' USING OLD;
  END CASE;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Nu kunnen we triggers toepassen voordat we de synchronisatie starten (of inschakelen via ALTER TABLE ... ENABLE TRIGGER ...):

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr_house
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

En dan extraheren we rustig alle wijzigingen die we nodig hebben uit de logtabellen en voeren ze door extra handlers.

3.3. Gekoppelde sets importeren

Hierboven hebben we gevallen overwogen waarin de datastructuren van de bron en de bestemming hetzelfde zijn. Maar wat als de upload vanaf een extern systeem een ​​ander formaat heeft dan de opslagstructuur in onze database?

Laten we als voorbeeld de opslag van klanten en hun accounts nemen, de klassieke ‘veel-op-één’-optie:

CREATE TABLE client(
  client_id
    serial
      PRIMARY KEY
, inn
    varchar
      UNIQUE
, name
    varchar
);

CREATE TABLE invoice(
  invoice_id
    serial
      PRIMARY KEY
, client_id
    integer
      REFERENCES client(client_id)
, number
    varchar
, dt
    date
, sum
    numeric(32,2)
);

Maar de download van een externe bron komt naar ons toe in de vorm van “alles in één”:

CREATE TEMPORARY TABLE invoice_import(
  client_inn
    varchar
, client_name
    varchar
, invoice_number
    varchar
, invoice_dt
    date
, invoice_sum
    numeric(32,2)
);

Uiteraard kunnen klantgegevens in deze versie worden gedupliceerd en is het hoofdrecord ‘account’:

0123456789;Вася;A-01;2020-03-16;1000.00
9876543210;Петя;A-02;2020-03-16;666.00
0123456789;Вася;B-03;2020-03-16;9999.00

Voor het model voegen we eenvoudigweg onze testgegevens in, maar onthoud: COPY efficiënter!

INSERT INTO invoice_import
VALUES
  ('0123456789', 'Вася', 'A-01', '2020-03-16', 1000.00)
, ('9876543210', 'Петя', 'A-02', '2020-03-16', 666.00)
, ('0123456789', 'Вася', 'B-03', '2020-03-16', 9999.00);

Laten we eerst eens kijken naar de “bezuinigingen” waarnaar onze “feiten” verwijzen. In ons geval verwijzen facturen naar klanten:

CREATE TEMPORARY TABLE client_import AS
SELECT DISTINCT ON(client_inn)
-- можно просто SELECT DISTINCT, если данные заведомо непротиворечивы
  client_inn inn
, client_name "name"
FROM
  invoice_import;

Om accounts correct aan klant-ID's te kunnen koppelen, moeten we deze ID's eerst achterhalen of genereren. Laten we er velden onder toevoegen:

ALTER TABLE invoice_import ADD COLUMN client_id integer;
ALTER TABLE client_import ADD COLUMN client_id integer;

Laten we de hierboven beschreven tabelsynchronisatiemethode gebruiken met een kleine wijziging - we zullen niets in de doeltabel bijwerken of verwijderen, omdat we klanten "alleen toevoegen" importeren:

-- проставляем в таблице импорта ID уже существующих записей
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  client D
WHERE
  T.inn = D.inn; -- unique key

-- вставляем отсутствовавшие записи и проставляем их ID
WITH ins AS (
  INSERT INTO client(
    inn
  , name
  )
  SELECT
    inn
  , name
  FROM
    client_import
  WHERE
    client_id IS NULL -- если ID не проставился
  RETURNING *
)
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  ins D
WHERE
  T.inn = D.inn; -- unique key

-- проставляем ID клиентов у записей счетов
UPDATE
  invoice_import T
SET
  client_id = D.client_id
FROM
  client_import D
WHERE
  T.client_inn = D.inn; -- прикладной ключ

Eigenlijk is alles binnen invoice_import Nu hebben we het contactveld ingevuld client_id, waarmee wij de factuur invoegen.

Bron: www.habr.com

Voeg een reactie