DBA:有效地组织同步和导入

对于大数据集的复杂处理(不同 ETL流程:导入、转换和与外部源同步)经常有需要 暂时“记住”并立即快速处理 大量的东西。

此类典型任务通常听起来像这样: “就在这儿 会计部门从客户银行卸载 最后收到的付款,您需要快速将其上传到网站并将其链接到您的帐户”

但是,当这个“东西”的体积开始达到数百兆字节,并且该服务必须继续 24x7 与数据库一起工作时,就会出现许多副作用,从而毁掉您的生活。
DBA:有效地组织同步和导入
要在 PostgreSQL 中(而不仅仅是在 PostgreSQL 中)处理它们,您可以使用一些优化,这将使您能够更快地处理所有内容并减少资源消耗。

1. 运送到哪里?

首先,让我们决定可以在哪里上传我们想要“处理”的数据。

1.1. 临时表(TEMPORARY TABLE)

原则上,PostgreSQL 临时表与其他临时表相同。 因此,迷信如 “那里的一切都只储存在记忆中,一切都会结束”。 但也存在一些显着差异。

每个数据库连接都有您自己的“命名空间”

如果两个连接尝试同时连接 CREATE TABLE x,那么肯定有人会得到 非唯一性错误 数据库对象。

但如果两者都尝试执行 CREATE TEMPORARY TABLE x,那么两个人都会正常做,每个人都会得到 你的副本 表。 他们之间不会有任何共同点。

断开连接时“自毁”

当连接关闭时,所有临时表都会自动删除,所以需要手动删除 DROP TABLE x 没有任何意义,除了...

如果您正在通过 pgbouncer 处于交易模式,那么数据库继续认为该连接仍然处于活动状态,并且其中该临时表仍然存在。

因此,尝试从与 pgbouncer 的不同连接再次创建它,将导致错误。 但这可以通过使用来规避 CREATE TEMPORARY TABLE IF NOT EXISTS x.

确实,无论如何最好不要这样做,因为这样你就可以“突然”在那里找到“前任所有者”留下的数据。 相反,最好阅读手册并了解在创建表时可以添加 ON COMMIT DROP - 即当事务完成时,该表将被自动删除。

不可复制

由于它们仅属于特定连接,因此不会复制临时表。 但 这消除了双重记录数据的需要 在堆+ WAL 中,因此插入/更新/删除速度要快得多。

但由于临时表仍然是“几乎普通”的表,因此也无法在副本上创建它。 至少目前来说,虽然相应的补丁已经流传很长时间了。

1.2. 未记录的表

但是你应该怎么做,例如,如果你有某种繁琐的 ETL 流程,无​​法在一个事务中实现,但你仍然有 pgbouncer 处于交易模式?..

或者数据流量太大 一个连接上没有足够的带宽 从数据库(读取,每个 CPU 一个进程)?...

或者正在进行一些操作 异步地 在不同的连接中?...

这里只有一个选择—— 临时创建一个非临时表。 双关语,是的。 那是:

  • 创建具有最大随机名称的“我自己的”表,以免与任何人相交
  • 提取:用外部来源的数据填充它们
  • 改造:已转换,填写关键链接字段
  • 加载:将准备好的数据倒入目标表中
  • 删除了“我的”表

而现在——美中不足。 实际上, PostgreSQL 中的所有写入都会发生两次 - WAL 中的第一个,然后进入表/索引体。 所有这一切都是为了支持 ACID 和正确的数据可见性 COMMIT'疯狂和 ROLLBACK'空交易。

但我们不需要这个! 我们有全流程 要么完全成功,要么没有。。 无论有多少中间交易,我们对“从中间继续该过程”不感兴趣,尤其是当不清楚它在哪里时。

为此,PostgreSQL 开发人员早在 9.1 版本中就引入了这样的东西 未记录的表:

