DBA: organize sincronizações e importações com competência

Para processamento complexo de grandes conjuntos de dados (diferentes Processos ETL: importações, conversões e sincronização com uma fonte externa) muitas vezes há necessidade temporariamente “lembre-se” e processe imediatamente e rapidamente algo volumoso.

Uma tarefa típica desse tipo geralmente soa mais ou menos assim: "Bem aqui departamento de contabilidade descarregado do banco cliente os últimos pagamentos recebidos, você precisa carregá-los rapidamente no site e vinculá-los às suas contas"

Mas quando o volume desse “algo” começa a medir centenas de megabytes, e o serviço precisa continuar funcionando com o banco de dados 24 horas por dia, 7 dias por semana, surgem muitos efeitos colaterais que vão arruinar sua vida.
DBA: organize sincronizações e importações com competência
Para lidar com eles no PostgreSQL (e não só nele), você pode usar algumas otimizações que permitirão processar tudo mais rápido e com menor consumo de recursos.

1. Para onde enviar?

Primeiro, vamos decidir onde podemos fazer upload dos dados que queremos “processar”.

1.1. Tabelas temporárias (TEMPORARY TABLE)

Em princípio, para o PostgreSQL as tabelas temporárias são iguais a qualquer outra. Portanto, superstições como “Tudo lá fica guardado apenas na memória e pode acabar”. Mas também existem várias diferenças significativas.

Seu próprio “namespace” para cada conexão com o banco de dados

Se duas conexões tentarem se conectar ao mesmo tempo CREATE TABLE x, então alguém definitivamente vai conseguir erro de não exclusividade objetos de banco de dados.

Mas se ambos tentarem executar CREATE TEMPORARY TABLE x, então ambos farão isso normalmente e todos receberão sua cópia tabelas. E não haverá nada em comum entre eles.

"Autodestruição" ao desconectar

Quando a conexão é encerrada, todas as tabelas temporárias são excluídas automaticamente, portanto, manualmente DROP TABLE x não há sentido, exceto...

Se você está trabalhando pgbouncer em modo de transação, então o banco de dados continua acreditando que esta conexão ainda está ativa, e nela esta tabela temporária ainda existe.

Portanto, tentar criá-lo novamente, a partir de uma conexão diferente com o pgbouncer, resultará em erro. Mas isso pode ser contornado usando CREATE TEMPORARY TABLE IF NOT EXISTS x.

É verdade que é melhor não fazer isso de qualquer maneira, porque então você pode “de repente” encontrar ali os dados restantes do “proprietário anterior”. Em vez disso, é muito melhor ler o manual e ver que ao criar uma tabela é possível adicionar ON COMMIT DROP - ou seja, quando a transação for concluída, a tabela será excluída automaticamente.

Não replicação

Por pertencerem apenas a uma conexão específica, as tabelas temporárias não são replicadas. Mas isso elimina a necessidade de gravação dupla de dados no heap + WAL, então INSERT/UPDATE/DELETE nele é muito mais rápido.

Mas como uma tabela temporária ainda é uma tabela “quase comum”, ela também não pode ser criada em uma réplica. Pelo menos por enquanto, embora o patch correspondente já circule há muito tempo.

1.2. TABELA NÃO LOGADA

Mas o que você deve fazer, por exemplo, se tiver algum tipo de processo ETL complicado que não pode ser implementado em uma transação, mas ainda tiver pgbouncer em modo de transação? ..

Ou o fluxo de dados é tão grande que Não há largura de banda suficiente em uma conexão de um banco de dados (leitura, um processo por CPU)?..

Ou algumas operações estão acontecendo de forma assíncrona em conexões diferentes?..

Só há uma opção aqui - criar temporariamente uma tabela não temporária. Trocadilho, sim. Aquilo é:

  • criei tabelas “minhas próprias” com nomes aleatórios ao máximo para não cruzar com ninguém
  • Extrair: preencheu-os com dados de uma fonte externa
  • Transformar: convertido, preenchido nos principais campos de vinculação
  • Ver: despejou dados prontos em tabelas de destino
  • excluiu “minhas” tabelas

E agora - uma mosca na sopa. Na verdade, todas as gravações no PostgreSQL acontecem duas vezes - primeiro em WALe, em seguida, nos corpos da tabela/índice. Tudo isso é feito para suportar ACID e corrigir a visibilidade dos dados entre COMMIT'noz e ROLLBACK'transações nulas.

Mas não precisamos disso! Temos todo o processo ou foi completamente bem sucedido ou não. Não importa quantas transações intermediárias haverá – não estamos interessados ​​em “continuar o processo a partir do meio”, especialmente quando não está claro onde estava.

