Queries parallele in PostgreSQL

Queries parallele in PostgreSQL
I CPU muderni anu assai core. Dapoi anni, l'applicazioni anu mandatu dumande à e basa di dati in parallelu. S'ellu hè una dumanda di rapportu contr'à parechje fila in una tavula, hè più veloce quandu usa parechje CPU, è PostgreSQL hà sappiutu fà questu da a versione 9.6.

Ci hà pigliatu 3 anni per implementà a funzione di query parallela - aghju avutu à riscrive u codice in diverse tappe di l'esekzione di a dumanda. PostgreSQL 9.6 hà introduttu l'infrastruttura per migliurà ancu u codice. In e versioni successive, altri tipi di dumande sò eseguite in parallelu.

Restrictions

  • Ùn permette micca l'esecuzione parallela se tutti i nuclei sò digià occupati, altrimenti altre richieste rallentaranu.
  • U più impurtante, u processamentu parallelu cù alti valori WORK_MEM usa assai memoria - ogni hash join o sorta cunsuma memoria in quantità di work_mem.
  • E dumande OLTP di bassa latenza ùn ponu micca esse accelerate da l'esecuzione parallela. È se a dumanda torna una sola fila, u processamentu parallelu solu rallentà.
  • I sviluppatori amanu aduprà u benchmark TPC-H. Forse avete dumande simili per una esecuzione parallela perfetta.
  • Solu e dumande SELECT senza bloccu di predicate sò eseguite in parallelu.
  • A volte l'indicizzamentu propiu hè megliu cà scansi di tavule sequenziali in parallelu.
  • A pausa di a dumanda è i cursori ùn sò micca supportati.
  • E funzioni di a finestra è e funzioni aggregate urdinate ùn sò micca paralleli.
  • Ùn guadagnà nunda in a carica di travagliu I / O.
  • Ùn ci sò micca algoritmi di sorte parallela. Ma e dumande cù sorte ponu esse in parallelu in certi aspetti.
  • Sustituisci CTE (WITH ...) cun un SELECT nidificatu per attivà u processamentu parallelu.
  • I wrappers di dati stranieri ùn supportanu ancu u processamentu parallelu (ma puderanu!)
  • FULL OUTER JOIN ùn hè micca supportatu.
  • max_rows disattiva l'elaborazione parallela.
  • Se a dumanda hà una funzione chì ùn hè micca marcata cum'è PARALLEL SAFE, serà unicu filu.
  • U nivellu di isolamentu di transazzione SERIALIZABLE disattiva u prucessu parallelu.

Ambiente di prova

I sviluppatori di PostgreSQL anu pruvatu à riduce u tempu di risposta di e dumande di benchmark TPC-H. Scaricate u benchmark è adattà à PostgreSQL. Questu hè un usu non ufficiale di u benchmark TPC-H - micca per paragunà basa di dati o hardware.

  1. Scaricate TPC-H_Tools_v2.17.3.zip (o più tardi) da TPC fora di u situ.
  2. Rinominate makefile.suite à Makefile è mudificà cum'è descrittu quì: https://github.com/tvondra/pg_tpch . Cumpilà u codice cù make.
  3. Generate dati: ./dbgen -s 10 crea una basa di dati di 23 GB. Questu hè abbastanza per vede a diferenza di prestazione trà e dumande parallele è micca parallele.
  4. Cunvertisce i schedari tbl в csv с for и sed.
  5. Clona u repository pg_tpch è copià i schedari csv в pg_tpch/dss/data.
  6. Crea richieste cù un cumandamentu qgen.
  7. Caricà i dati in a basa di dati cù u cumandimu ./tpch.sh.

Scan Sequential Parallel

Pò esse più veloce micca per via di a lettura parallela, ma perchè e dati sò spargugliati in parechji core di CPU. In i sistemi operativi muderni, i schedarii di dati PostgreSQL sò ben cache. Cù lettura in anticipu, hè pussibule di ottene un bloccu più grande da u almacenamentu cà e richieste di daemon PG. Dunque, a prestazione di a dumanda ùn hè micca limitata da l'I / O di u discu. Consuma cicli di CPU per:

  • leghje e linee una per una da e pagine di a tavula;
  • paragunate i valori di stringa è e cundizioni WHERE.

Facemu una dumanda simplice select:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

A scansione sequenziale dà troppu fila senza aggregazione, cusì a dumanda hè eseguita da un core CPU.

Se aghjunghje SUM(), pudete vede chì dui flussi di travagliu aiutanu à accelerà a quistione:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Aggregazione Parallela

U node "Parallel Seq Scan" produce file per aggregazione parziale. U node "Aggregate Parziale" trunca queste fila cù SUM(). À a fine, u contatore SUM da ogni flussu di travagliu hè recullatu da u node Gather.

U risultatu finali hè calculatu da u node "Finalize Aggregate". Sè vo avete u vostru propriu funzioni aggregation, ùn vi scurdate di marcà cum'è "parallel safe".

U numeru di prucessi di travagliu

U numeru di prucessi di travagliu pò esse aumentatu senza riavvia u servitore:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Chì si passa quì ? Ci hè 2 volte più prucessi di travagliu, è a dumanda hè solu 1,6599 volte più veloce. I calculi sò interessanti. Avemu avutu 2 prucessi di travagliu è 1 capu. Dopu à u cambiamentu hè diventatu 4 + 1.

A nostra accelerazione massima da u prucessu parallelu: 5/3 = 1,66 (6) volte.

Cumu viaghja?

I prucessi

