Parallelle query's in PostgreSQL

Parallelle query's in PostgreSQL
Moderne CPU's hebben veel cores. Applicaties sturen al jaren parallel query's naar databases. Als het een rapportquery is op meerdere rijen in een tabel, werkt deze sneller bij gebruik van meerdere CPU's, en PostgreSQL kan dit doen sinds versie 9.6.

Het duurde drie jaar om de parallelle query-functie te implementeren. We moesten de code herschrijven in verschillende stadia van de query-uitvoering. PostgreSQL 3 introduceerde infrastructuur om de code verder te verbeteren. In volgende versies worden andere typen query's parallel uitgevoerd.

Beperkingen

  • Schakel parallelle uitvoering niet in als alle kernen al bezet zijn, anders zullen andere verzoeken vertragen.
  • Het belangrijkste is dat parallelle verwerking met hoge WORK_MEM-waarden veel geheugen gebruikt: elke hash-join of sortering neemt work_mem-geheugen in beslag.
  • OLTP-query's met lage latentie kunnen niet worden versneld door parallelle uitvoering. En als de query één rij retourneert, zal parallelle verwerking deze alleen maar vertragen.
  • Ontwikkelaars gebruiken graag de TPC-H-benchmark. Misschien heeft u vergelijkbare vragen voor een perfecte parallelle uitvoering.
  • Alleen SELECT-query's zonder predikaatvergrendeling worden parallel uitgevoerd.
  • Soms is een goede indexering beter dan het scannen van opeenvolgende tabellen in parallelle modus.
  • Het onderbreken van query's en cursors wordt niet ondersteund.
  • Vensterfuncties en geordende set-aggregaatfuncties zijn niet parallel.
  • U wint niets aan de I/O-werklast.
  • Er zijn geen parallelle sorteeralgoritmen. Maar query's met sortering kunnen in sommige aspecten parallel worden uitgevoerd.
  • Vervang CTE (MET ...) door een geneste SELECT om parallelle verwerking mogelijk te maken.
  • Datawrappers van derden ondersteunen nog geen parallelle verwerking (maar dat zou wel kunnen!)
  • FULL OUTER JOIN wordt niet ondersteund.
  • max_rows schakelt parallelle verwerking uit.
  • Als een query een functie heeft die niet als PARALLEL SAFE is gemarkeerd, zal deze een enkele thread hebben.
  • Het SERIALISEERBARE transactie-isolatieniveau schakelt parallelle verwerking uit.

Test omgeving

PostgreSQL-ontwikkelaars probeerden de responstijd van TPC-H-benchmarkquery's te verkorten. Download de benchmark en pas het aan PostgreSQL aan. Dit is een onofficieel gebruik van de TPC-H-benchmark - niet voor database- of hardwarevergelijking.

  1. Download TPC-H_Tools_v2.17.3.zip (of nieuwere versie) van TPC buiten de locatie.
  2. Hernoem makefile.suite naar Makefile en wijzig zoals hier beschreven: https://github.com/tvondra/pg_tpch . Compileer de code met het make-commando.
  3. Gegevens genereren: ./dbgen -s 10 creëert een database van 23 GB. Dit is voldoende om het verschil te zien in de prestaties van parallelle en niet-parallelle zoekopdrachten.
  4. Bestanden omzetten tbl в csv с for и sed.
  5. Kloon de opslagplaats pg_tpch en kopieer de bestanden csv в pg_tpch/dss/data.
  6. Maak query's met een opdracht qgen.
  7. Laad gegevens in de database met de opdracht ./tpch.sh.

Parallel sequentieel scannen

Het kan sneller zijn, niet vanwege het parallelle lezen, maar omdat de gegevens over veel CPU-kernen zijn verspreid. In moderne besturingssystemen worden PostgreSQL-gegevensbestanden goed in de cache opgeslagen. Met vooruitlezen is het mogelijk om een ​​groter blok uit de opslag te halen dan de PG-daemon vraagt. Daarom worden de queryprestaties niet beperkt door schijf-I/O. Het verbruikt CPU-cycli om:

  • lees rijen één voor één van tabelpagina's;
  • vergelijk stringwaarden en voorwaarden WHERE.

Laten we een eenvoudige query uitvoeren select:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

De sequentiële scan levert te veel rijen op zonder aggregatie, waardoor de query wordt uitgevoerd door één enkele CPU-kern.

Als je toevoegt SUM(), kunt u zien dat twee workflows de zoekopdracht helpen versnellen:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Parallelle aggregatie

Het Parallel Seq Scan-knooppunt produceert rijen voor gedeeltelijke aggregatie. Het knooppunt "Gedeeltelijk aggregaat" trimt deze lijnen met behulp van SUM(). Aan het einde wordt de SOM-teller van elk werkproces verzameld door het knooppunt "Verzamelen".

Het eindresultaat wordt berekend door het knooppunt “Finalize Aggregate”. Als u over uw eigen aggregatiefuncties beschikt, vergeet dan niet deze als “parallel veilig” te markeren.

Aantal werkprocessen

Het aantal werkprocessen kan worden verhoogd zonder de server opnieuw op te starten:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Wat is hier aan de hand? Er waren 2 keer meer werkprocessen en de aanvraag werd slechts 1,6599 keer sneller. De berekeningen zijn interessant. We hadden 2 werkprocessen en 1 leider. Na de wijziging werd het 4+1.

Onze maximale versnelling door parallelle verwerking: 5/3 = 1,66(6) keer.

Hoe werkt het?

processen