Para fazer isso, os desenvolvedores do PostgreSQL, na versão 9.1, introduziram algo como Tabelas UNLOGGED:

Com esta indicação a tabela é criada como não registrada. Os dados gravados em tabelas não registradas não passam pelo log write-ahead (consulte o Capítulo 29), fazendo com que tais tabelas sejam trabalhe muito mais rápido que o normal. Contudo, eles não estão imunes ao fracasso; em caso de falha do servidor ou desligamento de emergência, uma tabela não registrada automaticamente truncado. Além disso, o conteúdo da tabela não registrada não replicado para servidores escravos. Quaisquer índices criados em uma tabela não registrada tornam-se automaticamente não registrados.

Em suma, será muito mais rápido, mas se o servidor de banco de dados “cair”, será desagradável. Mas com que frequência isso acontece, e seu processo ETL sabe como corrigir isso corretamente “do meio” após “revitalizar” o banco de dados?..

Caso contrário, e o caso acima for semelhante ao seu, use UNLOGGEDmas nunca não habilite este atributo em tabelas reais, cujos dados são caros para você.

1.3. ON COMMIT {EXCLUIR LINHAS | DERRUBAR}

Esta construção permite especificar o comportamento automático quando uma transação é concluída ao criar uma tabela.

Про ON COMMIT DROP Já escrevi acima, gera DROP TABLE, mas com ON COMMIT DELETE ROWS a situação é mais interessante - é gerada aqui TRUNCATE TABLE.

Como toda a infraestrutura para armazenar a meta-descrição de uma tabela temporária é exatamente igual à de uma tabela normal, então A criação e exclusão constantes de tabelas temporárias levam a um grave “inchaço” das tabelas do sistema pg_class, pg_attribute, pg_attrdef, pg_depend,…

Agora imagine que você tem um trabalhador em conexão direta com o banco de dados, que abre uma nova transação a cada segundo, cria, preenche, processa e exclui uma tabela temporária... Haverá um excesso de lixo acumulado nas tabelas do sistema, e isso causará freios extras para cada operação.

Em geral, não faça isso! Neste caso é muito mais eficaz CREATE TEMPORARY TABLE x ... ON COMMIT DELETE ROWS tirá-lo do ciclo de transação - então, no início de cada nova transação, as tabelas já estão existirá (salvar uma chamada CREATE), mas estará vazio, graças a TRUNCATE (também salvamos sua chamada) ao concluir a transação anterior.

1.4. COMO...INCLUINDO...

Mencionei no início que um dos casos de uso típicos para tabelas temporárias são vários tipos de importações - e o desenvolvedor copia e cola cansadamente a lista de campos da tabela de destino na declaração de seu temporário...

Mas a preguiça é o motor do progresso! É por isso crie uma nova tabela “com base na amostra” pode ser muito mais simples:

CREATE TEMPORARY TABLE import_table(
  LIKE target_table
);

Como você pode gerar muitos dados nessa tabela, pesquisá-la nunca será rápido. Mas existe uma solução tradicional para isso: índices! E sim, uma tabela temporária também pode ter índices.

Como muitas vezes os índices necessários coincidem com os índices da tabela de destino, você pode simplesmente escrever LIKE target_table INCLUDING INDEXES.

Se você também precisa DEFAULT-valores (por exemplo, para preencher os valores da chave primária), você pode usar LIKE target_table INCLUDING DEFAULTS. Ou simplesmente - LIKE target_table INCLUDING ALL — copia padrões, índices, restrições,...

Mas aqui você precisa entender que se você criou importar tabela imediatamente com índices, então os dados levarão mais tempo para carregardo que se você primeiro preencher tudo e só depois acumular os índices - veja como faz isso como exemplo pg_dump.

Em geral, RTFM!

2. Como escrever?

Deixe-me apenas dizer: use-o COPY-fluxo em vez de “pacote” INSERT, aceleração às vezes. Você pode até mesmo diretamente de um arquivo pré-gerado.

3. Como processar?

Então, vamos deixar nossa introdução ficar mais ou menos assim:

  • você tem uma tabela com dados de clientes armazenados em seu banco de dados 1 milhão de registros
  • todo dia um cliente te manda um novo "imagem" completa
  • por experiência você sabe que de vez em quando não mais do que 10 mil registros são alterados

Um exemplo clássico de tal situação é Base KLADR — são muitos endereços no total, mas em cada upload semanal há muito poucas mudanças (renomeação de assentamentos, combinação de ruas, aparecimento de novas casas), mesmo em escala nacional.

3.1. Algoritmo de sincronização completo

