DBA: organisera synkroniseringar och importer på ett kompetent sätt

För komplex bearbetning av stora datamängder (olika ETL-processer: importer, konverteringar och synkronisering med en extern källa) ofta finns det ett behov tillfälligt "kom ihåg" och omedelbart snabbt bearbeta något omfattande.

En typisk uppgift av det här slaget låter vanligtvis ungefär så här: "Precis här redovisningsavdelningen lossas från kundbanken de senast mottagna betalningarna måste du snabbt ladda upp dem till webbplatsen och länka dem till dina konton"

Men när volymen av detta "något" börjar mäta sig i hundratals megabyte, och tjänsten måste fortsätta att arbeta med databasen 24x7, uppstår många biverkningar som kommer att förstöra ditt liv.
DBA: organisera synkroniseringar och importer på ett kompetent sätt
För att hantera dem i PostgreSQL (och inte bara i det), kan du använda några optimeringar som gör att du kan bearbeta allt snabbare och med mindre resursförbrukning.

1. Vart ska man skicka?

Låt oss först bestämma var vi kan ladda upp data som vi vill "behandla".

1.1. Tillfälliga tabeller (TEMPORARY TABLE)

I princip är temporära tabeller för PostgreSQL desamma som alla andra. Därför vidskepelse som "Allt där lagras bara i minnet, och det kan ta slut". Men det finns också flera betydande skillnader.

Ditt eget "namnområde" för varje anslutning till databasen

Om två anslutningar försöker ansluta samtidigt CREATE TABLE x, då kommer någon definitivt att få icke-unika fel databasobjekt.

Men om båda försöker utföra CREATE TEMPORARY TABLE x, då kommer båda att göra det normalt, och alla kommer att få ditt exemplar tabeller. Och det kommer inte att finnas något gemensamt mellan dem.

"Självförstörande" vid frånkoppling

När anslutningen stängs raderas alla temporära tabeller automatiskt, alltså manuellt DROP TABLE x det är ingen mening förutom...

Om du jobbar igenom pgbouncer i transaktionsläge, då fortsätter databasen att tro att denna anslutning fortfarande är aktiv, och i den finns den här temporära tabellen fortfarande.

Därför kommer det att resultera i ett fel om du försöker skapa den igen, från en annan anslutning till pgbouncer. Men detta kan kringgås genom att använda CREATE TEMPORARY TABLE IF NOT EXISTS x.

Det är sant att det är bättre att inte göra detta ändå, för då kan du "plötsligt" hitta data som finns kvar från den "föregående ägaren". Istället är det mycket bättre att läsa manualen och se att när man skapar en tabell är det möjligt att lägga till ON COMMIT DROP - det vill säga när transaktionen är klar kommer tabellen automatiskt att raderas.

Icke-replikering

Eftersom de bara tillhör en specifik anslutning replikeras inte temporära tabeller. Men detta eliminerar behovet av dubbelregistrering av data i heap + WAL, så INSERT/UPDATE/DELETE i den är betydligt snabbare.

Men eftersom en temporär tabell fortfarande är en "nästan vanlig" tabell, kan den inte heller skapas på en replik. Åtminstone för nu, även om motsvarande plåster har cirkulerat länge.

1.2. OLOGGAD BORD

Men vad ska du göra, till exempel om du har någon form av krånglig ETL-process som inte kan implementeras inom en transaktion, men du fortfarande har pgbouncer i transaktionsläge? ..

Eller så är dataflödet så stort att Det finns inte tillräckligt med bandbredd på en anslutning från en databas (läs, en process per CPU)?..

Eller så pågår vissa operationer asynkront i olika sammanhang?...

Det finns bara ett alternativ här - skapa tillfälligt en icke-tillfällig tabell. Ordlek, ja. Det är:

  • skapade "mina egna" tabeller med maximalt slumpmässiga namn för att inte korsa någon
  • Utdrag: fyllde dem med data från en extern källa
  • Förvandla: konverterat, ifyllt viktiga länkningsfält
  • Ladda: hällde färdiga data i måltabeller
  • raderade "mina" tabeller

Och nu - en fluga i glädjen. Faktiskt, alla skrivningar i PostgreSQL händer två gånger - först i WAL, sedan in i tabellen/indexkropparna. Allt detta görs för att stödja ACID och korrekt datasynlighet mellan COMMIT'nötlig och ROLLBACK'nolltransaktioner.

Men vi behöver inte detta! Vi har hela processen Antingen var det helt lyckat eller så var det inte.. Det spelar ingen roll hur många mellanliggande transaktioner det blir - vi är inte intresserade av att "fortsätta processen från mitten", särskilt när det inte är klart var det var.

