DBA: Synchronisationen und Importe kompetent organisieren

Für die komplexe Verarbeitung großer Datenmengen (verschiedene ETL-Prozesse: Importe, Konvertierungen und Synchronisierung mit einer externen Quelle) besteht häufig ein Bedarf vorübergehend „erinnern“ und sofort schnell verarbeiten etwas Voluminöses.

Eine typische Aufgabe dieser Art hört sich meist etwa so an: "Genau hier Buchhaltungsabteilung von der Kundenbank entlastet Um die letzten eingegangenen Zahlungen anzuzeigen, müssen Sie diese schnell auf die Website hochladen und mit Ihren Konten verknüpfen.“

Aber wenn das Volumen dieses „Etwas“ Hunderte von Megabyte zu messen beginnt und der Dienst rund um die Uhr mit der Datenbank arbeiten muss, treten viele Nebenwirkungen auf, die Ihr Leben ruinieren.
DBA: Synchronisationen und Importe kompetent organisieren
Um damit in PostgreSQL (und nicht nur darin) umzugehen, können Sie einige Optimierungen verwenden, die es Ihnen ermöglichen, alles schneller und mit weniger Ressourcenverbrauch zu verarbeiten.

1. Wohin versenden?

Lassen Sie uns zunächst entscheiden, wo wir die Daten hochladen können, die wir „verarbeiten“ möchten.

1.1. Temporäre Tabellen (TEMPORARY TABLE)

Im Prinzip sind temporäre Tabellen für PostgreSQL die gleichen wie alle anderen. Daher mag Aberglaube „Alles dort ist nur im Gedächtnis gespeichert, und es kann enden“. Es gibt aber auch einige wesentliche Unterschiede.

Ihr eigener „Namespace“ für jede Verbindung zur Datenbank

Wenn zwei Verbindungen versuchen, gleichzeitig eine Verbindung herzustellen CREATE TABLE x, dann wird es bestimmt jemand bekommen Nicht-Eindeutigkeitsfehler Datenbankobjekte.

Aber wenn beide versuchen, auszuführen CREATE TEMPORARY TABLE x, dann werden es beide normal machen und jeder wird es bekommen deine Kopie Tische. Und es wird nichts gemeinsam zwischen ihnen geben.

„Selbstzerstörung“ beim Trennen der Verbindung

Beim Schließen der Verbindung werden alle temporären Tabellen automatisch gelöscht, also manuell DROP TABLE x Es hat keinen Sinn, außer...

Wenn Sie durcharbeiten pgbouncer im Transaktionsmodus, dann geht die Datenbank weiterhin davon aus, dass diese Verbindung noch aktiv ist und diese temporäre Tabelle darin noch vorhanden ist.

Daher führt der Versuch, es über eine andere Verbindung zu pgbouncer erneut zu erstellen, zu einem Fehler. Dies kann jedoch durch die Verwendung umgangen werden CREATE TEMPORARY TABLE IF NOT EXISTS x.

Allerdings ist es besser, dies nicht zu tun, denn dann findet man dort „plötzlich“ die vom „Vorbesitzer“ verbliebenen Daten. Stattdessen ist es viel besser, das Handbuch zu lesen und zu sehen, ob beim Erstellen einer Tabelle etwas hinzugefügt werden kann ON COMMIT DROP – Das heißt, wenn die Transaktion abgeschlossen ist, wird die Tabelle automatisch gelöscht.

Nicht-Replikation

Da sie nur zu einer bestimmten Verbindung gehören, werden temporäre Tabellen nicht repliziert. Aber Dadurch entfällt die Notwendigkeit einer doppelten Datenerfassung im Heap + WAL, daher ist INSERT/UPDATE/DELETE viel schneller.

Da es sich bei einer temporären Tabelle aber immer noch um eine „fast gewöhnliche“ Tabelle handelt, kann sie auch nicht auf einem Replikat erstellt werden. Zumindest vorerst, obwohl der entsprechende Patch schon länger im Umlauf ist.

1.2. UNLOGGIERTER TABELLE

Aber was sollten Sie beispielsweise tun, wenn Sie einen umständlichen ETL-Prozess haben, der nicht innerhalb einer Transaktion implementiert werden kann, Sie es aber trotzdem haben? pgbouncer im Transaktionsmodus? ..

Oder der Datenfluss ist so groß, dass Auf einer Verbindung ist nicht genügend Bandbreite vorhanden aus einer Datenbank (lesen, ein Prozess pro CPU)?..