通过此指示,该表将被创建为未记录的。 写入未记录表的数据不会经过预写日志(请参阅第 29 章),导致此类表 工作比平常快得多。 然而,他们也难免会失败。 在服务器故障或紧急关闭的情况下,未记录的表 自动截断。 此外,未记录表的内容 没有被复制 到从属服务器。 在未记录的表上创建的任何索引都会自动变为未记录的。

总之, 会快得多,但是如果数据库服务器“宕机”了,那就不好受了。 但是这种情况多久发生一次,您的 ETL 流程是否知道如何在“重振”数据库后“从中间”正确纠正这种情况?...

如果没有,并且上面的情况与您的情况类似,请使用 UNLOGGED但从来没有 不要在真实表上启用此属性,其中的数据对您来说很珍贵。

1.3. 提交时{删除行| 降低}

此构造允许您在创建表时指定事务完成时的自动行为。

Про ON COMMIT DROP 我已经在上面写过,它会生成 DROP TABLE,但与 ON COMMIT DELETE ROWS 这种情况更有趣 - 它是在这里生成的 TRUNCATE TABLE.

由于存储临时表元描述的整个基础架构与常规表完全相同,因此 不断地创建和删除临时表导致系统表严重“膨胀” pg_class、pg_attribute、pg_attrdef、pg_depend、…

现在想象一下,您有一个直接连接到数据库的工作人员,它每秒打开一个新事务,创建、填充、处理和删除临时表...系统表中会积累过多的垃圾,并且这将导致每次操作都需要额外的制动。

一般来说,不要这样做! 在这种情况下,它更有效 CREATE TEMPORARY TABLE x ... ON COMMIT DELETE ROWS 将其从事务周期中取出 - 然后在每个新事务开始时,表已经是 将会存在 (保存通话 CREATE),但 将是空的,благодаря TRUNCATE (我们还保存了它的调用)完成上一个事务时。

1.4. 喜欢...包括...

我在开始时提到临时表的典型用例之一是各种导入 - 开发人员疲倦地将目标表的字段列表复制粘贴到他的临时表的声明中......

但懒惰是进步的动力! 这就是为什么 “基于样本”创建一个新表 它可以更简单:

CREATE TEMPORARY TABLE import_table(
  LIKE target_table
);

由于您可以在该表中生成大量数据,因此搜索它永远不会很快。 但有一个传统的解决方案 - 索引! 是的, 临时表也可以有索引.

由于通常所需的索引与目标表的索引一致,因此您可以简单地编写 LIKE target_table INCLUDING INDEXES.

如果您还需要 DEFAULT-values(例如填写主键值),可以使用 LIKE target_table INCLUDING DEFAULTS。 或者简单地—— LIKE target_table INCLUDING ALL — 复制默认值、索引、约束...

但在这里你需要明白,如果你创建了 立即导入带有索引的表,那么数据将需要更长的时间来加载比你先填满所有内容,然后才汇总索引 - 看看它是如何做到这一点的示例 pg_dump.

总体而言, RTFM!

2、怎么写?

我只想说——用它 COPY-流动而不是“包装” INSERT, 有时加速。 您甚至可以直接从预先生成的文件中。

3、如何加工?

那么,让我们的介绍看起来像这样:

  • 您的数据库中有一个包含客户数据的表 1万条记录
  • 每天都有客户向您发送新的 完整的“图像”
  • 根据经验你知道有时 更改的记录不超过 10K

这种情况的一个典型例子是 KLADR基地 - 总共有很多地址,但在每周上传的数据中,即使在全国范围内,也很少有变化(定居点的重命名、街道的合并、新房屋的外观)。

3.1. 全同步算法

为简单起见,假设您甚至不需要重组数据 - 只需将表转换为所需的形式,即:

  • 清除 一切不再存在的事物
  • 更新 一切已经存在并需要更新的东西
  • 插入 一切尚未发生的事情

为什么要按这个顺序进行操作? 因为这是表大小增长最小的方式(记住 MVCC!).

从 dst 中删除

不,当然,您只需执行两个操作即可:

  • 清除 (DELETE)一切都一般
  • 插入 全部来自新图像