För att göra detta introducerade PostgreSQL-utvecklarna, tillbaka i version 9.1, en sådan sak som OLOGGADE tabeller:

Med denna indikation skapas tabellen som ologgad. Data som skrivs till ologgade tabeller går inte igenom framskrivningsloggen (se kapitel 29), vilket gör att sådana tabeller arbeta mycket snabbare än vanligt. Men de är inte immuna mot misslyckande; i händelse av serverfel eller nödavstängning, en ologgad tabell automatiskt trunkerad. Dessutom innehållet i den ologgade tabellen inte replikeras till slavservrar. Alla index som skapas på en ologgad tabell blir automatiskt avloggad.

Kort sagt, det kommer att gå mycket snabbare, men om databasservern "faller" blir det obehagligt. Men hur ofta händer detta, och vet din ETL-process hur man korrigerar detta korrekt "från mitten" efter att ha "vitaliserat" databasen?

Om inte, och fallet ovan liknar ditt, använd UNLOGGEDmen aldrig aktivera inte detta attribut på riktiga tabeller, vars data ligger dig varmt om hjärtat.

1.3. ON COMMIT { DELETE ROWS | SLÄPPA}

Denna konstruktion låter dig specificera automatiskt beteende när en transaktion är slutförd när du skapar en tabell.

Про ON COMMIT DROP Jag skrev redan ovan, det genererar DROP TABLE, men med ON COMMIT DELETE ROWS situationen är mer intressant - den genereras här TRUNCATE TABLE.

Eftersom hela infrastrukturen för att lagra metabeskrivningen för en temporär tabell är exakt densamma som den för en vanlig tabell, Konstant skapande och radering av tillfälliga tabeller leder till allvarlig "svällning" av systemtabeller pg_class, pg_attribute, pg_attrdef, pg_depend,...

Föreställ dig nu att du har en arbetare på en direktanslutning till databasen, som öppnar en ny transaktion varje sekund, skapar, fyller, bearbetar och tar bort en temporär tabell... Det kommer att finnas ett överskott av skräp samlat i systemtabellerna, och detta kommer att orsaka extra bromsar för varje operation.

I allmänhet, gör inte detta! I det här fallet är det mycket mer effektivt CREATE TEMPORARY TABLE x ... ON COMMIT DELETE ROWS ta den ur transaktionscykeln - sedan är tabellerna redan i början av varje ny transaktion kommer att finnas (spara ett samtal CREATE), Men kommer att vara tom, tack vare TRUNCATE (vi sparade också samtalet) när vi slutförde den föregående transaktionen.

1.4. GILLA...INKLUSIVE...

Jag nämnde i början att ett av de typiska användningsfallen för temporära tabeller är olika typer av import - och utvecklaren kopierar och klistrar trött in listan med fält i måltabellen i deklarationen av sin tillfälliga...

Men lathet är motorn för framsteg! Det är därför skapa en ny tabell "baserat på exempel" det kan vara mycket enklare:

CREATE TEMPORARY TABLE import_table(
  LIKE target_table
);

Eftersom du sedan kan generera mycket data i den här tabellen kommer det aldrig att gå snabbt att söka igenom den. Men det finns en traditionell lösning på detta - index! Och ja, en temporär tabell kan också ha index.

Eftersom de nödvändiga indexen ofta sammanfaller med måltabellens index kan du helt enkelt skriva LIKE target_table INCLUDING INDEXES.

Om du också behöver DEFAULT-värden (till exempel för att fylla i de primära nyckelvärdena), kan du använda LIKE target_table INCLUDING DEFAULTS. Eller bara - LIKE target_table INCLUDING ALL — kopierar standardinställningar, index, begränsningar,...

Men här måste du förstå att om du skapade importera tabell omedelbart med index, då tar data längre tid att laddaän om du först fyller upp allt, och först sedan rullar upp indexen - titta på hur den gör detta som ett exempel pg_dump.

I allmänhet, RTFM!

2. Hur skriver man?

Låt mig bara säga - använd den COPY-flöde istället för "packa" INSERT, acceleration ibland. Du kan till och med direkt från en förgenererad fil.

3. Hur bearbetar man?

Så låt oss låta vårt intro se ut ungefär så här:

  • du har en tabell med klientdata lagrade i din databas 1 miljon rekord
  • varje dag en kund skickar en ny till dig fullständig "bild"
  • av erfarenhet vet du det då och då inte mer än 10 XNUMX poster ändras

Ett klassiskt exempel på en sådan situation är KLADR bas — Det finns många adresser totalt, men i varje veckouppladdning sker det väldigt få ändringar (byte på bosättningar, kombinera gator, utseende på nya hus) även i nationell skala.

3.1. Fullständig synkroniseringsalgoritm