De uitvoering van verzoeken begint altijd met het leidende proces. De leider doet alles niet-parallel en enige parallelle verwerking. Andere processen die dezelfde verzoeken uitvoeren, worden werkprocessen genoemd. Parallelle verwerking maakt gebruik van infrastructuur dynamische achtergrondwerkprocessen (vanaf versie 9.4). Omdat andere delen van PostgreSQL processen gebruiken in plaats van threads, kan een query met drie werkprocessen vier keer sneller zijn dan traditionele verwerking.

Wisselwerking

Werkprocessen communiceren met de leider via een berichtenwachtrij (gebaseerd op gedeeld geheugen). Elk proces heeft 2 wachtrijen: voor fouten en voor tupels.

Hoeveel workflows zijn er nodig?

De minimumlimiet wordt gespecificeerd door de parameter max_parallel_workers_per_gather. De aanvraagrunner haalt vervolgens werkprocessen uit de pool die wordt beperkt door de parameter max_parallel_workers size. De laatste beperking is max_worker_processes, dat wil zeggen het totale aantal achtergrondprocessen.

Als het niet mogelijk was om een ​​werkproces toe te wijzen, vindt de verwerking uit één proces plaats.

De queryplanner kan de workflows verminderen, afhankelijk van de grootte van de tabel of index. Daar zijn parameters voor min_parallel_table_scan_size и min_parallel_index_scan_size.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Elke keer is de tafel 3 keer groter dan min_parallel_(index|table)_scan_size, voegt Postgres een werkproces toe. Het aantal workflows is niet gebaseerd op kosten. Circulaire afhankelijkheid maakt complexe implementaties lastig. In plaats daarvan gebruikt de planner eenvoudige regels.

In de praktijk zijn deze regels niet altijd geschikt voor productie, dus u kunt het aantal werkprocessen voor een specifieke tabel wijzigen: ALTER TABLE ... SET (parallel_workers = N).

Waarom wordt er geen parallelle verwerking gebruikt?

Naast de lange lijst met beperkingen zijn er ook kostencontroles:

parallel_setup_cost - om parallelle verwerking van korte verzoeken te voorkomen. Deze parameter schat de tijd die nodig is om het geheugen voor te bereiden, het proces te starten en de eerste gegevensuitwisseling uit te voeren.

parallel_tuple_cost: communicatie tussen de leider en werknemers kan worden vertraagd in verhouding tot het aantal tupels uit werkprocessen. Deze parameter berekent de kosten van gegevensuitwisseling.

Geneste lusverbindingen

PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

Het verzamelen vindt plaats in de laatste fase, dus Nested Loop Left Join is een parallelle bewerking. Alleen parallelle indexscan is pas in versie 10 geïntroduceerd. Het werkt vergelijkbaar met parallel serieel scannen. Voorwaarde c_custkey = o_custkey leest één bestelling per klantreeks. Het is dus niet parallel.

Hash Doe mee

Elk werkproces creëert zijn eigen hashtabel tot PostgreSQL 11. En als er meer dan vier van deze processen zijn, zullen de prestaties niet verbeteren. In de nieuwe versie wordt de hashtabel gedeeld. Elk werkproces kan WORK_MEM gebruiken om een ​​hashtabel te maken.

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

Query 12 van TPC-H laat duidelijk een parallelle hashverbinding zien. Elk werkproces draagt ​​bij aan het creëren van een gemeenschappelijke hashtabel.

Samenvoegen Doe mee

Een samenvoeging is niet-parallel van aard. Maakt u zich geen zorgen als dit de laatste stap van de query is; deze kan nog steeds parallel worden uitgevoerd.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

Het knooppunt "Merge Join" bevindt zich boven het knooppunt "Gather Merge". Bij het samenvoegen wordt dus geen gebruik gemaakt van parallelle verwerking. Maar het knooppunt “Parallel Index Scan” helpt nog steeds bij het segment part_pkey.

Verbinding per sectie

In PostgreSQL 11 verbinding per sectie standaard uitgeschakeld: het heeft een zeer dure planning. Tabellen met een vergelijkbare partitie kunnen partitie voor partitie worden samengevoegd. Op deze manier zal Postgres kleinere hashtabellen gebruiken. Elke verbinding van secties kan parallel zijn.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

Het belangrijkste is dat de verbinding in secties alleen parallel is als deze secties groot genoeg zijn.

Parallel toevoegen

Parallel toevoegen kan worden gebruikt in plaats van verschillende blokken in verschillende workflows. Dit gebeurt meestal bij UNION ALL-query's. Het nadeel is minder parallelliteit, omdat elk werkproces slechts 1 aanvraag verwerkt.

Er worden hier twee werkprocessen uitgevoerd, hoewel er vier zijn ingeschakeld.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

De belangrijkste variabelen

  • WORK_MEM beperkt het geheugen per proces, niet alleen query's: work_mem processen aansluitingen = veel geheugen.
  • max_parallel_workers_per_gather — hoeveel werkprocessen het uitvoerende programma zal gebruiken voor parallelle verwerking van het plan.
  • max_worker_processes — past het totale aantal werkprocessen aan het aantal CPU-kernen op de server aan.
  • max_parallel_workers - hetzelfde, maar dan voor parallelle werkprocessen.

Resultaten van

Vanaf versie 9.6 kan parallelle verwerking de prestaties van complexe query's die veel rijen of indexen scannen aanzienlijk verbeteren. In PostgreSQL 10 is parallelle verwerking standaard ingeschakeld. Vergeet niet om dit uit te schakelen op servers met een grote OLTP-werklast. Opeenvolgende scans of indexscans verbruiken veel bronnen. Als u geen rapport over de gehele gegevensset uitvoert, kunt u de queryprestaties verbeteren door eenvoudigweg ontbrekende indexen toe te voegen of de juiste partities te gebruiken.

referenties

Bron: www.habr.com

Voeg een reactie