PostgreSQL 中的平行查詢

PostgreSQL 中的平行查詢
現代 CPU 有很多核心。 多年來,應用程式一直向資料庫並行發送查詢。 如果是對錶中多行的報表查詢,使用多個CPU時運行速度會更快,PostgreSQL從9.6版本開始就可以做到這一點。

平行查詢功能的實作花了三年時間——我們必須在查詢執行的不同階段重寫程式碼。 PostgreSQL 3 引進了基礎設施來進一步改進程式碼。 在後續版本中,其他類型的查詢是並行執行的。

限制

  • 如果所有核心都已繁忙,請勿啟用並行執行,否則其他要求會減慢。
  • 最重要的是,具有高 WORK_MEM 值的平行處理會使用大量記憶體 - 每個哈希連接或排序都會佔用 work_mem 記憶體。
  • 低延遲 OLTP 查詢無法透過並行執行來加速。 如果查詢傳回一行,並行處理只會減慢速度。
  • 開發人員喜歡使用 TPC-H 基準測試。 也許您對完美並行執行有類似的查詢。
  • 僅並行執行沒有謂詞鎖定的 SELECT 查詢。
  • 有時,正確的索引比並行模式下的順序表掃描更好。
  • 不支援暫停查詢和遊標。
  • 視窗函數和有序集聚合函數不並行。
  • 您不會從 I/O 工作負載中獲得任何好處。
  • 沒有並行排序演算法。 但在某些方面,帶有排序的查詢可以並行執行。
  • 將 CTE (WITH ...) 替換為巢狀 SELECT 以啟用平行處理。
  • 第三方資料包裝器尚不支援並行處理(但它們可以!)
  • 不支援 FULL OUTER JOIN。
  • max_rows 停用並行處理。
  • 如果查詢具有未標記為 PARALLEL SAFE 的函數,則它將是單線程的。
  • SERIALIZABLE 事務隔離等級會停用並行處理。

測試環境

PostgreSQL 開發人員試圖減少 TPC-H 基準查詢的回應時間。 下載基準測試並 使其適應 PostgreSQL。 這是 TPC-H 基準測試的非官方用途 - 不用於資料庫或硬體比較。

  1. 下載TPC-H_Tools_v2.17.3.zip(或更新版本) 來自 TPC 異地.
  2. 將 makefile.suite 重新命名為 Makefile 並如下所述進行變更: https://github.com/tvondra/pg_tpch 。 使用 make 指令編譯程式碼。
  3. 產生數據: ./dbgen -s 10 建立一個 23 GB 的資料庫。 這足以看出並行和非平行查詢的效能差異。
  4. 轉換檔案 tbl в csv с for и sed.
  5. 克隆存儲庫 pg_tpch 並複製文件 csv в pg_tpch/dss/data.
  6. 使用命令建立查詢 qgen.
  7. 使用命令將資料載入到資料庫中 ./tpch.sh.

平行順序掃描

它可能更快不是因為並行讀取,而是因為資料分佈在許多 CPU 核心上。 在現代作業系統中,PostgreSQL 資料檔案被很好地快取。 透過預讀,可以從儲存中取得比 PG 守護程式請求更大的區塊。 因此,查詢效能不受磁碟 I/O 的限制。 它消耗 CPU 週期來:

  • 從表頁一次讀取一行;
  • 比較字串值和條件 WHERE.

讓我們執行一個簡單的查詢 select:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

順序掃描產生太多行而沒有聚合,因此查詢由單一 CPU 核心執行。

如果你添加 SUM(),您可以看到兩個工作流程將有助於加快查詢速度:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

平行聚合

並行順序掃描節點產生用於部分聚合的行。 “部分聚合”節點使用以下方式修剪這些行 SUM()。 最後,「Gather」節點收集每個工作進程的 SUM 計數器。

最終結果由“Finalize Aggregate”節點計算。 如果您有自己的聚合函數,請不要忘記將它們標記為「並行安全」。

工作行程數

可以在不重新啟動伺服器的情況下增加工作進程的數量:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

這裡發生了什麼事? 工作進程增加了 2 倍,而請求速度僅提高了 1,6599 倍。 計算很有趣。 我們有 2 個工作流程和 1 個領導進程。 改後變成了4+1。

並行處理的最大加速比:5/3 = 1,66(6) 倍。