För enkelhetens skull, låt oss säga att du inte ens behöver omstrukturera data - bara ta tabellen i önskad form, det vill säga:

  • ta bort allt som inte finns längre
  • uppdatering allt som redan fanns och behöver uppdateras
  • infoga allt som inte har hänt än

Varför ska operationerna göras i denna ordning? Eftersom det är så här bordsstorleken kommer att växa minimalt (kom ihåg MVCC!).

DELETE FRÅN dst

Nej, självklart klarar du dig med bara två operationer:

  • ta bort (DELETE) allt i allmänhet
  • infoga allt från den nya bilden

Men samtidigt, tack vare MVCC, Storleken på bordet kommer att öka exakt två gånger! Att få +1M bilder av poster i tabellen på grund av en 10K-uppdatering är sådär redundans...

TRUNCATE dst

En mer erfaren utvecklare vet att hela surfplattan kan rengöras ganska billigt:

  • ren (TRUNCATE) hela bordet
  • infoga allt från den nya bilden

Metoden är effektiv, ibland ganska tillämplig, men det finns ett problem... Vi kommer att lägga till 1M-poster under en lång tid, så vi har inte råd att lämna bordet tomt under hela denna tid (vilket kommer att hända utan att paketera det i en enda transaktion).

Som betyder:

  • vi börjar långvarig transaktion
  • TRUNCATE ålägger Exklusiv tillgång-blockering
  • vi gör insättningen under en lång tid, och alla andra vid den här tiden kan inte ens SELECT

Något går inte bra...

ÄNDRA TABELL... BYT NAMN... / SLIPP TABELL...

Ett alternativ är att fylla allt i en separat ny tabell och sedan helt enkelt byta namn på den i stället för den gamla. Ett par otäcka småsaker:

  • fortfarande också Exklusiv tillgång, men betydligt kortare tid
  • alla frågeplaner/statistik för denna tabell återställs, behöver köra ANALYSE
  • alla främmande nycklar är trasiga (FK) till bordet

Det fanns en WIP-patch från Simon Riggs som föreslog att göra ALTER-en operation för att ersätta tabellkroppen på filnivå, utan att röra statistik och FK, men samlade inte in beslutförhet.

DELETE, UPPDATERA, INFOGA

Så vi nöjer oss med det icke-blockerande alternativet med tre operationer. Nästan tre... Hur gör man detta mest effektivt?

-- все делаем в рамках транзакции, чтобы никто не видел "промежуточных" состояний
BEGIN;

-- создаем временную таблицу с импортируемыми данными
CREATE TEMPORARY TABLE tmp(
  LIKE dst INCLUDING INDEXES -- по образу и подобию, вместе с индексами
) ON COMMIT DROP; -- за рамками транзакции она нам не нужна

-- быстро-быстро вливаем новый образ через COPY
COPY tmp FROM STDIN;
-- ...
-- .

-- удаляем отсутствующие
DELETE FROM
  dst D
USING
  dst X
LEFT JOIN
  tmp Y
    USING(pk1, pk2) -- поля первичного ключа
WHERE
  (D.pk1, D.pk2) = (X.pk1, X.pk2) AND
  Y IS NOT DISTINCT FROM NULL; -- "антиджойн"

-- обновляем оставшиеся
UPDATE
  dst D
SET
  (f1, f2, f3) = (T.f1, T.f2, T.f3)
FROM
  tmp T
WHERE
  (D.pk1, D.pk2) = (T.pk1, T.pk2) AND
  (D.f1, D.f2, D.f3) IS DISTINCT FROM (T.f1, T.f2, T.f3); -- незачем обновлять совпадающие

-- вставляем отсутствующие
INSERT INTO
  dst
SELECT
  T.*
FROM
  tmp T
LEFT JOIN
  dst D
    USING(pk1, pk2)
WHERE
  D IS NOT DISTINCT FROM NULL;

COMMIT;

3.2. Importera efterbearbetning

I samma KLADR måste alla ändrade poster dessutom köras genom efterbearbetning - normaliseras, nyckelord markeras och reduceras till de strukturer som krävs. Men hur vet du - exakt vad som förändradesutan att komplicera synkroniseringskoden, helst utan att röra den alls?

Om bara din process har skrivåtkomst vid tidpunkten för synkroniseringen kan du använda en trigger som samlar in alla ändringar åt oss:

-- целевые таблицы
CREATE TABLE kladr(...);
CREATE TABLE kladr_house(...);

-- таблицы с историей изменений
CREATE TABLE kladr$log(
  ro kladr, -- тут лежат целые образы записей старой/новой
  rn kladr
);

CREATE TABLE kladr_house$log(
  ro kladr_house,
  rn kladr_house
);

