Parallella frågor i PostgreSQL

Parallella frågor i PostgreSQL
Moderna processorer har många kärnor. I flera år har applikationer skickat frågor till databaser parallellt. Om det är en rapportfråga på flera rader i en tabell, går den snabbare när man använder flera processorer, och PostgreSQL har kunnat göra detta sedan version 9.6.

Det tog 3 år att implementera den parallella frågefunktionen - vi var tvungna att skriva om koden i olika skeden av frågekörningen. PostgreSQL 9.6 introducerade infrastruktur för att ytterligare förbättra koden. I efterföljande versioner exekveras andra typer av frågor parallellt.

Begränsningar

  • Aktivera inte parallell körning om alla kärnor redan är upptagna, annars kommer andra förfrågningar att sakta ner.
  • Viktigast av allt, parallell bearbetning med höga WORK_MEM-värden använder mycket minne - varje hash-join eller sortering tar upp work_mem-minne.
  • OLTP-frågor med låg latens kan inte påskyndas genom parallell exekvering. Och om frågan returnerar en rad kommer parallell bearbetning bara att sakta ner den.
  • Utvecklare älskar att använda TPC-H-riktmärket. Kanske har du liknande frågor för perfekt parallellt exekvering.
  • Endast SELECT-frågor utan predikatlåsning exekveras parallellt.
  • Ibland är korrekt indexering bättre än sekventiell tabellskanning i parallellt läge.
  • Pausa frågor och markörer stöds inte.
  • Fönsterfunktioner och ordnade aggregatfunktioner är inte parallella.
  • Du vinner ingenting på I/O-arbetsbelastningen.
  • Det finns inga parallella sorteringsalgoritmer. Men frågor med sortering kan utföras parallellt i vissa aspekter.
  • Ersätt CTE (MED ...) med en kapslad SELECT för att möjliggöra parallell bearbetning.
  • Datainpackningar från tredje part stöder ännu inte parallell bearbetning (men de skulle kunna!)
  • FULL OUTER JOIN stöds inte.
  • max_rows inaktiverar parallell bearbetning.
  • Om en fråga har en funktion som inte är märkt PARALLELL SAFE, kommer den att vara entrådig.
  • Transaktionsisoleringsnivån SERIALIZABLE inaktiverar parallell bearbetning.

Testmiljö

PostgreSQL-utvecklare försökte minska svarstiden för TPC-H benchmark-frågor. Ladda ner benchmark och anpassa den till PostgreSQL. Detta är en inofficiell användning av TPC-H-riktmärket - inte för databas- eller hårdvarujämförelse.

  1. Ladda ner TPC-H_Tools_v2.17.3.zip (eller nyare version) från TPC offsite.
  2. Byt namn på makefile.suite till Makefile och ändra enligt beskrivningen här: https://github.com/tvondra/pg_tpch . Kompilera koden med kommandot make.
  3. Generera data: ./dbgen -s 10 skapar en 23 GB databas. Detta är tillräckligt för att se skillnaden i prestanda för parallella och icke-parallella frågor.
  4. Konvertera filer tbl в csv с for и sed.
  5. Klona förvaret pg_tpch och kopiera filerna csv в pg_tpch/dss/data.
  6. Skapa frågor med ett kommando qgen.
  7. Ladda data till databasen med kommandot ./tpch.sh.

Parallell sekventiell skanning

Det kan vara snabbare inte på grund av parallell läsning, utan på grund av att data sprids över många CPU-kärnor. I moderna operativsystem cachelagras PostgreSQL-datafiler väl. Med läsning framåt är det möjligt att få ett större block från lagring än vad PG-demonen begär. Därför begränsas inte frågeprestanda av disk I/O. Den förbrukar CPU-cykler för att:

  • läs rader en i taget från tabellsidor;
  • jämför strängvärden och villkor WHERE.

Låt oss köra en enkel fråga select:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

Den sekventiella genomsökningen producerar för många rader utan aggregering, så frågan exekveras av en enda CPU-kärna.

Om du lägger till SUM(), kan du se att två arbetsflöden hjälper till att påskynda frågan:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Parallell aggregation

Parallell Seq Scan-noden producerar rader för partiell aggregering. Noden "Partial Aggregate" trimmar dessa linjer med hjälp av SUM(). I slutet samlas SUM-räknaren från varje arbetsprocess av noden "Gather".

Det slutliga resultatet beräknas av noden "Finalize Aggregate". Om du har dina egna aggregeringsfunktioner, glöm inte att markera dem som "parallellsäkra".

Antal arbetsprocesser

Antalet arbetsprocesser kan ökas utan att starta om servern:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Vad händer här? Det fanns 2 gånger fler arbetsprocesser, och begäran blev bara 1,6599 gånger snabbare. Beräkningarna är intressanta. Vi hade 2 arbetsprocesser och 1 ledare. Efter bytet blev det 4+1.

Vår maximala hastighet från parallell bearbetning: 5/3 = 1,66(6) gånger.

Hur fungerar det?

Processerna