Oder es laufen gerade Operationen asynchron in verschiedenen Zusammenhängen?..

Hier gibt es nur eine Möglichkeit - Erstellen Sie vorübergehend eine nicht temporäre Tabelle. Wortspiel, ja. Also:

  • Ich habe „meine eigenen“ Tabellen mit möglichst zufälligen Namen erstellt, um mich mit niemandem zu überschneiden
  • Extrahieren: Sie wurden mit Daten aus einer externen Quelle gefüllt
  • Transformieren: konvertiert, wichtige Verknüpfungsfelder ausgefüllt
  • Laden Sie: Fertige Daten in Zieltabellen gegossen
  • „meine“ Tabellen gelöscht

Und jetzt – ein Wermutstropfen. Tatsächlich, Alle Schreibvorgänge in PostgreSQL erfolgen zweimal - zuerst in WAL, dann in die Tabellen-/Indexkörper. All dies geschieht zur Unterstützung von ACID und zur korrekten Datensichtbarkeit zwischen COMMIT'nussig und ROLLBACK'Null-Transaktionen.

Aber das brauchen wir nicht! Wir haben den gesamten Prozess Entweder war es völlig erfolgreich oder nicht.. Es spielt keine Rolle, wie viele Zwischentransaktionen es geben wird – wir sind nicht daran interessiert, „den Prozess von der Mitte aus fortzusetzen“, insbesondere wenn nicht klar ist, wo er war.

Zu diesem Zweck haben die PostgreSQL-Entwickler bereits in Version 9.1 so etwas eingeführt wie UNLOGGED-Tabellen:

Mit dieser Angabe wird die Tabelle als nicht protokolliert erstellt. Daten, die in nicht protokollierte Tabellen geschrieben werden, durchlaufen nicht das Write-Ahead-Protokoll (siehe Kapitel 29), was dazu führt, dass solche Tabellen dies tun viel schneller arbeiten als sonst. Allerdings sind sie nicht vor Misserfolgen gefeit; im Falle eines Serverausfalls oder einer Notabschaltung eine nicht protokollierte Tabelle automatisch abgeschnitten. Zusätzlich der Inhalt der nicht protokollierten Tabelle nicht repliziert zu Slave-Servern. Alle für eine nicht protokollierte Tabelle erstellten Indizes werden automatisch nicht protokolliert.

Kurz gesagt, es wird viel schneller gehen, aber wenn der Datenbankserver „ausfällt“, wird es unangenehm. Aber wie oft passiert das und weiß Ihr ETL-Prozess, wie er dies „von der Mitte“ nach der „Revitalisierung“ der Datenbank richtig korrigieren kann?

Wenn nicht und der obige Fall Ihrem ähnlich ist, verwenden Sie UNLOGGEDaber nie Aktivieren Sie dieses Attribut nicht für echte Tabellen, deren Daten Ihnen am Herzen liegen.

1.3. ON COMMIT { ZEILEN LÖSCHEN | FALLEN}

Mit diesem Konstrukt können Sie beim Erstellen einer Tabelle ein automatisches Verhalten festlegen, wenn eine Transaktion abgeschlossen ist.

Про ON COMMIT DROP Ich habe oben schon geschrieben, es generiert DROP TABLE, aber mit ON COMMIT DELETE ROWS Die Situation ist interessanter - sie wird hier erzeugt TRUNCATE TABLE.

Da die gesamte Infrastruktur zum Speichern der Metabeschreibung einer temporären Tabelle genau die gleiche ist wie die einer regulären Tabelle Das ständige Erstellen und Löschen temporärer Tabellen führt zu einem starken „Anschwellen“ der Systemtabellen pg_class, pg_attribute, pg_attrdef, pg_depend,…

Stellen Sie sich nun vor, dass Sie einen Worker haben, der eine direkte Verbindung zur Datenbank hat, der jede Sekunde eine neue Transaktion öffnet, eine temporäre Tabelle erstellt, füllt, verarbeitet und löscht... In den Systemtabellen wird sich ein Überschuss an Müll ansammeln Dies führt bei jedem Vorgang zu zusätzlichen Bremsen.

Tun Sie dies im Allgemeinen nicht! In diesem Fall ist es viel effektiver CREATE TEMPORARY TABLE x ... ON COMMIT DELETE ROWS Nehmen Sie es aus dem Transaktionszyklus heraus – dann sind die Tabellen zu Beginn jeder neuen Transaktion bereits vorhanden wird existieren (Speichern Sie einen Anruf CREATE) aber wird leer seinDanke TRUNCATE (Wir haben auch seinen Aufruf gespeichert) beim Abschluss der vorherigen Transaktion.