Para simplificar, digamos que você nem precise reestruturar os dados - basta trazer a tabela para o formato desejado, ou seja:

  • remova tudo que não existe mais
  • refrescar tudo que já existia e precisa ser atualizado
  • insira tudo o que ainda não aconteceu

Por que as operações deveriam ser feitas nesta ordem? Porque é assim que o tamanho da tabela crescerá minimamente (lembre-se do MVCC!).

EXCLUIR DO dst

Não, claro que você pode sobreviver com apenas duas operações:

  • remova (DELETE) tudo em geral
  • insira tudo da nova imagem

Mas ao mesmo tempo, graças ao MVCC, O tamanho da mesa aumentará exatamente duas vezes! Obter +1 milhão de imagens de registros na tabela devido a uma atualização de 10K é uma redundância moderada...

TRUNCAR dst

Um desenvolvedor mais experiente sabe que todo o tablet pode ser limpo de forma bastante barata:

  • claro (TRUNCATE) tabela inteira
  • insira tudo da nova imagem

O método é eficaz, às vezes bastante aplicável, mas há um problema... Estaremos adicionando 1 milhão de registros por um longo tempo, então não podemos nos dar ao luxo de deixar a tabela vazia por todo esse tempo (como acontecerá sem envolvê-la em uma única transação).

E isso significa:

  • estamos começando transação de longa duração
  • TRUNCATE impõe Acesso Exclusivo-bloqueio
  • a gente faz a inserção há muito tempo, e todo mundo nessa hora não posso nem SELECT

Algo não está indo bem...

ALTER TABLE… RENOMEAR… / DROP TABLE…

Uma alternativa é preencher tudo em uma nova tabela separada e simplesmente renomeá-la no lugar da antiga. Algumas coisinhas desagradáveis:

  • ainda também Acesso Exclusivo, embora significativamente menos tempo
  • todos os planos/estatísticas de consulta para esta tabela são redefinidos, precisa executar ANALYZE
  • todas as chaves estrangeiras estão quebradas (FK) para a mesa

Houve um patch WIP de Simon Riggs que sugeria fazer ALTER-uma operação para substituir o corpo da tabela no nível do arquivo, sem tocar nas estatísticas e no FK, mas não coletou quorum.

EXCLUIR, ATUALIZAR, INSERIR

Assim, optamos pela opção sem bloqueio de três operações. Quase três... Como fazer isso de forma mais eficaz?

-- все делаем в рамках транзакции, чтобы никто не видел "промежуточных" состояний
BEGIN;

-- создаем временную таблицу с импортируемыми данными
CREATE TEMPORARY TABLE tmp(
  LIKE dst INCLUDING INDEXES -- по образу и подобию, вместе с индексами
) ON COMMIT DROP; -- за рамками транзакции она нам не нужна

-- быстро-быстро вливаем новый образ через COPY
COPY tmp FROM STDIN;
-- ...
-- .

-- удаляем отсутствующие
DELETE FROM
  dst D
USING
  dst X
LEFT JOIN
  tmp Y
    USING(pk1, pk2) -- поля первичного ключа
WHERE
  (D.pk1, D.pk2) = (X.pk1, X.pk2) AND
  Y IS NOT DISTINCT FROM NULL; -- "антиджойн"

-- обновляем оставшиеся
UPDATE
  dst D
SET
  (f1, f2, f3) = (T.f1, T.f2, T.f3)
FROM
  tmp T
WHERE
  (D.pk1, D.pk2) = (T.pk1, T.pk2) AND
  (D.f1, D.f2, D.f3) IS DISTINCT FROM (T.f1, T.f2, T.f3); -- незачем обновлять совпадающие

-- вставляем отсутствующие
INSERT INTO
  dst
SELECT
  T.*
FROM
  tmp T
LEFT JOIN
  dst D
    USING(pk1, pk2)
WHERE
  D IS NOT DISTINCT FROM NULL;

COMMIT;

3.2. Pós-processamento de importação

No mesmo KLADR, todos os registros alterados devem passar adicionalmente por pós-processamento - normalizados, palavras-chave destacadas e reduzidos às estruturas exigidas. Mas como você sabe - o que exatamente mudousem complicar o código de sincronização, de preferência sem mexer nele?

Se apenas o seu processo tiver acesso de gravação no momento da sincronização, você poderá usar um gatilho que coletará todas as alterações para nós:

-- целевые таблицы
CREATE TABLE kladr(...);
CREATE TABLE kladr_house(...);

-- таблицы с историей изменений
CREATE TABLE kladr$log(
  ro kladr, -- тут лежат целые образы записей старой/новой
  rn kladr
);

CREATE TABLE kladr_house$log(
  ro kladr_house,
  rn kladr_house
);