L'esecuzione di una dumanda principia sempre cù u prucessu di guida. U capimachja faci tuttu ciò chì ùn hè micca parallelu è un pocu di u prucessu parallelu. L'altri prucessi chì facenu i stessi dumande sò chjamati prucessi di u travagliu. U processu parallelu usa infrastruttura flussi di travagliu dinamichi di fondo (dapoi a versione 9.4). Siccomu l'altri parti di PostgreSQL utilizanu prucessi piuttostu cà i fili, una dumanda cù 3 prucessi di travagliu puderia esse 4 volte più veloce di u prucessu tradiziunale.

Interaction

I prucessi di u travagliu cumunicanu cù u capu via una fila di messagi (basatu nantu à a memoria spartuta). Ogni prucessu hà 2 file: per errori è per tuple.

Quanti prucessi di travagliu avete bisognu?

U limitu minimu stabilisce u paràmetru max_parallel_workers_per_gather. Allora l'esecutore di a dumanda piglia i prucessi di u travagliu da u pool, limitatu da u paràmetru max_parallel_workers size. L'ultima limitazione hè max_worker_processes, chì hè u numeru tutale di prucessi di fondo.

S'ellu ùn era micca pussibule di assignà un prucessu di u travagliu, u prucessu serà un solu prucessu.

U pianificatore di query pò riduce i flussi di travagliu secondu a dimensione di a tavola o l'indice. Ci sò opzioni per questu. min_parallel_table_scan_size и min_parallel_index_scan_size.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Ogni volta a tavula hè 3 volte più grande min_parallel_(index|table)_scan_size, Postgres aghjusta un prucessu di travagliu. U numeru di flussi di travagliu ùn hè micca basatu nantu à u costu. A dipendenza circulare rende difficili implementazioni cumplesse. Invece, u pianificatore usa regule simplici.

In pratica, sti reguli ùn sò micca sempre adattati per a produzzione, cusì hè pussibule cambià u numeru di prucessi di travagliu per una tavola particulari: ALTER TABLE ... SET (parallel_workers = N).

Perchè u prucessu parallelu ùn hè micca usatu?

In più di a longa lista di restrizioni, ci sò ancu cuntrolli di costu:

parallel_setup_cost - per evità u trattamentu parallelu di e dumande brevi. Stu paràmetru stima u tempu per a preparazione di memoria, l'iniziu di u prucessu è u scambiu di dati iniziale.

parallel_tuple_cost: a cumunicazione trà u capu è i travagliadori pò esse ritardata in proporzione à u numeru di tuple da i prucessi di u travagliu. Stu paràmetru cunsidereghja u costu di u scambiu di dati.

Unisci in Loop Nidificatu - Unisci in Loop Nidificatu

PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

A riunione si trova in l'ultimu passu, cusì Nested Loop Left Join hè una operazione parallela. Parallel Index Only Scan hè statu introduttu solu in a versione 10. Funziona simili à un scan sequential parallel. Cundizione c_custkey = o_custkey leghje un ordine per ogni linea di cliente. Allora ùn hè micca parallelu.

Hash Join - Hash Join

Ogni prucessu di u travagliu crea a so propria tavola hash prima di PostgreSQL 11. E s'ellu ci sò più di quattru di sti prucessi, u rendiment ùn hà micca megliurà. In a nova versione, a tavola hash hè sparta. Ogni prucessu di travagliu pò aduprà WORK_MEM per creà una tavola hash.

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

Query 12 da TPC-H illustra un hash join parallelu. Ogni prucessu di u travagliu participa à a creazione di una tavola hash spartuta.

Unisci unisce

Un join join ùn hè micca parallelu in natura. Ùn vi preoccupate micca s'ellu hè l'ultimu passu di a dumanda - pò ancu eseguisce in parallelu.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

U node "Merge Join" hè sopra à "Gather Merge". Dunque, a fusione ùn usa micca u prucessu parallelu. Ma u node "Scan Parallel Index" aiuta sempre cù u segmentu part_pkey.

cunnessione di a seccione

In PostgreSQL 11 cunnessione per sezzioni disattivatu per difettu: hà una pianificazione assai caru. Tavule cù partizioni simili ponu esse unite sezione per sezione. Questu farà chì Postgres utilizeghja tavule hash più chjuche. Ogni cunnessione di rùbbriche pò esse parallella.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

A cosa principal hè chì a cunnessione per rùbbriche hè parallela solu s'è sti rùbbriche sò abbastanza grande.

Appendice parallelu

Appendice parallelu pò esse usatu invece di diversi blocchi in diversi flussi di travagliu. Questu succede di solitu cù l'UNION ALL queries. U svantaghju hè menu cuncurrenza, postu chì ogni prucessu di u travagliu solu gestisce 1 dumanda.

Ci sò 2 prucessi di travagliu in esecuzione quì, anche se 4 sò attivati.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

Variabili più impurtanti

  • WORK_MEM limita a quantità di memoria per prucessu, micca solu dumande: work_mem prucessi cunnessione = assai memoria.
  • max_parallel_workers_per_gather - quanti travagliadori prucessa u prugramma esecutivu aduprà per u processamentu parallelu da u pianu.
  • max_worker_processes - aghjusta u numeru tutale di prucessi di travagliu à u numeru di core CPU in u servitore.
  • max_parallel_workers - u listessu, ma per i prucessi paralleli di u travagliu.

Risultati

Partendu da a versione 9.6, u processamentu parallelu pò migliurà assai u rendiment di e dumande cumplesse chì scannanu parechje fila o indici. In PostgreSQL 10, u processamentu parallelu hè attivatu per automaticamente. Ùn vi scurdate di disattivà lu in i servitori cù un pesu di travagliu OLTP. I scans sequenziali o indici sò assai intensivi di risorse. Se ùn site micca un rapportu nantu à tuttu u set di dati, e dumande ponu esse più performanti per simplificà l'aghjunzione di l'indici mancanti o usendu un particionamentu propiu.

referenze

Source: www.habr.com

Add a comment