1.4. WIE...INKLUSIVE...

Ich habe am Anfang erwähnt, dass einer der typischen Anwendungsfälle für temporäre Tabellen verschiedene Arten von Importen sind – und der Entwickler kopiert und einfügt müde die Liste der Felder der Zieltabelle in die Deklaration seiner temporären ...

Aber Faulheit ist der Motor des Fortschritts! Deshalb Erstellen Sie eine neue Tabelle „basierend auf dem Beispiel“ es kann viel einfacher sein:

CREATE TEMPORARY TABLE import_table(
  LIKE target_table
);

Da Sie dann viele Daten in dieser Tabelle generieren können, wird die Suche nie schnell gehen. Aber dafür gibt es eine traditionelle Lösung – Indizes! Und ja, Eine temporäre Tabelle kann auch Indizes haben.

Da die erforderlichen Indizes häufig mit den Indizes der Zieltabelle übereinstimmen, können Sie einfach schreiben LIKE target_table INCLUDING INDEXES.

Wenn Sie es auch brauchen DEFAULT-Werte (z. B. zum Ausfüllen der Primärschlüsselwerte) können Sie verwenden LIKE target_table INCLUDING DEFAULTS. Oder einfach - LIKE target_table INCLUDING ALL – kopiert Standardeinstellungen, Indizes, Einschränkungen usw.

Aber hier müssen Sie das verstehen, wenn Sie erstellt haben Wenn Sie die Tabelle sofort mit Indizes importieren, dauert das Laden der Daten längerals wenn man erst alles auffüllt und erst dann die Indizes hochrollt - sehen Sie sich das als Beispiel an pg_dump.

Im Allgemeinen, RTFM!

2. Wie schreibe ich?

Lassen Sie mich einfach sagen: Benutzen Sie es COPY-flow statt „pack“ INSERT, zeitweise Beschleunigung. Sie können dies sogar direkt aus einer vorgenerierten Datei tun.

3. Wie erfolgt die Bearbeitung?

Lassen wir unser Intro also etwa so aussehen:

  • Sie haben eine Tabelle mit Kundendaten in Ihrer Datenbank gespeichert 1 Mio. Datensätze
  • Jeden Tag schickt Ihnen ein Kunde ein neues vollständiges „Bild“
  • Aus Erfahrung weiß man das ab und zu Es werden nicht mehr als 10 Datensätze geändert

Ein klassisches Beispiel für eine solche Situation ist KLADR-Basis — Insgesamt gibt es viele Adressen, aber bei jedem wöchentlichen Upload gibt es selbst auf nationaler Ebene nur sehr wenige Änderungen (Umbenennung von Siedlungen, Zusammenlegung von Straßen, Erscheinen neuer Häuser).

3.1. Vollständiger Synchronisationsalgorithmus

Nehmen wir der Einfachheit halber an, dass Sie die Daten nicht einmal umstrukturieren müssen – bringen Sie die Tabelle einfach in die gewünschte Form, das heißt:

  • entfernen alles, was nicht mehr existiert
  • aktualisieren alles, was bereits vorhanden war und aktualisiert werden muss
  • einfügen alles, was noch nicht passiert ist

Warum sollten die Vorgänge in dieser Reihenfolge durchgeführt werden? Denn so wächst die Tabellengröße minimal (Denken Sie an MVCC!).

LÖSCHEN VON dst

Nein, natürlich kommen Sie mit nur zwei Operationen aus:

  • entfernen (DELETE) alles im Allgemeinen
  • einfügen alles vom neuen Bild

Aber gleichzeitig, dank MVCC, Die Größe der Tabelle erhöht sich genau um das Doppelte! Das Erhalten von +1 Mio. Bildern von Datensätzen in der Tabelle aufgrund eines 10K-Updates ist eine mittelmäßige Redundanz ...

TRUNCATE dst

Ein erfahrenerer Entwickler weiß, dass sich das gesamte Tablet recht günstig reinigen lässt:

  • klar (TRUNCATE) die gesamte Tabelle
  • einfügen alles vom neuen Bild

Die Methode ist effektiv, manchmal durchaus zutreffend, aber es gibt ein Problem ... Wir werden über einen längeren Zeitraum 1 Million Datensätze hinzufügen, daher können wir es uns nicht leisten, die Tabelle die ganze Zeit leer zu lassen (was passieren würde, wenn wir sie nicht in eine einzige Transaktion einbinden).

