Parallel queries in PostgreSQL

Modern CPUs have many cores. For years, applications have been sending queries to databases in parallel. When a reporting query works on many rows of a table, it runs faster if it can use several CPUs, and PostgreSQL has been able to do this since version 9.6.

It took 3 years to implement the parallel query feature - we had to rewrite the code at several stages of query execution. PostgreSQL 9.6 introduced the infrastructure for improving the code further. In later versions, other types of queries are executed in parallel as well.

Limitations

  • Do not enable parallel execution if all cores are already busy; otherwise other queries will slow down.
  • Most importantly, parallel processing with high WORK_MEM values uses a lot of memory: each hash join or sort can take up to work_mem of memory.
  • Low-latency OLTP queries cannot be sped up by parallel execution. And if a query returns a single row, parallel processing will only slow it down.
  • Developers like to use the TPC-H benchmark. Perhaps you have similar queries that are a perfect fit for parallel execution.
  • Only SELECT queries without locking clauses (such as FOR UPDATE) are executed in parallel.
  • Sometimes proper indexing is better than a sequential table scan in parallel mode.
  • Suspended queries and cursors are not supported.
  • Window functions and ordered-set aggregate functions are not parallel.
  • There is no benefit for an I/O-bound workload.
  • There are no parallel sort algorithms. But queries with sorts can still be executed in parallel in some respects.
  • Replace a CTE (WITH ...) with a nested SELECT to enable parallel processing.
  • Foreign data wrappers do not yet support parallel processing (but they could!)
  • FULL OUTER JOIN is not supported.
  • Clients that set max_rows disable parallel processing.
  • If a query uses a function that is not marked PARALLEL SAFE, it will be single-threaded.
  • The SERIALIZABLE transaction isolation level disables parallel processing.
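
The PARALLEL SAFE item in the list above can be sketched in SQL. The function name discounted_price and its body are hypothetical and only illustrate the labeling. The label should be applied only to functions that really have no side effects.

```sql
-- Hypothetical helper function; name and body are illustrative only.
CREATE OR REPLACE FUNCTION discounted_price(price numeric, discount numeric)
RETURNS numeric
LANGUAGE sql
IMMUTABLE
PARALLEL SAFE
AS $$ SELECT price * (1 - discount) $$;

-- Check how a function is labeled:
-- 's' = safe, 'r' = restricted, 'u' = unsafe (the default for new functions).
SELECT proname, proparallel
FROM pg_proc
WHERE proname = 'discounted_price';

-- An existing function can be relabeled without redefining it.
ALTER FUNCTION discounted_price(numeric, numeric) PARALLEL SAFE;
```

Functions are PARALLEL UNSAFE unless declared otherwise, so a single unlabeled helper in a query is enough to keep the whole plan serial.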

Test environment

The PostgreSQL developers tried to reduce the response time of the TPC-H benchmark queries. Download the benchmark and adapt it to PostgreSQL. This is an unofficial use of the TPC-H benchmark - not for comparing databases or hardware.

  1. Download TPC-H_Tools_v2.17.3.zip (or a newer version) from the official TPC site.
  2. Rename makefile.suite to Makefile and modify it as described here: https://github.com/tvondra/pg_tpch . Compile the code with the make command.
  3. Generate the data: ./dbgen -s 10 creates a 23 GB database. This is enough to see the difference in performance between parallel and non-parallel queries.
  4. Convert the tbl files to csv with for and sed.
  5. Clone the pg_tpch repository and copy the csv files to pg_tpch/dss/data.
  6. Generate the queries with the qgen command.
  7. Load the data into the database with the ./tpch.sh command.

Parallel sequential scan

A parallel sequential scan can be faster not because of parallel reads, but because the data is spread across many CPU cores. On modern operating systems, PostgreSQL data files are cached well. With read-ahead, it is possible to get a larger block from storage than the PG daemon requests. Therefore, query performance is not limited by disk I/O. The query consumes CPU cycles to:

  • read rows one by one from the table pages;
  • compare row values with the WHERE conditions.

Let's run a simple select query:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
                                                        QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
 Seq Scan on lineitem  (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
   Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
   Rows Removed by Filter: 1146337
 Planning Time: 0.203 ms
 Execution Time: 19035.100 ms

The sequential scan produces too many rows without aggregation, so the query is executed by a single CPU core.

If you add SUM(), you can see that two worker processes help to speed up the query:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
                                                                     QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
   ->  Gather  (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
         Workers Planned: 2
         Workers Launched: 2
         ->  Partial Aggregate  (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
               ->  Parallel Seq Scan on lineitem  (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
                     Rows Removed by Filter: 382112
 Planning Time: 0.241 ms
 Execution Time: 8555.131 ms

Parallel aggregation

The "Parallel Seq Scan" node produces rows for partial aggregation. The "Partial Aggregate" node reduces these rows using SUM(). At the end, the SUM counter from each worker process is collected by the "Gather" node.

The final result is calculated by the "Finalize Aggregate" node. If you have your own aggregate functions, do not forget to mark them as "parallel safe".
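
As a minimal sketch of what marking a custom aggregate as parallel safe could look like: my_sum is a hypothetical aggregate invented for this illustration, built on the built-in numeric_add function. Besides the PARALLEL = SAFE label, partial aggregation also needs a combine function that merges the states produced by the individual processes.

```sql
-- my_sum is a hypothetical aggregate used only for illustration.
-- numeric_add is the built-in function behind the numeric "+" operator.
CREATE AGGREGATE my_sum(numeric) (
    SFUNC       = numeric_add,  -- folds one input value into the running state
    STYPE       = numeric,
    COMBINEFUNC = numeric_add,  -- merges two partial states before finalization
    PARALLEL    = SAFE
);

-- With the label and the combine function in place, the planner is allowed
-- to consider a Partial Aggregate / Finalize Aggregate plan for it.
EXPLAIN (costs off)
SELECT my_sum(l_quantity) FROM lineitem;
```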

Number of worker processes

The number of worker processes can be increased without restarting the server, by raising max_parallel_workers_per_gather and reloading the configuration.
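
A minimal sketch of such a change, assuming the limit is raised from 2 to 4 workers per Gather node (the value 4 is chosen to match the 4+1 processes discussed in this section):

```sql
-- Persist the new limit in postgresql.auto.conf (requires superuser rights).
ALTER SYSTEM SET max_parallel_workers_per_gather = 4;

-- Reload the configuration; this parameter does not need a restart.
SELECT pg_reload_conf();

-- Alternatively, change it for the current session only.
SET max_parallel_workers_per_gather = 4;
SHOW max_parallel_workers_per_gather;
```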

What happens here? There were twice as many worker processes, yet the query became only 1.6599 times faster. The arithmetic is interesting: we had 2 worker processes and 1 leader, and after the change it became 4+1.

Our maximum speedup from parallel processing: 5/3 = 1.66(6) times.
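
The same arithmetic can be sketched for other worker counts. This assumes, as in the text, that the leader also processes rows, so the number of participating processes is workers + 1. It is an upper bound on the speedup relative to the 2-worker run, not a prediction.

```sql
-- Upper bound on the speedup relative to a run with 2 workers + 1 leader.
SELECT n AS workers,
       round((n + 1)::numeric / (2 + 1), 2) AS max_speedup_vs_2_workers
FROM generate_series(2, 8, 2) AS n;
-- 2 -> 1.00, 4 -> 1.67, 6 -> 2.33, 8 -> 3.00
```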

How does it work?

Processes

Query execution always starts with the leader process. The leader does everything non-parallel and some of the parallel processing. Other processes that execute the same query are called worker processes. Parallel processing uses the dynamic background worker infrastructure (available since version 9.4). Because other parts of PostgreSQL use processes rather than threads, a query with 3 worker processes can be 4 times faster than traditional processing.

Communication

Worker processes communicate with the leader through a message queue (based on shared memory). Each process has 2 queues: one for errors and one for tuples.

How many worker processes are needed?

The tightest limit is set by the max_parallel_workers_per_gather parameter. The query executor then takes worker processes from a pool whose size is limited by max_parallel_workers. The final limit is max_worker_processes, that is, the total number of background processes.

If no worker process could be allocated, the query is processed by a single process.
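
The three limits described above can be inspected in one query against the pg_settings view. This is only a quick way to look at the current values. The context column shows which parameters can be changed on a running server.

```sql
-- context = 'user' means the value can be changed per session;
-- context = 'postmaster' means a server restart is required.
SELECT name, setting, context
FROM pg_settings
WHERE name IN ('max_parallel_workers_per_gather',
               'max_parallel_workers',
               'max_worker_processes')
ORDER BY name;
```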

The query planner can reduce the number of worker processes depending on the size of the table or index. The parameters min_parallel_table_scan_size and min_parallel_index_scan_size control this.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Each time the table is 3 times larger than min_parallel_(index|table)_scan_size, Postgres adds a worker process. The number of worker processes is not cost-based. Circular dependencies make complex implementations difficult. Instead, the planner uses simple rules.
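
The tripling rule can be reproduced with a small query. This is a sketch of the rule as stated here, assuming min_parallel_table_scan_size = 8MB; it is not the planner's actual code, and the real number of workers is additionally capped by max_parallel_workers_per_gather. The table sizes are made-up sample values.

```sql
-- Count how many thresholds 8MB, 24MB, 72MB, ... a table size has reached.
SELECT t.size_mb,
       (SELECT count(*)
          FROM generate_series(0, 10) AS k
         WHERE 8 * 3 ^ k <= t.size_mb) AS planned_workers
FROM (VALUES (4), (8), (24), (72), (100), (216)) AS t(size_mb);
-- 4 -> 0, 8 -> 1, 24 -> 2, 72 -> 3, 100 -> 3, 216 -> 4
```

Counting thresholds avoids rounding problems that a direct floor(log(...)) expression could run into at exact powers of 3.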

In practice, these rules are not always suitable for production, so you can change the number of worker processes for a specific table: ALTER TABLE ... SET (parallel_workers = N).
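
For example, for the lineitem table from the benchmark (the value 4 is an arbitrary choice for illustration; the result is still capped by max_parallel_workers_per_gather):

```sql
-- Override the size-based rule for one table.
ALTER TABLE lineitem SET (parallel_workers = 4);

-- The override is stored as a storage parameter of the table.
SELECT relname, reloptions
FROM pg_class
WHERE relname = 'lineitem';

-- Return to the planner's default rule.
ALTER TABLE lineitem RESET (parallel_workers);
```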

Why is parallel processing not used?

In addition to the long list of limitations, there are also cost checks:

parallel_setup_cost - to avoid parallel processing of short queries. This parameter estimates the time needed to set up memory, start the processes, and exchange the initial data.

parallel_tuple_cost: communication between the leader and the workers can be slowed down in proportion to the number of tuples sent by the worker processes. This parameter estimates the cost of data exchange.
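
One way to check whether these cost settings are what keeps a query serial is to lower them for a single session and compare the plans. This is a diagnostic sketch for a test system, not a recommended production setting. The query is the simple select used earlier in this article.

```sql
SHOW parallel_setup_cost;   -- 1000 by default
SHOW parallel_tuple_cost;   -- 0.1 by default

-- Make parallel plans look free for this session only.
SET parallel_setup_cost = 0;
SET parallel_tuple_cost = 0;

EXPLAIN (costs off)
SELECT l_quantity AS sum_qty
FROM lineitem
WHERE l_shipdate <= date '1998-12-01' - interval '105' day;

-- Restore the configured values.
RESET parallel_setup_cost;
RESET parallel_tuple_cost;
```

If a Gather node appears only with the lowered costs, the planner considered the parallel plan too expensive rather than impossible.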

Nested Loop Joins

PostgreSQL 9.6+ can execute nested loops in parallel, since this is a simple operation.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

The gathering happens at the last stage, so Nested Loop Left Join is a parallel operation. Parallel Index Only Scan was introduced only in version 10. It works similarly to a parallel sequential scan. The condition c_custkey = o_custkey reads one order per customer row, so it is not parallel.

Hash Join

Before PostgreSQL 11, each worker process builds its own hash table, so with more than four of these processes performance does not improve. In the new version, the hash table is shared. Each worker process can use WORK_MEM to build the hash table.

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

Query 12 from TPC-H clearly demonstrates a parallel hash join. Each worker process contributes to building a shared hash table.
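
To compare the shared hash table with the older per-worker behaviour on the same server, the enable_parallel_hash planner setting (available from PostgreSQL 11) can be switched off for a session. The simplified join below is an assumption made for this sketch and is not one of the TPC-H queries.

```sql
-- Plan with the shared hash table allowed (the default).
EXPLAIN (costs off)
SELECT count(*) FROM orders, lineitem WHERE o_orderkey = l_orderkey;

-- Plan without Parallel Hash: each process would build its own hash table.
SET enable_parallel_hash = off;
EXPLAIN (costs off)
SELECT count(*) FROM orders, lineitem WHERE o_orderkey = l_orderkey;

RESET enable_parallel_hash;
```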

Merge Join

A merge join is non-parallel by nature. Do not worry if it is the last step of the query - the query can still run in parallel.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using part_pkey on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

The "Merge Join" node is located above "Gather Merge". So the merge itself does not use parallel processing. But the "Parallel Index Scan" node still helps with the part_pkey segment.

Partition-wise join

In PostgreSQL 11, partition-wise join is disabled by default: its planning is very expensive. Tables with similar partitioning can be joined partition by partition. This way Postgres will use smaller hash tables. Each join of partitions can be parallel.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

Most importantly, a partition-wise join is parallel only if the partitions are large enough.
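
The prt1 and prt2 tables used above are not defined in this article. A minimal sketch of a setup that would allow such a join, assuming two range-partitioned tables with identical partition bounds on the join columns (the column list, bounds and row counts are invented for illustration):

```sql
-- Both tables are partitioned on their join column with the same bounds.
CREATE TABLE prt1 (a int, b int) PARTITION BY RANGE (a);
CREATE TABLE prt1_p1 PARTITION OF prt1 FOR VALUES FROM (0) TO (5000);
CREATE TABLE prt1_p2 PARTITION OF prt1 FOR VALUES FROM (5000) TO (10001);

CREATE TABLE prt2 (a int, b int) PARTITION BY RANGE (b);
CREATE TABLE prt2_p1 PARTITION OF prt2 FOR VALUES FROM (0) TO (5000);
CREATE TABLE prt2_p2 PARTITION OF prt2 FOR VALUES FROM (5000) TO (10001);

-- Sample data; real partitions must be much larger before a parallel plan pays off.
INSERT INTO prt1 SELECT i, i % 25 FROM generate_series(0, 10000) AS i;
INSERT INTO prt2 SELECT i % 25, i FROM generate_series(0, 10000) AS i;
ANALYZE prt1;
ANALYZE prt2;

SET enable_partitionwise_join = on;
```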

Parallel Append

Parallel Append distributes whole subplans among the worker processes instead of splitting the blocks of one scan between them. This usually happens with UNION ALL queries. The downside is less parallelism, because each worker process handles only 1 subquery.

There are 2 worker processes running here, although 4 are enabled.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

Key settings

  • WORK_MEM limits memory per process, not just per query: work_mem * processes * joins = a lot of memory.
  • max_parallel_workers_per_gather - how many worker processes the executor will use for parallel processing of a plan.
  • max_worker_processes - sets the total number of worker processes; adjust it to the number of CPU cores on the server.
  • max_parallel_workers - the same, but for parallel worker processes.
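
The work_mem * processes * joins product can be estimated from the current settings. This is a rough upper-bound sketch for a single query. The factor 2 is an assumed number of sort or hash nodes in the plan and should be replaced with the count from your own EXPLAIN output.

```sql
SELECT current_setting('work_mem') AS work_mem,
       current_setting('max_parallel_workers_per_gather')::int + 1 AS processes,
       pg_size_pretty(
           pg_size_bytes(current_setting('work_mem'))
           * (current_setting('max_parallel_workers_per_gather')::int + 1)  -- workers + leader
           * 2                                                              -- assumed sort/hash nodes
       ) AS rough_upper_bound;
```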

Summary

Starting with version 9.6, parallel processing can significantly improve the performance of complex queries that scan many rows or indexes. In PostgreSQL 10, parallel processing is enabled by default. Remember to disable it on servers with a heavy OLTP workload. Sequential scans or index scans consume a lot of resources. If you are not running a report over the entire dataset, you can improve query performance simply by adding missing indexes or using proper partitioning.

Source: www.habr.com
