Query parallele in PostgreSQL

Query parallele in PostgreSQL
Le CPU moderne hanno molti core. Per anni le applicazioni hanno inviato query ai database in parallelo. Se si tratta di una query di report su più righe in una tabella, viene eseguita più velocemente quando si utilizzano più CPU e PostgreSQL è in grado di farlo dalla versione 9.6.

Ci sono voluti 3 anni per implementare la funzionalità di query parallela: abbiamo dovuto riscrivere il codice in diverse fasi di esecuzione della query. PostgreSQL 9.6 ha introdotto l'infrastruttura per migliorare ulteriormente il codice. Nelle versioni successive, altri tipi di query vengono eseguiti in parallelo.

Restrizioni

  • Non abilitare l'esecuzione parallela se tutti i core sono già occupati, altrimenti le altre richieste rallenteranno.
  • Ancora più importante, l'elaborazione parallela con valori WORK_MEM elevati utilizza molta memoria: ogni hash join o ordinamento occupa memoria work_mem.
  • Le query OLTP a bassa latenza non possono essere accelerate dall'esecuzione parallela. E se la query restituisce una riga, l'elaborazione parallela non farà altro che rallentarla.
  • Gli sviluppatori adorano utilizzare il benchmark TPC-H. Forse hai domande simili per una perfetta esecuzione parallela.
  • Solo le query SELECT senza blocco dei predicati vengono eseguite in parallelo.
  • A volte l'indicizzazione corretta è migliore della scansione sequenziale delle tabelle in modalità parallela.
  • La sospensione di query e cursori non è supportata.
  • Le funzioni finestra e le funzioni aggregate di insiemi ordinati non sono parallele.
  • Non ottieni nulla nel carico di lavoro I/O.
  • Non esistono algoritmi di ordinamento parallelo. Ma per alcuni aspetti le query con gli ordinamenti possono essere eseguite in parallelo.
  • Sostituisci CTE (WITH ...) con una SELECT nidificata per abilitare l'elaborazione parallela.
  • I wrapper di dati di terze parti non supportano ancora l'elaborazione parallela (ma potrebbero!)
  • FULL OUTER JOIN non è supportato.
  • max_rows disabilita l'elaborazione parallela.
  • Se una query ha una funzione che non è contrassegnata PARALLEL SAFE, sarà a thread singolo.
  • Il livello di isolamento della transazione SERIALIZABLE disabilita l'elaborazione parallela.

Ambiente di test

Gli sviluppatori di PostgreSQL hanno cercato di ridurre il tempo di risposta delle query di benchmark TPC-H. Scarica il benchmark e adattarlo a PostgreSQL. Si tratta di un uso non ufficiale del benchmark TPC-H, non per il confronto di database o hardware.

  1. Scarica TPC-H_Tools_v2.17.3.zip (o versione più recente) da TPC fuori sede.
  2. Rinominare makefile.suite in Makefile e modificare come descritto qui: https://github.com/tvondra/pg_tpch . Compila il codice con il comando make.
  3. Genera dati: ./dbgen -s 10 crea un database da 23 GB. Questo è sufficiente per vedere la differenza nelle prestazioni delle query parallele e non parallele.
  4. Converti file tbl в csv с for и sed.
  5. Clonare il repository pg_tpch e copiare i file csv в pg_tpch/dss/data.
  6. Crea query con un comando qgen.
  7. Caricare i dati nel database con il comando ./tpch.sh.

Scansione sequenziale parallela

Potrebbe essere più veloce non a causa della lettura parallela, ma perché i dati sono distribuiti su molti core della CPU. Nei moderni sistemi operativi, i file di dati PostgreSQL vengono memorizzati bene nella cache. Con la lettura anticipata, è possibile ottenere un blocco più grande dallo storage rispetto a quanto richiesto dal demone PG. Pertanto, le prestazioni delle query non sono limitate dall'I/O del disco. Consuma cicli della CPU per:

  • leggere le righe una alla volta dalle pagine della tabella;
  • confrontare valori e condizioni di stringa WHERE.

Eseguiamo una semplice query select:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

La scansione sequenziale produce troppe righe senza aggregazione, quindi la query viene eseguita da un singolo core della CPU.

Se aggiungi SUM(), puoi vedere che due flussi di lavoro aiuteranno ad accelerare la query:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Aggregazione parallela

Il nodo Parallel Seq Scan produce righe per l'aggregazione parziale. Il nodo "Aggregazione parziale" ritaglia queste linee utilizzando SUM(). Alla fine, il contatore SUM di ciascun processo di lavoro viene raccolto dal nodo “Gather”.

Il risultato finale viene calcolato dal nodo “Finalizza Aggregato”. Se disponi di funzioni di aggregazione personalizzate, non dimenticare di contrassegnarle come "sicure parallele".

Numero di processi di lavoro

Il numero di processi di lavoro può essere aumentato senza riavviare il server:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Cosa sta succedendo qui? Ci sono stati 2 volte più processi di lavoro e la richiesta è diventata solo 1,6599 volte più veloce. I calcoli sono interessanti. Avevamo 2 processi di lavoro e 1 leader. Dopo la modifica è diventato 4+1.

La nostra velocità massima dall'elaborazione parallela: 5/3 = 1,66(6) volte.

Come funziona?

processi