-- общая функция логирования изменений
CREATE OR REPLACE FUNCTION diff$log() RETURNS trigger AS $$
DECLARE
  dst varchar = TG_TABLE_NAME || '$log';
  stmt text = '';
BEGIN
  -- проверяем необходимость логгирования при обновлении записи
  IF TG_OP = 'UPDATE' THEN
    IF NEW IS NOT DISTINCT FROM OLD THEN
      RETURN NEW;
    END IF;
  END IF;
  -- создаем запись лога
  stmt = 'INSERT INTO ' || dst::text || '(ro,rn)VALUES(';
  CASE TG_OP
    WHEN 'INSERT' THEN
      EXECUTE stmt || 'NULL,$1)' USING NEW;
    WHEN 'UPDATE' THEN
      EXECUTE stmt || '$1,$2)' USING OLD, NEW;
    WHEN 'DELETE' THEN
      EXECUTE stmt || '$1,NULL)' USING OLD;
  END CASE;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Agora podemos aplicar gatilhos antes de iniciar a sincronização (ou habilitá-los via ALTER TABLE ... ENABLE TRIGGER ...):

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr_house
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

E então extraímos com calma todas as alterações necessárias das tabelas de log e as executamos por meio de manipuladores adicionais.

3.3. Importando Conjuntos Vinculados

Acima consideramos casos em que as estruturas de dados da origem e do destino são as mesmas. Mas e se o upload de um sistema externo tiver um formato diferente da estrutura de armazenamento do nosso banco de dados?

Tomemos como exemplo o armazenamento de clientes e suas contas, a clássica opção “muitos para um”:

CREATE TABLE client(
  client_id
    serial
      PRIMARY KEY
, inn
    varchar
      UNIQUE
, name
    varchar
);

CREATE TABLE invoice(
  invoice_id
    serial
      PRIMARY KEY
, client_id
    integer
      REFERENCES client(client_id)
, number
    varchar
, dt
    date
, sum
    numeric(32,2)
);

Mas o download de uma fonte externa chega até nós na forma de “tudo em um”:

CREATE TEMPORARY TABLE invoice_import(
  client_inn
    varchar
, client_name
    varchar
, invoice_number
    varchar
, invoice_dt
    date
, invoice_sum
    numeric(32,2)
);

Obviamente, os dados do cliente podem ser duplicados nesta versão, e o registro principal é “conta”:

0123456789;Вася;A-01;2020-03-16;1000.00
9876543210;Петя;A-02;2020-03-16;666.00
0123456789;Вася;B-03;2020-03-16;9999.00

Para o modelo, simplesmente inseriremos nossos dados de teste, mas lembre-se: COPY mais eficiente!

INSERT INTO invoice_import
VALUES
  ('0123456789', 'Вася', 'A-01', '2020-03-16', 1000.00)
, ('9876543210', 'Петя', 'A-02', '2020-03-16', 666.00)
, ('0123456789', 'Вася', 'B-03', '2020-03-16', 9999.00);

Primeiro, vamos destacar aqueles “cortes” aos quais se referem os nossos “fatos”. No nosso caso, as faturas referem-se aos clientes:

CREATE TEMPORARY TABLE client_import AS
SELECT DISTINCT ON(client_inn)
-- можно просто SELECT DISTINCT, если данные заведомо непротиворечивы
  client_inn inn
, client_name "name"
FROM
  invoice_import;

Para associar corretamente as contas aos IDs dos clientes, primeiro precisamos descobrir ou gerar esses identificadores. Vamos adicionar campos abaixo deles:

ALTER TABLE invoice_import ADD COLUMN client_id integer;
ALTER TABLE client_import ADD COLUMN client_id integer;

Vamos usar o método de sincronização de tabelas descrito acima com uma pequena alteração - não atualizaremos ou excluiremos nada na tabela de destino, porque importamos clientes “somente acréscimos”:

-- проставляем в таблице импорта ID уже существующих записей
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  client D
WHERE
  T.inn = D.inn; -- unique key

-- вставляем отсутствовавшие записи и проставляем их ID
WITH ins AS (
  INSERT INTO client(
    inn
  , name
  )
  SELECT
    inn
  , name
  FROM
    client_import
  WHERE
    client_id IS NULL -- если ID не проставился
  RETURNING *
)
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  ins D
WHERE
  T.inn = D.inn; -- unique key

-- проставляем ID клиентов у записей счетов
UPDATE
  invoice_import T
SET
  client_id = D.client_id
FROM
  client_import D
WHERE
  T.client_inn = D.inn; -- прикладной ключ

Na verdade, está tudo em invoice_import Agora temos o campo de contato preenchido client_id, com o qual inseriremos a fatura.

Fonte: habr.com

Adicionar um comentário