DBA:有效地組織同步和導入

對於大數據集的複雜處理(不同 ETL流程:導入、轉換和與外部來源同步)經常有需要 暫時「記住」並立即快速處理 大量的東西。

這類典型任務通常聽起來像這樣: 「就在這兒 會計部門向客戶銀行卸載 最後收到的付款,您需要快速將其上傳到網站並將其連結到您的帳戶。”

但是,當這個「東西」的體積開始達到數百兆字節,並且該服務必須繼續 24x7 與資料庫一起工作時,就會出現許多副作用,從而毀掉您的生活。
DBA:有效地組織同步和導入
要在 PostgreSQL 中(而不僅僅是在 PostgreSQL 中)處理它們,您可以使用一些最佳化,這將使您能夠更快地處理所有內容並減少資源消耗。

1. 運送到哪裡?

首先,讓我們決定可以在哪裡上傳我們想要「處理」的資料。

1.1. 臨時表(TEMPORARY TABLE)

原則上,PostgreSQL 臨時表與其他臨時表相同。 因此,迷信如 “那裡的一切都只儲存在記憶中,一切都會結束”。 但也存在一些顯著差異。

每個資料庫連接都有您自己的“命名空間”

如果兩個連線嘗試同時連接 CREATE TABLE x,那麼一定有人會得到 非唯一性錯誤 資料庫對象。

但如果兩者都嘗試執行 CREATE TEMPORARY TABLE x,那麼兩個人都會正常做,每個人都會得到 你的副本 表。 他們之間不會有任何共同點。

斷開連線時“自毀”

當連線關閉時,所有臨時表都會自動刪除,所以需要手動刪除 DROP TABLE x 沒有任何意義,除了...

如果您正在通過 pgbouncer 處於交易模式,那麼資料庫繼續認為該連接仍然處於活動狀態,並且其中該臨時表仍然存在。

因此,嘗試從與 pgbouncer 的不同連接再次建立它,將導致錯誤。 但這可以透過使用來規避 CREATE TEMPORARY TABLE IF NOT EXISTS x.

確實,無論如何最好不要這樣做,因為這樣你就可以「突然」在那裡找到「前所有者」留下的數據。 相反,最好閱讀手冊並了解在創建表時可以添加 ON COMMIT DROP - 即當交易完成時,該表將會自動刪除。

不可複製

由於它們僅屬於特定連接,因此不會複製臨時表。 但 這消除了雙重記錄數據的需要 在堆+ WAL 中,因此插入/更新/刪除速度要快得多。

但由於臨時表仍然是「幾乎普通」的表,因此也無法在副本上建立它。 至少目前來說,雖然相應的補丁已經流傳很久了。

1.2. 未記錄的表

但是你應該怎麼做,例如,如果你有某種繁瑣的 ETL 流程,無論在一個事務中實現,但你仍然有 pgbouncer 處於交易模式?..

或是數據流量太大 一個連線上沒有足夠的頻寬 從資料庫(讀取,每個 CPU 一個行程)?...

或正在進行一些操作 非同步地 在不同的連接中?...

這裡只有一個選擇—— 臨時建立一個非臨時表。 雙關語,是的。 那是:

  • 建立具有最大隨機名稱的「我自己的」表,以免與任何人相交
  • 提取:用外部來源的資料填充它們
  • 改造:已轉換,填寫關鍵連結字段
  • 加載:將準備好的資料倒入目標表中
  • 刪除了「我的」表

而現在——美中不足。 實際上, PostgreSQL 中的所有寫入都會發生兩次 - WAL 中的第一個,然後進入表/索引體。 所有這一切都是為了支援 ACID 和正確的資料可見性 COMMIT'瘋狂和 ROLLBACK'空交易。

但我們不需要這個! 我們有全流程 要嘛完全成功,要嘛沒有。。 無論有多少中間交易,我們對「從中間繼續這個過程」不感興趣,尤其是當不清楚它在哪裡時。

為此,PostgreSQL 開發人員早在 9.1 版本就引進了這樣的東西 未記錄的表:

透過此指示,該表將被建立為未記錄的。 寫入未記錄表的資料不會經過預先寫入日誌(請參閱第 29 章),導致此類表 工作比平常快得多。 然而,他們也難免會失敗。 在伺服器故障或緊急關閉的情況下,未記錄的表 自動截斷。 此外,未記錄表格的內容 沒有被複製 到從屬伺服器。 在未記錄的表上建立的任何索引都會自動變為未記錄的。

總之, 會快得多,但是如果資料庫伺服器「宕機」了,那就不好受了。 但是這種情況多久發生一次,您的 ETL 流程是否知道如何在「重振」資料庫之後「從中間」正確糾正這種情況?...

如果沒有,並且上面的情況與您的情況類似,請使用 UNLOGGED但從來沒有 請勿在真實表上啟用此屬性,其中的數據對您來說很珍貴。

1.3. 提交時{刪除行| 降低}

此構造允許您在建立表時指定事務完成時的自動行為。

Про ON COMMIT DROP 我已經在上面寫過,它會生成 DROP TABLE,但與 ON COMMIT DELETE ROWS 這種情況更有趣 - 它是在這裡生成的 TRUNCATE TABLE.

由於儲存臨時表元描述的整個基礎架構與常規表完全相同,因此 不斷地建立和刪除臨時表導致系統表嚴重“膨脹” pg_class、pg_attribute、pg_attrdef、pg_depend、…

現在想像一下,您有一個直接連接到資料庫的工作人員,它每秒打開一個新事務,創建、填充、處理和刪除臨時表...系統表中會積累過多的垃圾,並且這將導致每次操作都需要額外的煞車。

一般來說,不要這樣做! 在這種情況下,它更有效 CREATE TEMPORARY TABLE x ... ON COMMIT DELETE ROWS 將其從事務週期中取出 - 然後在每個新事務開始時,表已經是 將會存在 (儲存通話 CREATE), 但 將是空的, 謝謝 TRUNCATE (我們也保存了它的呼叫)完成上一個事務時。

1.4. 喜歡...包括...

我在開始時提到臨時表的典型用例之一是各種導入 - 開發人員疲倦地將目標表的字段列表複製並貼上到他的臨時表的聲明中...

但懶惰是進步的動力! 這就是為什麼 「基於樣本」建立一個新表 它可以更簡單:

CREATE TEMPORARY TABLE import_table(
  LIKE target_table
);

由於您可以在該表中產生大量數據,因此搜尋它永遠不會很快。 但有一個傳統的解決方案 - 索引! 是的, 臨時表也可以有索引.

由於通常所需的索引與目標表的索引一致,因此您可以簡單地編寫 LIKE target_table INCLUDING INDEXES.

如果您還需要 DEFAULT-values(例如填入主鍵值),可以使用 LIKE target_table INCLUDING DEFAULTS。 或者簡單地—— LIKE target_table INCLUDING ALL — 複製預設值、索引、約束...

但在這裡你需要明白,如果你創建了 立即導入帶有索引的表,那麼資料將需要更長的時間來加載比你先填滿所有內容,然後才匯總索引 - 看看它是如何做到這一點的示例 pg_dump.

在一般情況下, RTFM!

2、怎麼寫?

我只想說——用它 COPY-流動而不是“包裝” INSERT, 有時加速。 您甚至可以直接從預先產生的文件中。

3、如何加工?

那麼,讓我們的介紹看起來像這樣:

  • 您的資料庫中有一個包含客戶資料的表 1萬筆記錄
  • 每天都有客戶向您發送新的 完整的“圖像”
  • 根據經驗你知道有時 更改的記錄不超過 10K

這種情況的典型例子是 KLADR基地 - 總共有許多地址,但在每週上傳的資料中,即使在全國範圍內,也很少有變化(定居點的重命名、街道的合併、新房屋的外觀)。

3.1. 全同步演算法

為簡單起見,假設您甚至不需要重組資料 - 只需將表轉換為所需的形式,即:

  • 清除 一切不再存在的事物
  • 更新 一切已經存在並需要更新的東西
  • 插入 一切尚未發生的事情

為什麼要照這個順序來操作? 因為這是表大小增長最小的方式(記住MVCC!).

從 dst 中刪除

不,當然,您只需執行兩個操作即可:

  • 清除 (DELETE)一切都一般
  • 插入 全部來自新圖像