Und das heißt:

  • wir fangen an Langfristige Transaktion
  • TRUNCATE auferlegt Exklusiver Zugriff-Blockierung
  • Wir machen das Einfügen schon lange und alle anderen zu dieser Zeit kann nicht einmal SELECT

Etwas läuft nicht gut...

TABELLE ÄNDERN… UMBENENNEN… / TABELLE DROP…

Eine Alternative besteht darin, alles in eine separate neue Tabelle einzutragen und diese dann einfach anstelle der alten umzubenennen. Ein paar böse Kleinigkeiten:

  • immer noch auch Exklusiver Zugriff, wenn auch deutlich weniger Zeit
  • alle Abfragepläne/Statistiken für diese Tabelle werden zurückgesetzt, muss ANALYZE ausführen
  • Alle Fremdschlüssel sind defekt (FK) zum Tisch

Es gab einen WIP-Patch von Simon Riggs, der die Erstellung vorschlug ALTER-eine Operation zum Ersetzen des Tabellenkörpers auf Dateiebene, ohne Statistiken und FK zu berühren, aber kein Quorum zu sammeln.

LÖSCHEN, AKTUALISIEREN, EINFÜGEN

Daher entscheiden wir uns für die nicht blockierende Option mit drei Vorgängen. Fast drei... Wie geht das am effektivsten?

-- все делаем в рамках транзакции, чтобы никто не видел "промежуточных" состояний
BEGIN;

-- создаем временную таблицу с импортируемыми данными
CREATE TEMPORARY TABLE tmp(
  LIKE dst INCLUDING INDEXES -- по образу и подобию, вместе с индексами
) ON COMMIT DROP; -- за рамками транзакции она нам не нужна

-- быстро-быстро вливаем новый образ через COPY
COPY tmp FROM STDIN;
-- ...
-- .

-- удаляем отсутствующие
DELETE FROM
  dst D
USING
  dst X
LEFT JOIN
  tmp Y
    USING(pk1, pk2) -- поля первичного ключа
WHERE
  (D.pk1, D.pk2) = (X.pk1, X.pk2) AND
  Y IS NOT DISTINCT FROM NULL; -- "антиджойн"

-- обновляем оставшиеся
UPDATE
  dst D
SET
  (f1, f2, f3) = (T.f1, T.f2, T.f3)
FROM
  tmp T
WHERE
  (D.pk1, D.pk2) = (T.pk1, T.pk2) AND
  (D.f1, D.f2, D.f3) IS DISTINCT FROM (T.f1, T.f2, T.f3); -- незачем обновлять совпадающие

-- вставляем отсутствующие
INSERT INTO
  dst
SELECT
  T.*
FROM
  tmp T
LEFT JOIN
  dst D
    USING(pk1, pk2)
WHERE
  D IS NOT DISTINCT FROM NULL;

COMMIT;

3.2. Nachbearbeitung importieren

Im selben KLADR müssen alle geänderten Datensätze zusätzlich einer Nachbearbeitung unterzogen werden – normalisiert, Schlüsselwörter hervorgehoben und auf die erforderlichen Strukturen reduziert. Aber woher weißt du das - was genau sich geändert hatohne den Synchronisationscode zu komplizieren, idealerweise ohne ihn überhaupt zu berühren?

Wenn Ihr Prozess zum Zeitpunkt der Synchronisierung nur Schreibzugriff hat, können Sie einen Trigger verwenden, der alle Änderungen für uns sammelt:

-- целевые таблицы
CREATE TABLE kladr(...);
CREATE TABLE kladr_house(...);

-- таблицы с историей изменений
CREATE TABLE kladr$log(
  ro kladr, -- тут лежат целые образы записей старой/новой
  rn kladr
);

CREATE TABLE kladr_house$log(
  ro kladr_house,
  rn kladr_house
);

-- общая функция логирования изменений
CREATE OR REPLACE FUNCTION diff$log() RETURNS trigger AS $$
DECLARE
  dst varchar = TG_TABLE_NAME || '$log';
  stmt text = '';