它是如何工作的呢?

流程

請求執行始終從主導程序開始。 領導者執行所有非平行處理和一些並行處理。 執行相同請求的其他進程稱為工作進程。 並行處理使用基礎設施 動態後台工作行程 (從版本 9.4 開始)。 由於 PostgreSQL 的其他部分使用進程而不是線程,因此具有 3 個工作進程的查詢可能比傳統處理快 4 倍。

相互作用

Worker 程序透過訊息佇列(基於共享記憶體)與 Leader 進行通訊。 每個行程有 2 個佇列:用於錯誤和元組。

需要多少個工作流程?

最小限制由參數指定 max_parallel_workers_per_gather。 然後,請求運行程序從受參數限制的池中獲取工作進程 max_parallel_workers size。 最後一個限制是 max_worker_processes,即後台進程總數。

如果無法分配工作進程,則處理將是單進程的。

查詢規劃器可以根據資料表或索引的大小減少工作流程。 有這個參數 min_parallel_table_scan_size и min_parallel_index_scan_size.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

每次桌子都比原來大3倍 min_parallel_(index|table)_scan_size,Postgres增加了一個工作進程。 工作流程的數量並非基於成本。 循環依賴使得複雜的實作變得困難。 相反,規劃者使用簡單的規則。

實際上,這些規則並不總是適合生產,因此您可以更改特定表的工作進程數: ALTER TABLE ... SET (parallel_workers = N).

為什麼不使用並行處理?

除了一長串的限制之外,還有成本檢查:

parallel_setup_cost - 避免並行處理短請求。 此參數估計準備記憶體、啟動程序和初始資料交換的時間。

parallel_tuple_cost:領導者和工作人員之間的通訊可能會根據工作流程中的元組數量按比例延遲。 此參數計算資料交換的成本。

嵌套循環連接

PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

收集發生在最後階段,因此 Nested Loop Left Join 是一個並行操作。 僅並行索引掃描僅在版本 10 中引入。它的工作原理與平行串行掃描類似。 健康)狀況 c_custkey = o_custkey 每個客戶字串讀取一個訂單。 所以它不是平行的。

哈希連接

每個工作進程都會建立自己的哈希表,直到 PostgreSQL 11。如果這些進程超過四個,效能將不會提高。 在新版本中,哈希表是共享的。 每個工作進程都可以使用 WORK_MEM 建立哈希表。

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

TPC-H 的查詢 12 清楚地顯示了平行哈希連接。 每個工作進程都有助於建立一個公共哈希表。

合併連接

合併聯結本質上是非並行的。 如果這是查詢的最後一步,請不要擔心 - 它仍然可以並行運行。

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

「Merge Join」節點位於「Gather Merge」之上。 所以合併不使用並行處理。 但「並行索引掃描」節點仍有助於該段 part_pkey.

分段連接

在 PostgreSQL 11 中 分段連接 預設為禁用:它的調度成本非常高。 具有相似分區的表可以按分區連接。 這樣 Postgres 將使用更小的哈希表。 每個部分的連接可以是平行的。

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

最重要的是,只有當這些部分足夠大時,這些部分的連接才是並行的。

平行追加

平行追加 可以在不同的工作流程中使用不同的區塊來代替。 這通常發生在 UNION ALL 查詢中。 缺點是並行性較差,因為每個工作進程只處理 1 個請求。

儘管已啟用 2 個工作進程,但這裡有 4 個工作進程正在運行。

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

最重要的變數

  • WORK_MEM 限制每個進程的內存,而不僅僅是查詢:work_mem 流程 連接=大量記憶體。
  • max_parallel_workers_per_gather — 執行程式將使用多少個工作流程來進行計畫中的平行處理。
  • max_worker_processes — 將工作進程總數調整為伺服器上的 CPU 核心數。
  • max_parallel_workers - 相同,但適用於平行工作流程。

結果

從版本 9.6 開始,並行處理可以大大提高掃描許多行或索引的複雜查詢的效能。 在 PostgreSQL 10 中,預設啟用並行處理。 請記住在具有大量 OLTP 工作負載的伺服器上停用它。 順序掃描或索引掃描會消耗大量資源。 如果您沒有對整個資料集運行報告,則可以透過簡單地添加缺少的索引或使用適當的分區來提高查詢效能。

引用

來源: www.habr.com

添加評論