Exekvering av begäran börjar alltid med den ledande processen. Ledaren gör allt icke-parallellt och viss parallell bearbetning. Andra processer som utför samma förfrågningar kallas arbetsprocesser. Parallell bearbetning använder infrastruktur dynamiska bakgrundsarbetarprocesser (från version 9.4). Eftersom andra delar av PostgreSQL använder processer snarare än trådar, kan en fråga med 3 arbetsprocesser vara 4 gånger snabbare än traditionell bearbetning.

Interaktion

Arbetarprocesser kommunicerar med ledaren genom en meddelandekö (baserat på delat minne). Varje process har 2 köer: för fel och för tupler.

Hur många arbetsflöden behövs?

Minimigränsen anges av parametern max_parallel_workers_per_gather. Förfrågningsledaren tar sedan arbetsprocesser från poolen som begränsas av parametern max_parallel_workers size. Den sista begränsningen är max_worker_processes, det vill säga det totala antalet bakgrundsprocesser.

Om det inte var möjligt att allokera en arbetsprocess kommer behandlingen att vara en process.

Frågeplaneraren kan minska arbetsflöden beroende på storleken på tabellen eller indexet. Det finns parametrar för detta min_parallel_table_scan_size и min_parallel_index_scan_size.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Varje gång är bordet 3 gånger större än min_parallel_(index|table)_scan_size, Postgres lägger till en arbetsprocess. Antalet arbetsflöden baseras inte på kostnader. Cirkulärt beroende gör komplexa implementeringar svåra. Istället använder planeraren enkla regler.

I praktiken är dessa regler inte alltid lämpliga för produktion, så du kan ändra antalet arbetsprocesser för en specifik tabell: ALTER TABLE ... SET (parallel_workers = N).

Varför används inte parallell bearbetning?

Utöver den långa listan med restriktioner finns det även kostnadskontroller:

parallel_setup_cost - att undvika parallell behandling av korta förfrågningar. Denna parameter uppskattar tiden för att förbereda minnet, starta processen och initialt datautbyte.

parallel_tuple_cost: kommunikationen mellan ledaren och arbetarna kan försenas i proportion till antalet tuplar från arbetsprocesser. Denna parameter beräknar kostnaden för datautbyte.

Kapslade slingor

PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

Insamlingen sker i det sista skedet, så Nested Loop Left Join är en parallell operation. Parallel Index Only Scan introducerades endast i version 10. Det fungerar på samma sätt som parallell seriell skanning. Skick c_custkey = o_custkey läser en order per kundsträng. Så det är inte parallellt.

Hash Gå med

Varje arbetsprocess skapar sin egen hashtabell fram till PostgreSQL 11. Och om det finns fler än fyra av dessa processer kommer inte prestandan att förbättras. I den nya versionen är hashtabellen delad. Varje arbetsprocess kan använda WORK_MEM för att skapa en hashtabell.

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

Fråga 12 från TPC-H visar tydligt en parallell hash-anslutning. Varje arbetsprocess bidrar till att skapa en gemensam hashtabell.

Slå samman Gå med

En sammanfogning är icke-parallell till sin natur. Oroa dig inte om detta är det sista steget i frågan - den kan fortfarande köras parallellt.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

"Merge Join"-noden är placerad ovanför "Gather Merge". Så sammanslagning använder inte parallell bearbetning. Men noden "Parallell Index Scan" hjälper fortfarande med segmentet part_pkey.

Anslutning via sektioner

I PostgreSQL 11 anslutning genom sektioner inaktiverad som standard: den har mycket dyr schemaläggning. Tabeller med liknande partitionering kan sammanfogas partition för partition. På så sätt kommer Postgres att använda mindre hashtabeller. Varje anslutning av sektioner kan vara parallella.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

Huvudsaken är att anslutningen i sektioner är parallell endast om dessa sektioner är tillräckligt stora.

Parallell Append

Parallell Append kan användas istället för olika block i olika arbetsflöden. Detta händer vanligtvis med UNION ALL-frågor. Nackdelen är mindre parallellitet, eftersom varje arbetsprocess bara behandlar en begäran.

Det finns två arbetsprocesser som körs här, även om fyra är aktiverade.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

De viktigaste variablerna

  • WORK_MEM begränsar minnet per process, inte bara frågor: work_mem processer anslutningar = mycket minne.
  • max_parallel_workers_per_gather — hur många arbetsprocesser det körande programmet kommer att använda för parallell bearbetning från planen.
  • max_worker_processes — justerar det totala antalet arbetsprocesser till antalet CPU-kärnor på servern.
  • max_parallel_workers - samma, men för parallella arbetsprocesser.

Resultat av

Från och med version 9.6 kan parallell bearbetning avsevärt förbättra prestandan för komplexa frågor som skannar många rader eller index. I PostgreSQL 10 är parallell bearbetning aktiverad som standard. Kom ihåg att inaktivera det på servrar med stor OLTP-arbetsbelastning. Sekventiella genomsökningar eller indexsökningar förbrukar mycket resurser. Om du inte kör en rapport på hela datamängden kan du förbättra frågeprestanda genom att helt enkelt lägga till saknade index eller använda korrekt partitionering.

referenser

Källa: will.com

Lägg en kommentar