BEGIN
  -- проверяем необходимость логгирования при обновлении записи
  IF TG_OP = 'UPDATE' THEN
    IF NEW IS NOT DISTINCT FROM OLD THEN
      RETURN NEW;
    END IF;
  END IF;
  -- создаем запись лога
  stmt = 'INSERT INTO ' || dst::text || '(ro,rn)VALUES(';
  CASE TG_OP
    WHEN 'INSERT' THEN
      EXECUTE stmt || 'NULL,$1)' USING NEW;
    WHEN 'UPDATE' THEN
      EXECUTE stmt || '$1,$2)' USING OLD, NEW;
    WHEN 'DELETE' THEN
      EXECUTE stmt || '$1,NULL)' USING OLD;
  END CASE;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Jetzt können wir Trigger anwenden, bevor wir mit der Synchronisierung beginnen (oder sie über aktivieren). ALTER TABLE ... ENABLE TRIGGER ...):

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr_house
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

Und dann extrahieren wir in aller Ruhe alle benötigten Änderungen aus den Protokolltabellen und führen sie über zusätzliche Handler aus.

3.3. Verknüpfte Sätze importieren

Oben haben wir Fälle betrachtet, in denen die Datenstrukturen von Quelle und Ziel identisch sind. Was aber, wenn der Upload von einem externen System ein anderes Format als die Speicherstruktur in unserer Datenbank hat?

Nehmen wir als Beispiel die Speicherung von Kunden und deren Konten, die klassische „Many-to-One“-Variante:

CREATE TABLE client(
  client_id
    serial
      PRIMARY KEY
, inn
    varchar
      UNIQUE
, name
    varchar
);

CREATE TABLE invoice(
  invoice_id
    serial
      PRIMARY KEY
, client_id
    integer
      REFERENCES client(client_id)
, number
    varchar
, dt
    date
, sum
    numeric(32,2)
);

Aber der Download von einer externen Quelle kommt zu uns in Form von „All in One“:

CREATE TEMPORARY TABLE invoice_import(
  client_inn
    varchar
, client_name
    varchar
, invoice_number
    varchar
, invoice_dt
    date
, invoice_sum
    numeric(32,2)
);

Offensichtlich können Kundendaten in dieser Version dupliziert werden, und der Hauptdatensatz ist „Konto“:

0123456789;Вася;A-01;2020-03-16;1000.00
9876543210;Петя;A-02;2020-03-16;666.00
0123456789;Вася;B-03;2020-03-16;9999.00

Für das Modell fügen wir einfach unsere Testdaten ein, aber denken Sie daran: COPY effizienter!

INSERT INTO invoice_import
VALUES
  ('0123456789', 'Вася', 'A-01', '2020-03-16', 1000.00)
, ('9876543210', 'Петя', 'A-02', '2020-03-16', 666.00)
, ('0123456789', 'Вася', 'B-03', '2020-03-16', 9999.00);

Lassen Sie uns zunächst die „Kürzungen“ hervorheben, auf die sich unsere „Fakten“ beziehen. In unserem Fall beziehen sich Rechnungen auf Kunden:

CREATE TEMPORARY TABLE client_import AS
SELECT DISTINCT ON(client_inn)
-- можно просто SELECT DISTINCT, если данные заведомо непротиворечивы
  client_inn inn
, client_name "name"
FROM
  invoice_import;

Um Konten korrekt mit Kunden-IDs zu verknüpfen, müssen wir diese Identifikatoren zunächst herausfinden oder generieren. Fügen wir darunter Felder hinzu:

ALTER TABLE invoice_import ADD COLUMN client_id integer;
ALTER TABLE client_import ADD COLUMN client_id integer;

Verwenden wir die oben beschriebene Tabellensynchronisierungsmethode mit einer kleinen Änderung – wir aktualisieren oder löschen nichts in der Zieltabelle, da wir Clients „nur anhängen“ importieren:

-- проставляем в таблице импорта ID уже существующих записей
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  client D
WHERE
  T.inn = D.inn; -- unique key

-- вставляем отсутствовавшие записи и проставляем их ID
WITH ins AS (
  INSERT INTO client(
    inn
  , name
  )
  SELECT
    inn
  , name
  FROM
    client_import
  WHERE
    client_id IS NULL -- если ID не проставился
  RETURNING *
)
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  ins D
WHERE
  T.inn = D.inn; -- unique key

-- проставляем ID клиентов у записей счетов
UPDATE
  invoice_import T
SET
  client_id = D.client_id
FROM
  client_import D
WHERE
  T.client_inn = D.inn; -- прикладной ключ

Eigentlich ist alles drin invoice_import Jetzt haben wir das Kontaktfeld ausgefüllt client_id, mit der wir die Rechnung einfügen.

Source: habr.com

Kommentar hinzufügen