DBA: organiza de forma competente sincronizacións e importacións

Para o procesamento complexo de grandes conxuntos de datos (diferentes Procesos ETL: importacións, conversións e sincronización cunha fonte externa) moitas veces hai unha necesidade "lembra" temporalmente e procesa inmediatamente rapidamente algo voluminoso.

Unha tarefa típica deste tipo adoita soar algo así: "Xusto aquí departamento de contabilidade descargado do banco cliente Os últimos pagos recibidos, debes cargalos rapidamente no sitio web e vinculalos ás túas contas.

Pero cando o volume deste "algo" comeza a medirse en centos de megabytes e o servizo debe seguir funcionando coa base de datos 24x7, xorden moitos efectos secundarios que arruinarán a túa vida.
DBA: organiza de forma competente sincronizacións e importacións
Para tratar con eles en PostgreSQL (e non só nel), podes utilizar algunhas optimizacións que che permitirán procesar todo máis rápido e con menos consumo de recursos.

1. Onde enviar?

En primeiro lugar, imos decidir onde podemos cargar os datos que queremos "procesar".

1.1. Táboas temporais (TEMPORARY TABLE)

En principio, para PostgreSQL as táboas temporais son as mesmas que calquera outra. Polo tanto, supersticións como "Todo aí está almacenado só na memoria e pode acabar". Pero tamén hai varias diferenzas significativas.

O teu propio "espazo de nomes" para cada conexión á base de datos

Se dúas conexións intenta conectarse ao mesmo tempo CREATE TABLE x, entón alguén definitivamente conseguirá erro de non singularidade obxectos da base de datos.

Pero se ambos intentan executar CREATE TEMPORARY TABLE x, entón ambos o farán normalmente, e todos conseguirán a súa copia táboas. E non haberá nada en común entre eles.

"Autodestrución" ao desconectar

Cando se pecha a conexión, todas as táboas temporais elimínanse automaticamente, polo que manualmente DROP TABLE x non ten sentido excepto...

Se estás traballando pgbouncer en modo de transacción, entón a base de datos segue crendo que esta conexión aínda está activa, e nela aínda existe esta táboa temporal.

Polo tanto, tentando crealo de novo, desde unha conexión diferente a pgbouncer, producirase un erro. Pero isto pódese evitar usando CREATE TEMPORARY TABLE IF NOT EXISTS x.

É certo, é mellor non facelo de todos os xeitos, porque entón podes atopar "de súpeto" alí os datos que quedan do "propietario anterior". Pola contra, é moito mellor ler o manual e ver que ao crear unha táboa é posible engadir ON COMMIT DROP - é dicir, cando se complete a transacción, a táboa eliminarase automaticamente.

Non replicación

Como só pertencen a unha conexión específica, as táboas temporais non se replican. Pero isto elimina a necesidade de dobre gravación de datos no montón + WAL, polo que INSERT/UPDATE/DELETE é significativamente máis rápido.

Pero como unha táboa temporal aínda é unha táboa "case ordinaria", tampouco se pode crear nunha réplica. Polo menos polo momento, aínda que hai tempo que circula o parche correspondente.

1.2. TÁBOA DESCONECTADA

Pero que debes facer, por exemplo, se tes algún tipo de proceso ETL engorroso que non se pode implementar nunha transacción, pero aínda tes pgbouncer en modo de transacción? ..

Ou o fluxo de datos é tan grande que Non hai suficiente ancho de banda nunha conexión desde unha base de datos (lectura, un proceso por CPU)?...

Ou están a realizarse algunhas operacións de forma asíncrona en diferentes conexións?...

Aquí só hai unha opción: crear temporalmente unha táboa non temporal. Xogo de palabras, si. É dicir:

  • creou táboas "a miña propia" con nomes ao máximo para non cruzarse con ninguén
  • Extraer: encheunos con datos dunha fonte externa
  • Transformar: convertido, cuberto nos campos de ligazón clave
  • Carga: verteu datos listos nas táboas de destino
  • borraron as "miñas" táboas

E agora - unha mosca na pomada. De feito, todas as escrituras en PostgreSQL suceden dúas veces - primeiro en WAL, despois nos corpos da táboa/índice. Todo isto faise para soportar ACID e corrixir a visibilidade dos datos entre COMMIT'noz e ROLLBACK'transaccións nulas.

Pero non necesitamos isto! Temos todo o proceso Ou foi completamente exitoso ou non.. Non importa cantas transaccións intermedias haxa: non nos interesa "continuar o proceso desde o medio", especialmente cando non está claro onde estaba.

Para iso, os desenvolvedores de PostgreSQL, xa na versión 9.1, introduciron tal cousa como Táboas non rexistradas:

Con esta indicación, a táboa créase como non rexistrada. Os datos escritos en táboas non rexistradas non pasan polo rexistro de escritura anticipada (consulte o Capítulo 29), polo que tales táboas traballar moito máis rápido do habitual. Non obstante, non son inmunes ao fracaso; en caso de falla do servidor ou apagado de emerxencia, unha táboa non rexistrada truncado automaticamente. Ademais, o contido da táboa non rexistrada non replicado aos servidores escravos. Calquera índice creado nunha táboa non rexistrada queda automaticamente sen rexistro.

En definitiva, será moito máis rápido, pero se o servidor de base de datos "cae", será desagradable. Pero cantas veces sucede isto e o teu proceso ETL sabe como corrixilo correctamente "desde o medio" despois de "revitalizar" a base de datos?...

Se non, e o caso anterior é semellante ao teu, usa UNLOGGEDpero nunca non habilite este atributo en táboas reais, cuxos datos son queridos.

1.3. ON COMMIT { BORRAR FILAS | SOLAR}

Esta construción permítelle especificar un comportamento automático cando se completa unha transacción ao crear unha táboa.

en ON COMMIT DROP Xa escribín arriba, xera DROP TABLE, pero con ON COMMIT DELETE ROWS a situación é máis interesante: xérase aquí TRUNCATE TABLE.

Dado que toda a infraestrutura para almacenar a metadescrición dunha táboa temporal é exactamente a mesma que a dunha táboa normal, entón A creación e eliminación constantes de táboas temporais leva a un "inchazo" grave das táboas do sistema pg_class, pg_attribute, pg_attrdef, pg_depend,...

Agora imaxina que tes un traballador en conexión directa coa base de datos, que abre unha nova transacción cada segundo, crea, enche, procesa e elimina unha táboa temporal... Haberá un exceso de lixo acumulado nas táboas do sistema, e isto provocará freos adicionais para cada operación.

En xeral, non o fagas! Neste caso é moito máis eficaz CREATE TEMPORARY TABLE x ... ON COMMIT DELETE ROWS elimínao do ciclo de transaccións; entón ao comezo de cada nova transacción as táboas xa están existirá (garda unha chamada CREATE), pero estará baleiro, grazas a TRUNCATE (tamén gardamos a súa chamada) ao completar a transacción anterior.

1.4. COMO... INCLUÍDO...

Mencionei ao principio que un dos casos de uso típicos das táboas temporais son varios tipos de importacións, e o desenvolvedor pega cansadamente a lista de campos da táboa de destino na declaración da súa temporal...

Pero a preguiza é o motor do progreso! Por iso crear unha nova táboa "baseada na mostra" pode ser moito máis sinxelo:

CREATE TEMPORARY TABLE import_table(
  LIKE target_table
);

Dado que entón pode xerar moitos datos nesta táboa, a busca nunca será rápida. Pero hai unha solución tradicional para isto: índices! E, si, unha táboa temporal tamén pode ter índices.

Xa que, moitas veces, os índices necesarios coinciden cos índices da táboa de destino, pode simplemente escribir LIKE target_table INCLUDING INDEXES.

Se tamén necesitas DEFAULT-valores (por exemplo, para cubrir os valores da chave primaria), pode usar LIKE target_table INCLUDING DEFAULTS. Ou simplemente - LIKE target_table INCLUDING ALL — copia valores predeterminados, índices, restricións,...

Pero aquí tes que entender iso se creaches importar a táboa inmediatamente con índices, entón os datos tardarán máis en cargarseque se primeiro enche todo e só despois enrola os índices: mira como fai isto como exemplo pg_dump.

En xeral, RTFM!

2. Como escribir?

Déixame dicir: úsao COPY-flow no canto de "pack" INSERT, aceleración por momentos. Podes incluso directamente desde un ficheiro xerado previamente.

3. Como procesar?

Entón, deixemos que a nosa intro se vexa así:

  • tes unha táboa cos datos do cliente almacenados na túa base de datos 1M rexistros
  • cada día un cliente envíache un novo "imaxe" completa
  • por experiencia sabes que de cando en vez non se cambian máis de 10 rexistros

Un exemplo clásico de tal situación é Base KLADR — Hai moitos enderezos en total, pero en cada carga semanal hai moi poucos cambios (denominación de asentamentos, combinación de rúas, aparición de novas vivendas) mesmo a escala nacional.

3.1. Algoritmo de sincronización completa

Para simplificar, digamos que nin sequera precisa reestruturar os datos, só trae a táboa na forma desexada, é dicir:

  • eliminar todo o que xa non existe
  • para actualizar todo o que xa existía e necesita ser actualizado
  • inserir todo o que aínda non pasou