但同時,感謝 MVCC, 表格的大小將恰好增加兩倍! 由於 1K 更新而獲取表中記錄的 +10M 影像是馬馬虎虎的冗餘...

截斷 dst

經驗豐富的開發人員知道,整台平板電腦的清潔成本非常低:

  • 清潔 (TRUNCATE) 整個表
  • 插入 全部來自新圖像

方法有效, 有時非常適用,但是有一個問題...我們將在很長一段時間內添加 1M 條記錄,因此我們不能一直將表留空(如果不將其包裝在單一事務中就會發生這種情況)。

意思是:

  • 我們開始了 長時間運行的事務
  • TRUNCATE 施加 訪問獨家-阻塞
  • 我們插入了很長一段時間,而其他人此時 甚至不能 SELECT

有事進展不順利...

更改表格...重新命名.../刪除表...

另一種方法是將所有內容填入單獨的新表中,然後簡單地將其重新命名以取代舊表。 一些令人討厭的小事:

  • 仍然也是 訪問獨家,儘管時間明顯減少
  • 該表的所有查詢計劃/統計資料均已重置, 需要運行分析
  • 所有外鍵都壞了 (FK) 到桌上

Simon Riggs 有一個 WIP 補丁建議製作 ALTER- 在文件層級替換表體的操作,不觸及統計資料和FK,但不收集仲裁。

刪除、更新、插入

因此,我們選擇了三個操作的非阻塞選項。 快三個了...如何最有效地做到這一點?

-- все делаем в рамках транзакции, чтобы никто не видел "промежуточных" состояний
BEGIN;

-- создаем временную таблицу с импортируемыми данными
CREATE TEMPORARY TABLE tmp(
  LIKE dst INCLUDING INDEXES -- по образу и подобию, вместе с индексами
) ON COMMIT DROP; -- за рамками транзакции она нам не нужна

-- быстро-быстро вливаем новый образ через COPY
COPY tmp FROM STDIN;
-- ...
-- .

-- удаляем отсутствующие
DELETE FROM
  dst D
USING
  dst X
LEFT JOIN
  tmp Y
    USING(pk1, pk2) -- поля первичного ключа
WHERE
  (D.pk1, D.pk2) = (X.pk1, X.pk2) AND
  Y IS NOT DISTINCT FROM NULL; -- "антиджойн"

-- обновляем оставшиеся
UPDATE
  dst D
SET
  (f1, f2, f3) = (T.f1, T.f2, T.f3)
FROM
  tmp T
WHERE
  (D.pk1, D.pk2) = (T.pk1, T.pk2) AND
  (D.f1, D.f2, D.f3) IS DISTINCT FROM (T.f1, T.f2, T.f3); -- незачем обновлять совпадающие

-- вставляем отсутствующие
INSERT INTO
  dst
SELECT
  T.*
FROM
  tmp T
LEFT JOIN
  dst D
    USING(pk1, pk2)
WHERE
  D IS NOT DISTINCT FROM NULL;

COMMIT;

3.2. 導入後處理

在同一個 KLADR 中,所有變更的記錄都必須另外進行後處理 - 標準化、突出顯示關鍵字並縮減為所需的結構。 但你怎麼知道—— 究竟發生了什麼變化不使同步程式碼複雜化,理想情況下根本不接觸它?

如果只有您的進程在同步時具有寫入存取權限,那麼您可以使用觸發器來為我們收集所有變更:

-- целевые таблицы
CREATE TABLE kladr(...);
CREATE TABLE kladr_house(...);

-- таблицы с историей изменений
CREATE TABLE kladr$log(
  ro kladr, -- тут лежат целые образы записей старой/новой
  rn kladr
);

CREATE TABLE kladr_house$log(
  ro kladr_house,
  rn kladr_house
);

-- общая функция логирования изменений
CREATE OR REPLACE FUNCTION diff$log() RETURNS trigger AS $$
DECLARE
  dst varchar = TG_TABLE_NAME || '$log';
  stmt text = '';