但同时,感谢 MVCC, 表的大小将恰好增加两倍! 由于 1K 更新而获取表中记录的 +10M 图像是马马虎虎的冗余......

截断 dst

经验丰富的开发人员知道,整个平板电脑的清洁成本非常低:

  • 清洁 (TRUNCATE) 整个表
  • 插入 全部来自新图像

该方法有效, 有时非常适用,但是有一个问题...我们将在很长一段时间内添加 1M 条记录,因此我们不能一直将表留空(如果不将其包装在单个事务中就会发生这种情况)。

这意味着:

  • 我们开始了 长时间运行的事务
  • TRUNCATE 施加 访问独家-阻塞
  • 我们插入了很长时间,而其他人此时 甚至不能 SELECT

有事进展不顺利...

更改表...重命名.../删除表...

另一种方法是将所有内容填充到一个单独的新表中,然后简单地将其重命名以代替旧表。 一些令人讨厌的小事情:

  • 仍然也是 访问独家,尽管时间明显减少
  • 该表的所有查询计划/统计信息均已重置, 需要运行分析
  • 所有外键都坏了 (FK) 到桌子上

Simon Riggs 有一个 WIP 补丁建议制作 ALTER- 在文件级别替换表体的操作,不触及统计信息和FK,但不收集仲裁。

删除、更新、插入

因此,我们选择了三个操作的非阻塞选项。 快三个了...如何最有效地做到这一点?

-- все делаем в рамках транзакции, чтобы никто не видел "промежуточных" состояний
BEGIN;

-- создаем временную таблицу с импортируемыми данными
CREATE TEMPORARY TABLE tmp(
  LIKE dst INCLUDING INDEXES -- по образу и подобию, вместе с индексами
) ON COMMIT DROP; -- за рамками транзакции она нам не нужна

-- быстро-быстро вливаем новый образ через COPY
COPY tmp FROM STDIN;
-- ...
-- .

-- удаляем отсутствующие
DELETE FROM
  dst D
USING
  dst X
LEFT JOIN
  tmp Y
    USING(pk1, pk2) -- поля первичного ключа
WHERE
  (D.pk1, D.pk2) = (X.pk1, X.pk2) AND
  Y IS NOT DISTINCT FROM NULL; -- "антиджойн"

-- обновляем оставшиеся
UPDATE
  dst D
SET
  (f1, f2, f3) = (T.f1, T.f2, T.f3)
FROM
  tmp T
WHERE
  (D.pk1, D.pk2) = (T.pk1, T.pk2) AND
  (D.f1, D.f2, D.f3) IS DISTINCT FROM (T.f1, T.f2, T.f3); -- незачем обновлять совпадающие

-- вставляем отсутствующие
INSERT INTO
  dst
SELECT
  T.*
FROM
  tmp T
LEFT JOIN
  dst D
    USING(pk1, pk2)
WHERE
  D IS NOT DISTINCT FROM NULL;

COMMIT;

3.2. 导入后处理

在同一个 KLADR 中,所有更改的记录都必须另外进行后处理 - 标准化、突出显示关键字并缩减为所需的结构。 但你怎么知道—— 究竟发生了什么变化不使同步代码复杂化,理想情况下根本不接触它?

如果只有您的进程在同步时具有写访问权限,那么您可以使用触发器来为我们收集所有更改:

-- целевые таблицы
CREATE TABLE kladr(...);
CREATE TABLE kladr_house(...);

-- таблицы с историей изменений
CREATE TABLE kladr$log(
  ro kladr, -- тут лежат целые образы записей старой/новой
  rn kladr
);

CREATE TABLE kladr_house$log(
  ro kladr_house,
  rn kladr_house
);

-- общая функция логирования изменений
CREATE OR REPLACE FUNCTION diff$log() RETURNS trigger AS $$
DECLARE
  dst varchar = TG_TABLE_NAME || '$log';
  stmt text = '';