Por que se deben facer as operacións nesta orde? Porque así é como o tamaño da táboa crecerá mínimamente (lembrar MVCC!).

ELIMINAR DE DST

Non, por suposto que podes facelo con só dúas operacións:

  • eliminar (DELETE) todo en xeral
  • inserir todo dende a nova imaxe

Pero ao mesmo tempo, grazas a MVCC, O tamaño da mesa aumentará exactamente dúas veces! Conseguir +1 millón de imaxes de rexistros na táboa debido a unha actualización de 10 XNUMX é unha redundancia...

TRUNCADO dst

Un desenvolvedor máis experimentado sabe que a tableta enteira se pode limpar bastante barato:

  • claro (TRUNCATE) toda a táboa
  • inserir todo dende a nova imaxe

O método é eficaz, ás veces bastante aplicable, pero hai un problema... Estaremos engadindo rexistros de 1M durante moito tempo, polo que non podemos permitirnos o luxo de deixar a táboa baleira durante todo este tempo (como sucederá sen envolvela nunha soa transacción).

O que significa:

  • estamos comezando transacción de longa duración
  • TRUNCATE impón Acceso Exclusivo-bloqueo
  • facemos a inserción durante moito tempo, e todos os demais neste momento nin sequera pode SELECT

Algo non vai ben...

ALTER TABLE... RENOME... / SOLTAR TÁBOA...

Unha alternativa é encher todo nunha nova táboa separada e, a continuación, simplemente renomeala no lugar da antiga. Un par de pequenas cousas desagradables:

  • aínda tamén Acceso Exclusivo, aínda que significativamente menos tempo
  • restablecéronse todos os plans/estatísticas de consulta desta táboa, cómpre executar ANALYZE
  • todas as claves estranxeiras están rotas (FK) á táboa

Había un parche WIP de Simon Riggs que suxeriu facer ALTER-unha operación para substituír o corpo da táboa a nivel de ficheiro, sen tocar as estatísticas e FK, pero non recolleu quórum.

ELIMINAR, ACTUALIZAR, INSERIR

Entón, decidimos a opción de non bloqueo de tres operacións. Case tres... Como facelo de forma máis eficaz?

-- все делаем в рамках транзакции, чтобы никто не видел "промежуточных" состояний
BEGIN;

-- создаем временную таблицу с импортируемыми данными
CREATE TEMPORARY TABLE tmp(
  LIKE dst INCLUDING INDEXES -- по образу и подобию, вместе с индексами
) ON COMMIT DROP; -- за рамками транзакции она нам не нужна

-- быстро-быстро вливаем новый образ через COPY
COPY tmp FROM STDIN;
-- ...
-- .

-- удаляем отсутствующие
DELETE FROM
  dst D
USING
  dst X
LEFT JOIN
  tmp Y
    USING(pk1, pk2) -- поля первичного ключа
WHERE
  (D.pk1, D.pk2) = (X.pk1, X.pk2) AND
  Y IS NOT DISTINCT FROM NULL; -- "антиджойн"

-- обновляем оставшиеся
UPDATE
  dst D
SET
  (f1, f2, f3) = (T.f1, T.f2, T.f3)
FROM
  tmp T
WHERE
  (D.pk1, D.pk2) = (T.pk1, T.pk2) AND
  (D.f1, D.f2, D.f3) IS DISTINCT FROM (T.f1, T.f2, T.f3); -- незачем обновлять совпадающие

-- вставляем отсутствующие
INSERT INTO
  dst
SELECT
  T.*
FROM
  tmp T
LEFT JOIN
  dst D
    USING(pk1, pk2)
WHERE
  D IS NOT DISTINCT FROM NULL;

COMMIT;

3.2. Post-procesamento de importación

No mesmo KLADR, todos os rexistros modificados deben ser executados adicionalmente mediante o post-procesamento: normalizados, as palabras clave destacadas e reducidas ás estruturas necesarias. Pero como sabes - que cambiou exactamentesen complicar o código de sincronización, idealmente sen tocalo en absoluto?

Se só o teu proceso ten acceso de escritura no momento da sincronización, podes usar un disparador que recollerá todos os cambios por nós:

-- целевые таблицы
CREATE TABLE kladr(...);
CREATE TABLE kladr_house(...);

-- таблицы с историей изменений
CREATE TABLE kladr$log(
  ro kladr, -- тут лежат целые образы записей старой/новой
  rn kladr
);

CREATE TABLE kladr_house$log(
  ro kladr_house,
  rn kladr_house
);

