DBA: organizing synchronization and imports properly

When doing complex processing of large data sets (various ETL processes: imports, conversions, synchronization with an external source), there is often a need to temporarily "remember" something voluminous and then quickly process it.

A typical task of this kind usually sounds something like this: "The accounting department has just exported the latest incoming payments from the client-bank system; they need to be uploaded to the site quickly and linked to the invoices."

But when the volume of this "something" starts to be measured in hundreds of megabytes, and the service has to keep working with the database 24x7, many side effects appear that will ruin your life.
To deal with them in PostgreSQL (and not only there), you can use several optimizations that let you process everything faster and with less resource consumption.

1. Where to load the data?

First, let's decide where we can load the data that we want to "process".

1.1. Temporary tables (TEMPORARY TABLE)

In principle, for PostgreSQL temporary tables are just tables like any others, so superstitions such as "everything there is stored only in memory, and memory can run out" do not apply. But there are several significant differences.

A separate "namespace" for each database connection

If two connections try to run CREATE TABLE x at the same time, one of them will inevitably get a "database object is not unique" error.

But if both execute CREATE TEMPORARY TABLE x, both will succeed, and each will get its own copy of the table. And there will be nothing in common between them.
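
A minimal illustration of this behavior (the table and its column are made up for the example):

-- session #1
CREATE TEMPORARY TABLE x(id integer);
INSERT INTO x VALUES(1);

-- session #2, at the same time - no conflict, and it only sees its own copy
CREATE TEMPORARY TABLE x(id integer);
SELECT * FROM x; -- empty: session #1's data is not visible here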

"Self-destruction" on disconnect

When the connection is closed, all temporary tables are deleted automatically, so running DROP TABLE x "by hand" makes no sense, except...

If you are working through pgbouncer in transaction mode, the database continues to believe that the connection is still active, and the temporary table still exists in it.

Therefore, an attempt to create it again, from another connection to pgbouncer, will result in an error. But this can be worked around by using CREATE TEMPORARY TABLE IF NOT EXISTS x.

Still, it's better not to do that, because you can "suddenly" find data left over from the "previous owner" there. Instead, it is much better to read the manual and notice that when creating a table you can add ON COMMIT DROP - that is, the table will be deleted automatically when the transaction completes.
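
A hypothetical example of such a declaration (the table is made up):

BEGIN;
-- the table lives only until the end of the current transaction
CREATE TEMPORARY TABLE x(id integer) ON COMMIT DROP;
-- ... fill and process x ...
COMMIT; -- x is dropped automatically right here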

Non-replication

Because they belong only to a specific connection, temporary tables are not replicated. This removes the need to write the data twice, into the heap and into the WAL, so INSERT/UPDATE/DELETE into them is significantly faster.

But since a temporary table is still an “almost ordinary” table, it cannot be created on a replica either. At least for now, although the corresponding patch has been around for a long time.

1.2. Unlogged tables (UNLOGGED TABLE)

But what do you do if, for example, you have some cumbersome ETL process that cannot fit into a single transaction, and you still have pgbouncer in transaction mode?..

Or the data flow is so large that a single database connection (read: one process, one CPU core) does not have enough throughput?..

Or some operations run asynchronously in different connections?..

There is only one option here - temporarily create a non-temporary table. Pun intended, yes. That is:

  • created "your own" tables with maximally random names so as not to collide with anyone
  • Extract: filled them with data from the external source
  • Transform: converted the data, filling in the key linking fields
  • Load: poured the ready data into the target tables
  • deleted "your own" tables

And now for the fly in the ointment. In fact, all writing in PostgreSQL happens twice: first into the WAL, then into the body of the table/indexes. All this is done to support ACID and correct data visibility between committed and rolled-back transactions.

But we don't need that! For us the whole process either succeeds completely or it doesn't. It doesn't matter how many intermediate transactions there are - we are not interested in "continuing the process from the middle", especially when it is not clear where that middle was.

To do this, PostgreSQL developers, back in version 9.1, introduced such a thing as unlogged (UNLOGGED) tables:

If specified, the table is created as an unlogged table. Data written to unlogged tables is not written to the write-ahead log (see Chapter 29), which makes them considerably faster than ordinary tables. However, they are not crash-safe: an unlogged table is automatically truncated after a crash or unclean shutdown. The contents of an unlogged table are also not replicated to standby servers. Any indexes created on an unlogged table are automatically unlogged as well.

In short, it will be much faster, but if the database server "falls over", it will be unpleasant. But how often does that happen, and does your ETL process know how to pick things up "from the middle" correctly after the database "comes back to life"?..

If not, and the case above is similar to yours, use UNLOGGED, but never enable this attribute on real tables whose data is dear to you.
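
A sketch of such a "temporarily non-temp" staging table for the process described above (the name and columns are made up for illustration):

-- "own" staging table with a random-ish name; its writes bypass the WAL
CREATE UNLOGGED TABLE etl_20200316_x7f3q(
  id integer,
  payload text
);
-- Extract / Transform / Load ...
-- and drop it once the whole process has finished
DROP TABLE etl_20200316_x7f3q;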

1.3. ON COMMIT { DELETE ROWS | DROP }

This clause lets you specify, when creating a table, what happens to it automatically when a transaction completes.

I already wrote about ON COMMIT DROP above: it generates a DROP TABLE. With ON COMMIT DELETE ROWS the situation is more interesting: it generates a TRUNCATE TABLE.

Since the whole infrastructure for storing a temporary table's metadata is exactly the same as for a regular table, constantly creating and deleting temporary tables leads to severe "bloat" of the system tables pg_class, pg_attribute, pg_attrdef, pg_depend,…

Now imagine a worker on a direct connection to the database that opens a new transaction every second and creates, fills, processes and deletes a temporary table... Garbage will pile up in the system catalogs, and that means extra overhead for every operation.

But you don't have to do that! It is much more efficient in this case to take CREATE TEMPORARY TABLE x ... ON COMMIT DELETE ROWS out of the transaction loop: by the start of each new transaction the table will already exist (saving the CREATE call) and will be empty, thanks to the TRUNCATE (whose call we also saved) at the end of the previous transaction.
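
A sketch of that pattern for the worker described above (the table and column names are illustrative):

-- once, right after connecting
CREATE TEMPORARY TABLE batch(
  id integer,
  payload text
) ON COMMIT DELETE ROWS;

-- then, on every iteration of the worker loop
BEGIN;
COPY batch FROM STDIN; -- fill it
-- ... process batch, write the results into the target tables ...
COMMIT; -- batch is emptied automatically right here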

1.4. LIKE…INCLUDING…

I mentioned at the beginning that one of the typical use cases for temporary tables is all kinds of imports - and the developer wearily copy-pastes the list of fields of the target table into the declaration of their temporary one...

But laziness is the engine of progress! That's why creating the new table can be much simpler:

CREATE TEMPORARY TABLE import_table(
  LIKE target_table
);

Since you may then pour a lot of data into this table, searches over it will never be fast. But there is a traditional solution to that - indexes! And yes, a temporary table can have indexes too.

Since the desired indexes often match the indexes of the target table, you can simply write LIKE target_table INCLUDING INDEXES.

If you also need DEFAULT values (for example, to fill in the primary key values), you can use LIKE target_table INCLUDING DEFAULTS. Or simply LIKE target_table INCLUDING ALL, which copies defaults, indexes, constraints,…
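
For example (import_table and target_table as in the snippet above):

CREATE TEMPORARY TABLE import_table(
  LIKE target_table INCLUDING DEFAULTS INCLUDING INDEXES
);
-- or, to copy defaults, indexes and constraints all at once:
-- LIKE target_table INCLUDING ALL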

But here you need to understand that if you create the import table with indexes right away, the data will take longer to load than if you load everything first and only then build the indexes - look at how pg_dump does it, as an example.
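
Or, the other way round - a sketch of the "load first, index later" order, assuming pk1/pk2 are the key fields you will later search by:

CREATE TEMPORARY TABLE import_table(
  LIKE target_table -- no indexes yet
);
COPY import_table FROM STDIN; -- bulk load with no index maintenance
-- ...
CREATE INDEX ON import_table(pk1, pk2); -- build the index once, over already-loaded data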

Generally, RTFM!

2. How to write?

Simply put: use a COPY stream instead of a "pack" of INSERTs, and you get a many-fold speedup. You can even load directly from a pre-generated file.
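
For example, a pre-generated semicolon-separated file could be loaded into the import table like this (the file name and format options are illustrative):

-- a server-side file, read by the PostgreSQL server process itself
COPY import_table FROM '/var/lib/postgresql/import/data.csv' (FORMAT csv, DELIMITER ';');

-- or a client-side file, streamed through psql's \copy
\copy import_table FROM 'data.csv' (FORMAT csv, DELIMITER ';')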

3. How to process?

So, let's say our starting conditions look something like this:

  • your database stores a table with client data, about 1M records
  • every day the client sends you a new complete "image"
  • from experience you know that no more than 10K records change each time

A classic example of such a situation is the KLADR database: there are a lot of addresses in it, but each weekly upload contains very few changes (renamed settlements, merged streets, new houses), even at the national scale.

3.1. Full Synchronization Algorithm

For simplicity, let's say that you don't even need to restructure the data - just bring the table into the desired form, that is:

  • delete everything that no longer exists
  • update everything that already existed and needs updating
  • insert everything that wasn't there yet

Why do the operations in exactly this order? Because this way the table size grows minimally (remember MVCC!).

DELETE FROM dst

Of course, you could get by with just two operations:

  • delete (DELETE) absolutely everything
  • insert everything from the new image

But at the same time, thanks to MVCC, the table size will exactly double! Getting +1M dead record versions in the table because of a 10K update is so-so redundancy...

TRUNCATE dst

A more experienced developer knows that the entire table can be cleared quite cheaply:

  • clear (TRUNCATE) the entire table
  • insert everything from the new image

The method is effective and sometimes applicable, but there is a catch... We will be pouring in 1M records for quite a while, so we cannot afford to leave the table empty for all that time (which is what will happen if we don't wrap it all in a single transaction).

And that means:

  • we start a long-running transaction
  • TRUNCATE takes an AccessExclusive lock
  • we spend a long time doing the insert, and during all that time everyone else cannot even SELECT

Something is going wrong here...

ALTER TABLE… RENAME… / DROP TABLE…

As an option: load everything into a separate new table and then simply rename it into the place of the old one. A couple of annoying little things (see the sketch after this list):

  • still an AccessExclusive lock, although for a much shorter time
  • all query plans/statistics for this table are reset, so you need to run ANALYZE
  • all foreign keys (FK) referencing the table break
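
A sketch of that swap, assuming the new image has already been loaded into a hypothetical dst_new:

BEGIN;
ALTER TABLE dst RENAME TO dst_old;
ALTER TABLE dst_new RENAME TO dst;
DROP TABLE dst_old; -- any FKs still referencing it have to be dealt with separately
COMMIT;
ANALYZE dst; -- the statistics have to be collected anew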

There was a WIP patch from Simon Riggs that proposed an ALTER operation for swapping the table body at the file level, without touching statistics and FK, but it never gathered a quorum.

DELETE, UPDATE, INSERT

So, we settle on the non-blocking variant with three operations. Well, almost three... How do we do it most effectively?

-- do everything inside one transaction so nobody sees any "intermediate" states
BEGIN;

-- create a temporary table with the imported data
CREATE TEMPORARY TABLE tmp(
  LIKE dst INCLUDING INDEXES -- in the image of the target, together with its indexes
) ON COMMIT DROP; -- we won't need it outside the transaction

-- quickly pour in the new image via COPY
COPY tmp FROM STDIN;
-- ...
-- .

-- delete the records that are no longer present
DELETE FROM
  dst D
USING
  dst X
LEFT JOIN
  tmp Y
    USING(pk1, pk2) -- primary key fields
WHERE
  (D.pk1, D.pk2) = (X.pk1, X.pk2) AND
  Y IS NOT DISTINCT FROM NULL; -- the "anti-join"

-- update the remaining records
UPDATE
  dst D
SET
  (f1, f2, f3) = (T.f1, T.f2, T.f3)
FROM
  tmp T
WHERE
  (D.pk1, D.pk2) = (T.pk1, T.pk2) AND
  (D.f1, D.f2, D.f3) IS DISTINCT FROM (T.f1, T.f2, T.f3); -- no point updating identical rows

-- insert the missing records
INSERT INTO
  dst
SELECT
  T.*
FROM
  tmp T
LEFT JOIN
  dst D
    USING(pk1, pk2)
WHERE
  D IS NOT DISTINCT FROM NULL;

COMMIT;

3.2. Import post-processing

In that same KLADR, all changed records must additionally be run through post-processing: normalized, keywords extracted, reduced to the required structures. But how do you find out what exactly changed, without complicating the synchronization code, ideally without touching it at all?

If only your process has write access at the moment of synchronization, you can use a trigger that will collect all the changes for us:

-- target tables
CREATE TABLE kladr(...);
CREATE TABLE kladr_house(...);

-- tables with the change history
CREATE TABLE kladr$log(
  ro kladr, -- whole images of the old/new record are stored here
  rn kladr
);

CREATE TABLE kladr_house$log(
  ro kladr_house,
  rn kladr_house
);

-- a common change-logging function
CREATE OR REPLACE FUNCTION diff$log() RETURNS trigger AS $$
DECLARE
  dst varchar = TG_TABLE_NAME || '$log';
  stmt text = '';
BEGIN
  -- check whether we need to log anything when a record is updated
  IF TG_OP = 'UPDATE' THEN
    IF NEW IS NOT DISTINCT FROM OLD THEN
      RETURN NEW;
    END IF;
  END IF;
  -- build the log record
  stmt = 'INSERT INTO ' || dst::text || '(ro,rn)VALUES(';
  CASE TG_OP
    WHEN 'INSERT' THEN
      EXECUTE stmt || 'NULL,$1)' USING NEW;
    WHEN 'UPDATE' THEN
      EXECUTE stmt || '$1,$2)' USING OLD, NEW;
    WHEN 'DELETE' THEN
      EXECUTE stmt || '$1,NULL)' USING OLD;
  END CASE;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Now we can attach the triggers before starting the synchronization (or enable them via ALTER TABLE ... ENABLE TRIGGER ...):

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr_house
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

And then we calmly extract all the changes we need from the log tables and run them through additional handlers.
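
For example, pulling the changed kladr records out for post-processing could look something like this (a sketch over the log tables declared above):

-- everything that was inserted or updated during the last synchronization
SELECT
  (rn).* -- expand the "new" record image back into columns
FROM
  kladr$log
WHERE
  rn IS DISTINCT FROM NULL; -- skip pure deletions

-- and clear the log once it has been processed
TRUNCATE kladr$log;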

3.3. Importing Related Sets

Above, we considered cases where the data structures of the source and destination are the same. But what if the upload from an external system has a format different from the storage structure in our database?

Let's take as an example the storage of customers and invoices for them, the classic many-to-one option:

CREATE TABLE client(
  client_id
    serial
      PRIMARY KEY
, inn
    varchar
      UNIQUE
, name
    varchar
);

CREATE TABLE invoice(
  invoice_id
    serial
      PRIMARY KEY
, client_id
    integer
      REFERENCES client(client_id)
, number
    varchar
, dt
    date
, sum
    numeric(32,2)
);

But the upload from the external source comes to us "all in one":

CREATE TEMPORARY TABLE invoice_import(
  client_inn
    varchar
, client_name
    varchar
, invoice_number
    varchar
, invoice_dt
    date
, invoice_sum
    numeric(32,2)
);

Obviously, the customer data can be duplicated in this layout, while the main record is the "invoice":

0123456789;Вася;A-01;2020-03-16;1000.00
9876543210;Петя;A-02;2020-03-16;666.00
0123456789;Вася;B-03;2020-03-16;9999.00

For the example, we'll just insert our test data, but remember: COPY is more efficient!

INSERT INTO invoice_import
VALUES
  ('0123456789', 'Вася', 'A-01', '2020-03-16', 1000.00)
, ('9876543210', 'Петя', 'A-02', '2020-03-16', 666.00)
, ('0123456789', 'Вася', 'B-03', '2020-03-16', 9999.00);

Let us first single out the "dimensions" that our "facts" reference. In our case, invoices reference customers:

CREATE TEMPORARY TABLE client_import AS
SELECT DISTINCT ON(client_inn)
-- you can use just SELECT DISTINCT if the data is known to be consistent
  client_inn inn
, client_name "name"
FROM
  invoice_import;

For the invoices to be correctly linked to customer IDs, we first need to find out or generate those identifiers. Let's add fields for them:

ALTER TABLE invoice_import ADD COLUMN client_id integer;
ALTER TABLE client_import ADD COLUMN client_id integer;

Let's use the table synchronization method described above, with a slight correction: we will not update or delete anything in the target table, because client import is "append-only":

-- fill in the IDs of already existing records in the import table
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  client D
WHERE
  T.inn = D.inn; -- unique key

-- insert the records that were missing and fill in their IDs
WITH ins AS (
  INSERT INTO client(
    inn
  , name
  )
  SELECT
    inn
  , name
  FROM
    client_import
  WHERE
    client_id IS NULL -- if the ID was not filled in
  RETURNING *
)
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  ins D
WHERE
  T.inn = D.inn; -- unique key

-- fill in the client IDs on the invoice records
UPDATE
  invoice_import T
SET
  client_id = D.client_id
FROM
  client_import D
WHERE
  T.client_inn = D.inn; -- application-level key

And that's it: every record in invoice_import now has its client_id field filled in, and with it we can insert the invoices.
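
That final step might look something like this (a sketch based on the structures above):

-- insert the invoices themselves, already linked to their clients
INSERT INTO invoice(
  client_id
, number
, dt
, sum
)
SELECT
  client_id
, invoice_number
, invoice_dt
, invoice_sum
FROM
  invoice_import;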

Source: habr.com