BEGIN
  -- проверяем необходимость логгирования при обновлении записи
  IF TG_OP = 'UPDATE' THEN
    IF NEW IS NOT DISTINCT FROM OLD THEN
      RETURN NEW;
    END IF;
  END IF;
  -- создаем запись лога
  stmt = 'INSERT INTO ' || dst::text || '(ro,rn)VALUES(';
  CASE TG_OP
    WHEN 'INSERT' THEN
      EXECUTE stmt || 'NULL,$1)' USING NEW;
    WHEN 'UPDATE' THEN
      EXECUTE stmt || '$1,$2)' USING OLD, NEW;
    WHEN 'DELETE' THEN
      EXECUTE stmt || '$1,NULL)' USING OLD;
  END CASE;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

现在我们可以在开始同步之前应用触发器(或通过启用它们 ALTER TABLE ... ENABLE TRIGGER ...):

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr_house
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

然后我们冷静地从日志表中提取我们需要的所有更改,并通过其他处理程序运行它们。

3.3. 导入链接集

上面我们考虑了源和目标的数据结构相同的情况。 但是,如果从外部系统上传的格式与我们数据库中的存储结构不同怎么办?

让我们以客户及其帐户的存储为例,即经典的“多对一”选项:

CREATE TABLE client(
  client_id
    serial
      PRIMARY KEY
, inn
    varchar
      UNIQUE
, name
    varchar
);

CREATE TABLE invoice(
  invoice_id
    serial
      PRIMARY KEY
, client_id
    integer
      REFERENCES client(client_id)
, number
    varchar
, dt
    date
, sum
    numeric(32,2)
);

但是从外部源的下载以“all in one”的形式出现在我们面前:

CREATE TEMPORARY TABLE invoice_import(
  client_inn
    varchar
, client_name
    varchar
, invoice_number
    varchar
, invoice_dt
    date
, invoice_sum
    numeric(32,2)
);

显然,这个版本中客户数据是可以重复的,主要记录是“account”:

0123456789;Вася;A-01;2020-03-16;1000.00
9876543210;Петя;A-02;2020-03-16;666.00
0123456789;Вася;B-03;2020-03-16;9999.00

对于模型,我们将简单地插入测试数据,但请记住 - COPY 更高效!

INSERT INTO invoice_import
VALUES
  ('0123456789', 'Вася', 'A-01', '2020-03-16', 1000.00)
, ('9876543210', 'Петя', 'A-02', '2020-03-16', 666.00)
, ('0123456789', 'Вася', 'B-03', '2020-03-16', 9999.00);

首先,让我们强调一下我们的“事实”所指的那些“削减”。 在我们的例子中,发票指的是客户:

CREATE TEMPORARY TABLE client_import AS
SELECT DISTINCT ON(client_inn)
-- можно просто SELECT DISTINCT, если данные заведомо непротиворечивы
  client_inn inn
, client_name "name"
FROM
  invoice_import;

为了正确地将帐户与客户 ID 关联起来,我们首先需要找出或生成这些标识符。 让我们在它们下面添加字段:

ALTER TABLE invoice_import ADD COLUMN client_id integer;
ALTER TABLE client_import ADD COLUMN client_id integer;

让我们使用上面描述的表同步方法,稍加修改 - 我们不会更新或删除目标表中的任何内容,因为我们“仅追加”导入客户端:

-- проставляем в таблице импорта ID уже существующих записей
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  client D
WHERE
  T.inn = D.inn; -- unique key

-- вставляем отсутствовавшие записи и проставляем их ID
WITH ins AS (
  INSERT INTO client(
    inn
  , name
  )
  SELECT
    inn
  , name
  FROM
    client_import
  WHERE
    client_id IS NULL -- если ID не проставился
  RETURNING *
)
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  ins D
WHERE
  T.inn = D.inn; -- unique key

-- проставляем ID клиентов у записей счетов
UPDATE
  invoice_import T
SET
  client_id = D.client_id
FROM
  client_import D
WHERE
  T.client_inn = D.inn; -- прикладной ключ

其实一切都在 invoice_import 现在我们已经填写了联系字段 client_id,我们将用它插入发票。

来源: habr.com

添加评论