-- общая функция логирования изменений
CREATE OR REPLACE FUNCTION diff$log() RETURNS trigger AS $$
DECLARE
  dst varchar = TG_TABLE_NAME || '$log';
  stmt text = '';
BEGIN
  -- проверяем необходимость логгирования при обновлении записи
  IF TG_OP = 'UPDATE' THEN
    IF NEW IS NOT DISTINCT FROM OLD THEN
      RETURN NEW;
    END IF;
  END IF;
  -- создаем запись лога
  stmt = 'INSERT INTO ' || dst::text || '(ro,rn)VALUES(';
  CASE TG_OP
    WHEN 'INSERT' THEN
      EXECUTE stmt || 'NULL,$1)' USING NEW;
    WHEN 'UPDATE' THEN
      EXECUTE stmt || '$1,$2)' USING OLD, NEW;
    WHEN 'DELETE' THEN
      EXECUTE stmt || '$1,NULL)' USING OLD;
  END CASE;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Nu kan vi tillämpa triggers innan vi startar synkronisering (eller aktivera dem via ALTER TABLE ... ENABLE TRIGGER ...):

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr_house
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

Och sedan extraherar vi lugnt alla ändringar vi behöver från loggtabellerna och kör dem genom ytterligare hanterare.

3.3. Importera länkade uppsättningar

Ovan övervägde vi fall när datastrukturerna för källan och destinationen är desamma. Men vad händer om uppladdningen från ett externt system har ett format som skiljer sig från lagringsstrukturen i vår databas?

Låt oss ta som exempel lagringen av kunder och deras konton, det klassiska alternativet "många-till-en":

CREATE TABLE client(
  client_id
    serial
      PRIMARY KEY
, inn
    varchar
      UNIQUE
, name
    varchar
);

CREATE TABLE invoice(
  invoice_id
    serial
      PRIMARY KEY
, client_id
    integer
      REFERENCES client(client_id)
, number
    varchar
, dt
    date
, sum
    numeric(32,2)
);

Men nedladdningen från en extern källa kommer till oss i form av "allt i ett":

CREATE TEMPORARY TABLE invoice_import(
  client_inn
    varchar
, client_name
    varchar
, invoice_number
    varchar
, invoice_dt
    date
, invoice_sum
    numeric(32,2)
);

Uppenbarligen kan kunddata dupliceras i den här versionen, och huvudposten är "konto":

0123456789;Вася;A-01;2020-03-16;1000.00
9876543210;Петя;A-02;2020-03-16;666.00
0123456789;Вася;B-03;2020-03-16;9999.00

För modellen infogar vi helt enkelt våra testdata, men kom ihåg - COPY mer effektiv!

INSERT INTO invoice_import
VALUES
  ('0123456789', 'Вася', 'A-01', '2020-03-16', 1000.00)
, ('9876543210', 'Петя', 'A-02', '2020-03-16', 666.00)
, ('0123456789', 'Вася', 'B-03', '2020-03-16', 9999.00);

Låt oss först lyfta fram de "nedskärningar" som våra "fakta" hänvisar till. I vårt fall avser fakturor kunder:

CREATE TEMPORARY TABLE client_import AS
SELECT DISTINCT ON(client_inn)
-- можно просто SELECT DISTINCT, если данные заведомо непротиворечивы
  client_inn inn
, client_name "name"
FROM
  invoice_import;

För att korrekt associera konton med kund-ID:n måste vi först ta reda på eller generera dessa identifierare. Låt oss lägga till fält under dem:

ALTER TABLE invoice_import ADD COLUMN client_id integer;
ALTER TABLE client_import ADD COLUMN client_id integer;

Låt oss använda tabellsynkroniseringsmetoden som beskrivs ovan med ett litet tillägg - vi kommer inte att uppdatera eller ta bort något i måltabellen, eftersom vi importerar klienter "endast lägga till":

-- проставляем в таблице импорта ID уже существующих записей
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  client D
WHERE
  T.inn = D.inn; -- unique key

-- вставляем отсутствовавшие записи и проставляем их ID
WITH ins AS (
  INSERT INTO client(
    inn
  , name
  )
  SELECT
    inn
  , name
  FROM
    client_import
  WHERE
    client_id IS NULL -- если ID не проставился
  RETURNING *
)
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  ins D
WHERE
  T.inn = D.inn; -- unique key

-- проставляем ID клиентов у записей счетов
UPDATE
  invoice_import T
SET
  client_id = D.client_id
FROM
  client_import D
WHERE
  T.client_inn = D.inn; -- прикладной ключ

Egentligen är allt inne invoice_import Nu har vi kontaktfältet ifyllt client_id, med vilken vi kommer att infoga fakturan.

Källa: will.com

Lägg en kommentar