Parallel queries in PostgreSQL

Modern CPUs have many cores, and for years applications have exploited them by sending queries to the database in parallel. A reporting query that scans many rows of a table runs faster when it can use several CPU cores, and PostgreSQL has been able to parallelize a single query since version 9.6.

The parallel query feature took 3 years to implement: code at many stages of query execution had to be rewritten. PostgreSQL 9.6 introduced the infrastructure, and subsequent versions built on it so that more types of queries can be executed in parallel.

Restrictions

  • Do not enable parallel execution if all CPU cores are already busy, otherwise other queries will slow down.
  • Most importantly, parallel execution with high work_mem values uses a lot of memory: each hash join or sort consumes up to work_mem.
  • Low-latency OLTP queries cannot be accelerated by parallel execution, and if a query returns a single row, parallel processing will only slow it down.
  • Developers love the TPC-H benchmark; if you have similar queries, they are ideal candidates for parallel execution.
  • Only SELECT queries without predicate locking are executed in parallel.
  • Sometimes proper indexing is better than a parallel sequential scan of the table.
  • Query pausing and cursors are not supported.
  • Window functions and ordered-set aggregate functions are not parallel.
  • There is nothing to gain for I/O-bound workloads.
  • There are no parallel sorting algorithms, but queries with sorts can still run other parts of the plan in parallel.
  • Replace a CTE (WITH ...) with a nested SELECT to enable parallel processing.
  • Foreign data wrappers do not yet support parallel processing (but they could!)
  • FULL OUTER JOIN is not supported.
  • Setting max_rows disables parallel processing.
  • If the query calls a function that is not marked PARALLEL SAFE, it will be executed in a single process.
  • The SERIALIZABLE transaction isolation level disables parallel processing.

Test environment

PostgreSQL developers have been working to reduce the response time of TPC-H benchmark queries. Download the benchmark and adapt it to PostgreSQL; note that this is an unofficial use of TPC-H, not suitable for comparing databases or hardware.

  1. Download TPC-H_Tools_v2.17.3.zip (or later) from the official TPC site.
  2. Rename makefile.suite to Makefile and modify as described here: https://github.com/tvondra/pg_tpch . Compile the code with make.
  3. Generate data: ./dbgen -s 10 creates a 23 GB database. This is enough to see the performance difference between parallel and non-parallel queries.
  4. Convert the tbl files to csv using for and sed.
  5. Clone the pg_tpch repository and copy the csv files into pg_tpch/dss/data.
  6. Generate the queries with the qgen command.
  7. Load the data into the database with the command ./tpch.sh.

Parallel Sequential Scan

A parallel scan may be faster not because of parallel reading, but because data processing is spread across many CPU cores. On modern operating systems, PostgreSQL data files are well cached, and with read-ahead the storage layer returns larger blocks than the PG daemon requests. Therefore, query performance is usually not limited by disk I/O; it consumes CPU cycles to:

  • read rows one by one from the table pages;
  • compare row values against the WHERE conditions.

Let's run a simple select query:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

Because there is no aggregation, the sequential scan returns too many rows to the leader, so the query is executed by a single CPU core.

If we add SUM(), we can see that two worker processes help speed up the query:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Parallel Aggregation

The "Parallel Seq Scan" node produces rows for partial aggregation. The "Partial Aggregate" node reduces those rows with SUM(). At the end, the SUM counter from each worker process is collected by the "Gather" node.

The final result is calculated by the "Finalize Aggregate" node. If you have your own aggregate functions, don't forget to mark them as "parallel safe".
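
The division of labor can be sketched in a few lines of Python (a toy model, not PostgreSQL code): each "worker" computes a partial SUM() over its share of the rows, and the leader finalizes the result.

```python
# Toy model of Parallel Seq Scan -> Partial Aggregate -> Gather -> Finalize Aggregate.
rows = list(range(1, 101))                 # pretend these are l_quantity values

# Parallel Seq Scan: three processes each scan a disjoint share of the rows
shares = [rows[i::3] for i in range(3)]

# Partial Aggregate: each worker reduces its share to a single partial sum
partials = [sum(share) for share in shares]

# Gather + Finalize Aggregate: the leader combines the partial sums
total = sum(partials)
print(total)  # 5050, the same as sum(rows)
```

The same split-combine idea is why custom aggregates need a combine function and the "parallel safe" marking to participate.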

Number of worker processes

The number of worker processes can be increased without restarting the server, for example with alter system set max_parallel_workers_per_gather = 4 followed by select * from pg_reload_conf():

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

What's going on here? We have twice as many worker processes, yet the query is only about 1.66 times faster. The arithmetic is interesting: we had 2 worker processes and 1 leader; after the change, it became 4 + 1.

Our maximum speedup from parallel processing: 5/3 ≈ 1.67 times.
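
The arithmetic above as a one-liner (the leader executes the plan too, so it counts as one extra process on each side):

```python
def max_speedup(workers_before: int, workers_after: int) -> float:
    # The leader process also runs the plan, hence the "+ 1" on each side.
    return (workers_after + 1) / (workers_before + 1)

print(max_speedup(2, 4))  # 5 / 3 = 1.666...
```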

How does it work?

Processes

Query execution always starts with the leader process. The leader does all the non-parallel work and some of the parallel processing itself. Other processes that execute the same query are called worker processes. Parallel processing uses the dynamic background workers infrastructure (available since version 9.4). Since PostgreSQL uses processes rather than threads here, and the leader participates in the work, a query with 3 worker processes can be up to 4 times faster than traditional execution.

Interaction

Worker processes communicate with the leader via a message queue based on shared memory. Each worker process has two queues: one for errors and one for tuples.

How many worker processes do you need?

The first limit is set by the max_parallel_workers_per_gather parameter. Then the query executor takes worker processes from a pool whose size is limited by max_parallel_workers. The outermost limit is max_worker_processes, the total number of background processes.

If a worker process could not be allocated, execution falls back to a single process.
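
For example, the three limits nest like this (illustrative values only; tune them to your CPU count):

```
# postgresql.conf fragment (illustrative): each setting is capped by the one below it
max_parallel_workers_per_gather = 4   # per Gather node in a plan
max_parallel_workers = 8              # pool shared by all parallel queries
max_worker_processes = 8              # all background workers, parallel or not
```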

The query planner can reduce the number of worker processes depending on the size of the table or index. The parameters min_parallel_table_scan_size and min_parallel_index_scan_size control this.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Every time the table is 3 times larger than min_parallel_(index|table)_scan_size, Postgres adds a worker process. The number of workers is not cost-based: a circular dependency between costing and worker count would make the implementation complex, so the planner uses simple rules instead.

In practice, these rules are not always suitable for production, so the number of worker processes can be overridden for a particular table: ALTER TABLE ... SET (parallel_workers = N).
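
The tripling rule can be mimicked with integer arithmetic (a simplification: the planner's real logic also applies the caps discussed above and considers index size):

```python
def planned_workers(table_size_mb: int, min_scan_size_mb: int = 8) -> int:
    """One worker at min_parallel_table_scan_size, plus one more each time
    the table triples in size. Caps such as max_parallel_workers_per_gather
    are ignored in this sketch."""
    if table_size_mb < min_scan_size_mb:
        return 0
    workers, threshold = 1, min_scan_size_mb * 3
    while threshold <= table_size_mb:
        workers += 1
        threshold *= 3
    return workers

for size in (8, 24, 72, 216):
    print(f"{size} MB table => {planned_workers(size)} workers")
```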

Why is parallel processing not used?

In addition to the long list of restrictions, there are also cost checks:

parallel_setup_cost - avoids parallel processing of short queries. This parameter estimates the time spent on memory setup, process startup, and initial communication.

parallel_tuple_cost - communication between the leader and the workers grows in proportion to the number of tuples sent by the worker processes. This parameter accounts for the cost of that data exchange.
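
A simplified sketch of how these two parameters tilt the decision (the real planner costing is far more detailed; the constants and formula here are illustrative only):

```python
def prefer_parallel(rows_scanned: int, per_row_cost: float, workers: int,
                    rows_returned: int,
                    parallel_setup_cost: float = 1000.0,
                    parallel_tuple_cost: float = 0.1) -> bool:
    """Toy cost comparison: a parallel plan pays a fixed startup cost, then
    shares the scan work, but pays per tuple sent from workers to the leader."""
    serial_cost = rows_scanned * per_row_cost
    parallel_cost = (parallel_setup_cost
                     + rows_scanned * per_row_cost / workers
                     + rows_returned * parallel_tuple_cost)
    return parallel_cost < serial_cost

# A short query loses: parallel_setup_cost dominates.
print(prefer_parallel(1_000, 0.01, 4, rows_returned=1))        # False
# A big aggregate wins: lots of shared scan work, one tuple returned.
print(prefer_parallel(10_000_000, 0.01, 4, rows_returned=1))   # True
```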

Nested Loop Join

PostgreSQL 9.6+ can execute nested loop joins in parallel; it is a simple operation.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

Gather happens at the last step, so the Nested Loop Left Join is a parallel operation. Parallel Index Only Scan was introduced only in version 10 and works similarly to a parallel sequential scan. The condition c_custkey = o_custkey reads one order for each customer row, so that inner scan is not parallel.

Hash Join

Before PostgreSQL 11, each worker process created its own copy of the hash table, so with more than four workers performance did not improve. In the new version, the hash table is shared: each worker process can use work_mem to build the common hash table.
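
The difference can be sketched as follows (a toy model in Python; real parallel hash uses shared memory, not dicts):

```python
def build_private_tables(build_rows, n_workers):
    # Before PostgreSQL 11: every worker hashes the ENTIRE inner relation,
    # so hash-table memory grows linearly with the number of workers.
    return [{row["key"]: row for row in build_rows} for _ in range(n_workers)]

def build_shared_table(build_rows, n_workers):
    # PostgreSQL 11+: workers split the inner relation and insert their
    # shares into one shared hash table.
    shared = {}
    for w in range(n_workers):
        for row in build_rows[w::n_workers]:   # this worker's share of the rows
            shared[row["key"]] = row
    return shared

rows = [{"key": k, "val": k * 10} for k in range(1000)]
private = build_private_tables(rows, 4)   # 4 full copies of the table
shared = build_shared_table(rows, 4)      # 1 copy, built cooperatively
print(sum(len(t) for t in private), len(shared))  # 4000 1000
```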

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

Query 12 from TPC-H illustrates a parallel hash join. Each worker process participates in the creation of a shared hash table.

Merge Join

A merge join is non-parallel by nature. Don't worry if it is the last step of the query: the rest of the plan can still run in parallel.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using part_pkey on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

The "Merge Join" node sits above the "Gather Merge", so the merge itself does not use parallel processing. But the "Parallel Index Scan" node on the part_pkey index still helps.

Partition-wise join

In PostgreSQL 11, partition-wise join is disabled by default because its planning is very expensive. Tables with matching partitioning can be joined partition by partition, which lets Postgres use smaller hash tables. Each partition-to-partition join can be executed in parallel.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

The main takeaway is that a partition-wise join is parallel only if the partitions are large enough.

Parallel Append

Parallel Append executes different child plans in different worker processes; this usually happens with UNION ALL queries. The downside is less parallelism, because each worker process executes only one child plan.

Here only 2 worker processes are planned, even though 4 are enabled.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

Most Important Variables

  • work_mem limits the amount of memory per process, not just per query: work_mem × processes × connections = a lot of memory.
  • max_parallel_workers_per_gather - how many worker processes the executor will use per Gather node in a plan.
  • max_worker_processes - the total number of background worker processes; adjust it to the number of CPU cores on the server.
  • max_parallel_workers - the same, but for parallel worker processes.
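
A back-of-the-envelope check for the first point (hypothetical numbers; PostgreSQL does not enforce a global memory cap, so you have to do this arithmetic yourself):

```python
def worst_case_mb(work_mem_mb: int, processes: int, mem_nodes_per_plan: int) -> int:
    # Every process (leader plus workers) may allocate up to work_mem
    # for EACH sort or hash node in its plan.
    return work_mem_mb * processes * mem_nodes_per_plan

# One query with a leader + 4 workers and a plan containing 2 hash joins:
print(worst_case_mb(64, 5, 2), "MB")  # 640 MB at work_mem = 64MB
```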

Results

Starting with version 9.6, parallel processing can greatly improve the performance of complex queries that scan many rows or index entries. In PostgreSQL 10, parallel processing is enabled by default; don't forget to disable it on servers with a heavy OLTP workload. Sequential and index scans are resource-intensive: if you are not reporting over the entire dataset, queries can often be made more performant simply by adding missing indexes or by using proper partitioning.

Source: habr.com