-- общая функция логирования изменений
CREATE OR REPLACE FUNCTION diff$log() RETURNS trigger AS $$
DECLARE
  dst varchar = TG_TABLE_NAME || '$log';
  stmt text = '';
BEGIN
  -- проверяем необходимость логгирования при обновлении записи
  IF TG_OP = 'UPDATE' THEN
    IF NEW IS NOT DISTINCT FROM OLD THEN
      RETURN NEW;
    END IF;
  END IF;
  -- создаем запись лога
  stmt = 'INSERT INTO ' || dst::text || '(ro,rn)VALUES(';
  CASE TG_OP
    WHEN 'INSERT' THEN
      EXECUTE stmt || 'NULL,$1)' USING NEW;
    WHEN 'UPDATE' THEN
      EXECUTE stmt || '$1,$2)' USING OLD, NEW;
    WHEN 'DELETE' THEN
      EXECUTE stmt || '$1,NULL)' USING OLD;
  END CASE;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Agora podemos aplicar disparadores antes de iniciar a sincronización (ou activalos mediante ALTER TABLE ... ENABLE TRIGGER ...):

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr_house
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

E despois extraemos con calma todos os cambios que necesitamos das táboas de rexistro e executámolos a través de controladores adicionais.

3.3. Importación de conxuntos vinculados

Enriba consideramos casos nos que as estruturas de datos da orixe e do destino son as mesmas. Pero e se a carga desde un sistema externo ten un formato diferente da estrutura de almacenamento da nosa base de datos?

Poñamos como exemplo o almacenamento dos clientes e as súas contas, a clásica opción "moitos a un":

CREATE TABLE client(
  client_id
    serial
      PRIMARY KEY
, inn
    varchar
      UNIQUE
, name
    varchar
);

CREATE TABLE invoice(
  invoice_id
    serial
      PRIMARY KEY
, client_id
    integer
      REFERENCES client(client_id)
, number
    varchar
, dt
    date
, sum
    numeric(32,2)
);

Pero a descarga desde unha fonte externa chéganos en forma de "todo en un":

CREATE TEMPORARY TABLE invoice_import(
  client_inn
    varchar
, client_name
    varchar
, invoice_number
    varchar
, invoice_dt
    date
, invoice_sum
    numeric(32,2)
);

Obviamente, os datos do cliente pódense duplicar nesta versión e o rexistro principal é "conta":

0123456789;Вася;A-01;2020-03-16;1000.00
9876543210;Петя;A-02;2020-03-16;666.00
0123456789;Вася;B-03;2020-03-16;9999.00

Para o modelo, simplemente inseriremos os nosos datos de proba, pero lembra: COPY máis eficiente!

INSERT INTO invoice_import
VALUES
  ('0123456789', 'Вася', 'A-01', '2020-03-16', 1000.00)
, ('9876543210', 'Петя', 'A-02', '2020-03-16', 666.00)
, ('0123456789', 'Вася', 'B-03', '2020-03-16', 9999.00);

En primeiro lugar, imos destacar aqueles “cortes” aos que se refiren os nosos “feitos”. No noso caso, as facturas refírense a clientes:

CREATE TEMPORARY TABLE client_import AS
SELECT DISTINCT ON(client_inn)
-- можно просто SELECT DISTINCT, если данные заведомо непротиворечивы
  client_inn inn
, client_name "name"
FROM
  invoice_import;

Para asociar correctamente as contas cos ID de clientes, primeiro necesitamos descubrir ou xerar estes identificadores. Engadimos campos baixo eles:

ALTER TABLE invoice_import ADD COLUMN client_id integer;
ALTER TABLE client_import ADD COLUMN client_id integer;

Usemos o método de sincronización de táboas descrito anteriormente cunha pequena modificación: non actualizaremos nin eliminaremos nada da táboa de destino, porque importamos clientes "só anexar":

-- проставляем в таблице импорта ID уже существующих записей
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  client D
WHERE
  T.inn = D.inn; -- unique key

-- вставляем отсутствовавшие записи и проставляем их ID
WITH ins AS (
  INSERT INTO client(
    inn
  , name
  )
  SELECT
    inn
  , name
  FROM
    client_import
  WHERE
    client_id IS NULL -- если ID не проставился
  RETURNING *
)
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  ins D
WHERE
  T.inn = D.inn; -- unique key

-- проставляем ID клиентов у записей счетов
UPDATE
  invoice_import T
SET
  client_id = D.client_id
FROM
  client_import D
WHERE
  T.client_inn = D.inn; -- прикладной ключ

De feito, todo está dentro invoice_import Agora temos o campo de contacto cuberto client_id, co que inseriremos a factura.

Fonte: www.habr.com

Engadir un comentario