L'esecuzione della richiesta inizia sempre con il processo principale. Il leader fa tutto ciò che non è parallelo e alcune elaborazioni parallele. Altri processi che eseguono le stesse richieste sono chiamati processi di lavoro. L'elaborazione parallela utilizza l'infrastruttura processi di lavoro in background dinamici (dalla versione 9.4). Poiché altre parti di PostgreSQL utilizzano processi anziché thread, una query con 3 processi di lavoro potrebbe essere 4 volte più veloce dell'elaborazione tradizionale.

Interazione

I processi di lavoro comunicano con il leader attraverso una coda di messaggi (basata sulla memoria condivisa). Ogni processo ha 2 code: per errori e per tuple.

Quanti flussi di lavoro sono necessari?

Il limite minimo è specificato dal parametro max_parallel_workers_per_gather. Il runner della richiesta prende quindi i processi di lavoro dal pool limitato dal parametro max_parallel_workers size. L'ultima limitazione è max_worker_processes, ovvero il numero totale di processi in background.

Se non è stato possibile allocare un processo lavoratore, l'elaborazione sarà a processo singolo.

Il pianificatore di query può ridurre i flussi di lavoro a seconda della dimensione della tabella o dell'indice. Ci sono parametri per questo min_parallel_table_scan_size и min_parallel_index_scan_size.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Ogni volta la tabella è 3 volte più grande di min_parallel_(index|table)_scan_size, Postgres aggiunge un processo di lavoro. Il numero di flussi di lavoro non è basato sui costi. La dipendenza circolare rende difficili le implementazioni complesse. Invece, il pianificatore utilizza regole semplici.

In pratica, queste regole non sono sempre adatte alla produzione, quindi è possibile modificare il numero di processi di lavoro per una tabella specifica: ALTER TABLE ... SET (parallel_workers = N).

Perché non viene utilizzata l'elaborazione parallela?

Oltre al lungo elenco di restrizioni, ci sono anche controlli sui costi:

parallel_setup_cost - evitare l'elaborazione parallela di richieste brevi. Questo parametro stima il tempo necessario per preparare la memoria, avviare il processo e lo scambio iniziale di dati.

parallel_tuple_cost: la comunicazione tra leader e lavoratori può essere ritardata in proporzione al numero di tuple provenienti dai processi di lavoro. Questo parametro calcola il costo dello scambio dati.

Join di cicli nidificati

PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

La raccolta avviene nell'ultima fase, quindi Nested Loop Left Join è un'operazione parallela. La scansione parallela solo dell'indice è stata introdotta solo nella versione 10. Funziona in modo simile alla scansione seriale parallela. Condizione c_custkey = o_custkey legge un ordine per stringa client. Quindi non è parallelo.

Hash Partecipa

Ogni processo di lavoro crea la propria tabella hash fino a PostgreSQL 11. E se sono presenti più di quattro processi, le prestazioni non miglioreranno. Nella nuova versione la tabella hash è condivisa. Ogni processo di lavoro può utilizzare WORK_MEM per creare una tabella hash.

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

La query 12 di TPC-H mostra chiaramente una connessione hash parallela. Ogni processo di lavoro contribuisce alla creazione di una tabella hash comune.

Unisci Unisci

Un'unione di unione è di natura non parallela. Non preoccuparti se questo è l'ultimo passaggio della query: può ancora essere eseguito in parallelo.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

Il nodo "Merge Join" si trova sopra "Gather Merge". Pertanto l'unione non utilizza l'elaborazione parallela. Ma il nodo “Parallel Index Scan” aiuta ancora con il segmento part_pkey.

Collegamento per sezioni

In PostgreSQL 11 collegamento per sezioni disabilitato di default: ha una programmazione molto costosa. Le tabelle con partizionamento simile possono essere unite partizione per partizione. In questo modo Postgres utilizzerà tabelle hash più piccole. Ogni connessione di sezioni può essere parallela.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

La cosa principale è che la connessione in sezioni sia parallela solo se queste sezioni sono sufficientemente grandi.

Aggiunta parallela

Aggiunta parallela può essere utilizzato al posto di blocchi diversi in flussi di lavoro diversi. Questo di solito accade con le query UNION ALL. Lo svantaggio è il minor parallelismo, perché ogni processo di lavoro elabora solo 1 richiesta.

Ci sono 2 processi di lavoro in esecuzione qui, sebbene 4 siano abilitati.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

Le variabili più importanti

  • WORK_MEM limita la memoria per processo, non solo per le query: work_mem processi connessioni = molta memoria.
  • max_parallel_workers_per_gather — quanti processi di lavoro verranno utilizzati dal programma in esecuzione per l'elaborazione parallela dal piano.
  • max_worker_processes — adatta il numero totale di processi di lavoro al numero di core della CPU sul server.
  • max_parallel_workers - lo stesso, ma per processi di lavoro paralleli.

Risultati di

A partire dalla versione 9.6, l'elaborazione parallela può migliorare notevolmente le prestazioni di query complesse che eseguono la scansione di molte righe o indici. In PostgreSQL 10, l'elaborazione parallela è abilitata per impostazione predefinita. Ricordarsi di disabilitarlo sui server con un carico di lavoro OLTP elevato. Le scansioni sequenziali o le scansioni dell'indice consumano molte risorse. Se non stai eseguendo un report sull'intero set di dati, puoi migliorare le prestazioni delle query semplicemente aggiungendo gli indici mancanti o utilizzando il partizionamento corretto.

riferimenti

Fonte: habr.com

Aggiungi un commento