BEGIN
  -- проверяем необходимость логгирования при обновлении записи
  IF TG_OP = 'UPDATE' THEN
    IF NEW IS NOT DISTINCT FROM OLD THEN
      RETURN NEW;
    END IF;
  END IF;
  -- создаем запись лога
  stmt = 'INSERT INTO ' || dst::text || '(ro,rn)VALUES(';
  CASE TG_OP
    WHEN 'INSERT' THEN
      EXECUTE stmt || 'NULL,$1)' USING NEW;
    WHEN 'UPDATE' THEN
      EXECUTE stmt || '$1,$2)' USING OLD, NEW;
    WHEN 'DELETE' THEN
      EXECUTE stmt || '$1,NULL)' USING OLD;
  END CASE;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

現在我們可以在開始同步之前應用觸發器(或透過啟用它們 ALTER TABLE ... ENABLE TRIGGER ...):

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

CREATE TRIGGER log
  AFTER INSERT OR UPDATE OR DELETE
  ON kladr_house
    FOR EACH ROW
      EXECUTE PROCEDURE diff$log();

然後我們冷靜地從日誌表中提取我們需要的所有更改,並透過其他處理程序運行它們。

3.3. 導入連結集

上面我們考慮了來源和目標的資料結構相同的情況。 但是,如果從外部系統上傳的格式與我們資料庫中的儲存結構不同呢?

讓我們以客戶及其帳戶的儲存為例,即經典的「多對一」選項:

CREATE TABLE client(
  client_id
    serial
      PRIMARY KEY
, inn
    varchar
      UNIQUE
, name
    varchar
);

CREATE TABLE invoice(
  invoice_id
    serial
      PRIMARY KEY
, client_id
    integer
      REFERENCES client(client_id)
, number
    varchar
, dt
    date
, sum
    numeric(32,2)
);

但是從外部來源的下載以「all in one」的形式出現在我們面前:

CREATE TEMPORARY TABLE invoice_import(
  client_inn
    varchar
, client_name
    varchar
, invoice_number
    varchar
, invoice_dt
    date
, invoice_sum
    numeric(32,2)
);

顯然,這個版本中客戶資料是可以重複的,主要記錄是「account」:

0123456789;Вася;A-01;2020-03-16;1000.00
9876543210;Петя;A-02;2020-03-16;666.00
0123456789;Вася;B-03;2020-03-16;9999.00

對於模型,我們將簡單地插入測試數據,但請記住 - COPY 更有效率!

INSERT INTO invoice_import
VALUES
  ('0123456789', 'Вася', 'A-01', '2020-03-16', 1000.00)
, ('9876543210', 'Петя', 'A-02', '2020-03-16', 666.00)
, ('0123456789', 'Вася', 'B-03', '2020-03-16', 9999.00);

首先,讓我們強調一下我們的「事實」所指的那些「削減」。 在我們的例子中,發票指的是客戶:

CREATE TEMPORARY TABLE client_import AS
SELECT DISTINCT ON(client_inn)
-- можно просто SELECT DISTINCT, если данные заведомо непротиворечивы
  client_inn inn
, client_name "name"
FROM
  invoice_import;

為了正確地將帳戶與客戶 ID 關聯起來,我們首先需要找出或產生這些標識符。 讓我們在它們下面添加字段:

ALTER TABLE invoice_import ADD COLUMN client_id integer;
ALTER TABLE client_import ADD COLUMN client_id integer;

讓我們使用上面描述的表同步方法,稍加修改 - 我們不會更新或刪除目標表中的任何內容,因為我們「僅追加」匯入客戶端:

-- проставляем в таблице импорта ID уже существующих записей
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  client D
WHERE
  T.inn = D.inn; -- unique key

-- вставляем отсутствовавшие записи и проставляем их ID
WITH ins AS (
  INSERT INTO client(
    inn
  , name
  )
  SELECT
    inn
  , name
  FROM
    client_import
  WHERE
    client_id IS NULL -- если ID не проставился
  RETURNING *
)
UPDATE
  client_import T
SET
  client_id = D.client_id
FROM
  ins D
WHERE
  T.inn = D.inn; -- unique key

-- проставляем ID клиентов у записей счетов
UPDATE
  invoice_import T
SET
  client_id = D.client_id
FROM
  client_import D
WHERE
  T.client_inn = D.inn; -- прикладной ключ

其實一切都在 invoice_import 現在我們已經填寫了聯絡字段 client_id,我們將用它插入發票。

來源: www.habr.com

添加評論