Consultes paral·leles a PostgreSQL

Consultes paral·leles a PostgreSQL
Les CPU modernes tenen molts nuclis. Durant anys, les aplicacions han estat enviant consultes a bases de dades en paral·lel. Si es tracta d'una consulta d'informes sobre diverses files d'una taula, és més ràpid quan utilitza diverses CPU, i PostgreSQL ho ha pogut fer des de la versió 9.6.

Va trigar 3 anys a implementar la funció de consulta paral·lela: vaig haver de reescriure el codi en diferents etapes d'execució de la consulta. PostgreSQL 9.6 va introduir la infraestructura per millorar encara més el codi. En versions posteriors, altres tipus de consultes s'executen en paral·lel.

Restriccions

  • No habiliteu l'execució en paral·lel si tots els nuclis ja estan ocupats, en cas contrari, altres sol·licituds s'alentiran.
  • El més important és que el processament paral·lel amb valors WORK_MEM elevats utilitza molta memòria: cada unió o ordenació hash consumeix memòria en la quantitat de work_mem.
  • Les consultes OLTP de baixa latència no es poden accelerar mitjançant l'execució paral·lela. I si la consulta retorna una sola fila, el processament paral·lel només la frenarà.
  • Als desenvolupadors els encanta utilitzar el punt de referència TPC-H. Potser teniu consultes similars per a una execució paral·lela perfecta.
  • Només s'executen en paral·lel les consultes SELECT sense bloqueig de predicats.
  • De vegades, la indexació adequada és millor que les exploracions de taules seqüencials en paral·lel.
  • La pausa de consultes i els cursors no són compatibles.
  • Les funcions de finestra i les funcions agregades de conjunt ordenat no són paral·leles.
  • No guanyes res amb la càrrega de treball d'E/S.
  • No hi ha algorismes d'ordenació paral·lel. Però les consultes amb ordenacions es poden executar paral·lelament en alguns aspectes.
  • Substituïu CTE (AMB ...) per un SELECT imbricat per habilitar el processament paral·lel.
  • Els embolcalls de dades estrangers encara no admeten el processament paral·lel (però sí que podrien!)
  • FULL OUTER JOIN no s'admet.
  • max_rows desactiva el processament paral·lel.
  • Si la sol·licitud té una funció que no està marcada com a PARAL·LEL SAFE, serà d'un sol fil.
  • El nivell d'aïllament de la transacció SERIALIZABLE inhabilita el processament paral·lel.

Entorn de prova

Els desenvolupadors de PostgreSQL han intentat reduir el temps de resposta de les consultes de referència TPC-H. Descarrega el benchmark i adaptar-lo a PostgreSQL. Aquest és un ús no oficial del punt de referència TPC-H, no per comparar bases de dades o maquinari.

  1. Baixeu TPC-H_Tools_v2.17.3.zip (o posterior) des del TPC fora del lloc.
  2. Canvieu el nom de makefile.suite a Makefile i modifiqueu-lo tal com es descriu aquí: https://github.com/tvondra/pg_tpch . Compileu el codi amb make.
  3. Generar dades: ./dbgen -s 10 crea una base de dades de 23 GB. Això és suficient per veure la diferència de rendiment entre consultes paral·leles i no paral·leles.
  4. Converteix fitxers tbl в csv с for и sed.
  5. Clonar el repositori pg_tpch i copieu els fitxers csv в pg_tpch/dss/data.
  6. Crea sol·licituds amb una ordre qgen.
  7. Carregueu les dades a la base de dades amb l'ordre ./tpch.sh.

Escaneig seqüencial paral·lel

Pot ser que sigui més ràpid no per la lectura paral·lela, sinó perquè les dades es troben disperses per molts nuclis de CPU. En els sistemes operatius moderns, els fitxers de dades PostgreSQL estan ben guardats a la memòria cau. Amb la lectura anticipada, és possible obtenir un bloc més gran d'emmagatzematge que el que sol·licita el dimoni PG. Per tant, el rendiment de la consulta no està limitat per l'E/S del disc. Consumeix cicles de CPU per:

  • llegir les línies una per una de les pàgines de la taula;
  • comparar els valors i les condicions de la cadena WHERE.

Anem a fer una consulta senzilla select:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

L'exploració seqüencial dóna massa files sense agregació, de manera que la consulta l'executa un nucli de la CPU.

Si afegeixes SUM(), podeu veure que dos fluxos de treball ajudaran a accelerar la consulta:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Agregació paral·lela

El node "Parallel Seq Scan" produeix files per a l'agregació parcial. El node "Agregat parcial" trunca aquestes files amb SUM(). Al final, el node Gather recull el comptador SUMA de cada flux de treball.

El resultat final el calcula el node "Finalitza l'agregat". Si teniu les vostres pròpies funcions d'agregació, no us oblideu de marcar-les com a "paral·lel segur".

Nombre de processos de treball

El nombre de processos de treball es pot augmentar sense reiniciar el servidor:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Que està passant aquí? Hi ha 2 vegades més processos de treball i la consulta només és 1,6599 vegades més ràpida. Els càlculs són interessants. Teníem 2 processos de treball i 1 líder. Després del canvi es va convertir en 4+1.

La nostra màxima acceleració del processament paral·lel: 5/3 = 1,66(6) vegades.

Com funciona?

Процессы

L'execució d'una consulta sempre comença amb el procés inicial. El líder fa tot el que no és paral·lel i una part del processament paral·lel. Altres processos que realitzen les mateixes peticions s'anomenen processos de treball. El processament paral·lel utilitza infraestructura fluxos de treball de fons dinàmics (des de la versió 9.4). Com que altres parts de PostgreSQL utilitzen processos en lloc de fils, una consulta amb 3 processos de treball podria ser 4 vegades més ràpida que el processament tradicional.

Interacció

Els processos de treball es comuniquen amb el líder mitjançant una cua de missatges (basada en la memòria compartida). Cada procés té 2 cues: per a errors i per tuples.

Quants processos de treball necessiteu?

El límit mínim estableix el paràmetre max_parallel_workers_per_gather. Aleshores, l'executor de sol·licituds pren els processos de treball del grup, limitat pel paràmetre max_parallel_workers size. L'última limitació és max_worker_processes, que és el nombre total de processos en segon pla.

Si no fos possible assignar un procés de treballador, la tramitació serà d'un sol procés.

El planificador de consultes pot reduir els fluxos de treball en funció de la mida de la taula o de l'índex. Hi ha opcions per a això. min_parallel_table_scan_size и min_parallel_index_scan_size.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Cada cop la taula és 3 vegades més gran que min_parallel_(index|table)_scan_size, Postgres afegeix un procés de treball. El nombre de fluxos de treball no es basa en el cost. La dependència circular dificulta les implementacions complexes. En canvi, el planificador utilitza regles senzilles.

A la pràctica, aquestes regles no sempre són adequades per a la producció, de manera que és possible canviar el nombre de processos de treball per a una taula concreta: ALTER TABLE ... SET (parallel_workers = N).

Per què no s'utilitza el processament paral·lel?

A més de la llarga llista de restriccions, també hi ha controls de costos:

parallel_setup_cost - per evitar el processament paral·lel de sol·licituds curtes. Aquest paràmetre estima el temps per a la preparació de la memòria, l'inici del procés i l'intercanvi inicial de dades.

parallel_tuple_cost: la comunicació entre el líder i els treballadors es pot retardar en proporció al nombre de tuples dels processos de treball. Aquest paràmetre considera el cost de l'intercanvi de dades.

Unió de bucle imbricat: unió de bucle imbricat

PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

La recollida es produeix a l'últim pas, de manera que la unió a l'esquerra del bucle imbricat és una operació paral·lela. L'exploració només d'índex paral·lel només es va introduir a la versió 10. Funciona de manera semblant a una exploració seqüencial paral·lela. Condició c_custkey = o_custkey llegeix una comanda per a cada fila de client. Així que no és paral·lel.

Hash Join - Hash Join

Cada procés de treball crea la seva pròpia taula hash abans de PostgreSQL 11. I si hi ha més de quatre d'aquests processos, el rendiment no millorarà. A la nova versió, la taula hash es comparteix. Cada procés de treball pot utilitzar WORK_MEM per crear una taula hash.

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

La consulta 12 de TPC-H il·lustra una unió hash paral·lela. Cada procés de treball participa en la creació d'una taula hash compartida.

Unir Unir-se

Una unió de fusió no és de naturalesa paral·lela. No us preocupeu si aquest és l'últim pas de la consulta, encara es pot executar en paral·lel.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

El node "Combinar unió" està a sobre de "Reunir fusionar". Per tant, la fusió no utilitza el processament paral·lel. Però el node "Escaneig d'índex paral·lel" encara ajuda amb el segment part_pkey.

Connexió de la secció

A PostgreSQL 11 connexió per secció desactivat per defecte: té una programació molt cara. Les taules amb particions similars es poden unir secció per secció. Això farà que Postgres utilitzi taules hash més petites. Cada connexió de seccions pot ser paral·lela.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

El més important és que la connexió per seccions és paral·lela només si aquestes seccions són prou grans.

Apèndix paral·lel

Apèndix paral·lel es pot utilitzar en lloc de diferents blocs en diferents fluxos de treball. Això sol passar amb les consultes UNION ALL. L'inconvenient és menys concurrència, ja que cada procés de treball només gestiona 1 sol·licitud.

Hi ha 2 processos de treball en execució aquí, tot i que 4 estan activats.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

Variables més importants

  • WORK_MEM limita la quantitat de memòria per procés, no només les sol·licituds: work_mem processos connexions = molta memòria.
  • max_parallel_workers_per_gather - Quants processos de treball utilitzarà el programa executiu per al processament paral·lel del pla.
  • max_worker_processes - ajusta el nombre total de processos de treball al nombre de nuclis de CPU del servidor.
  • max_parallel_workers - el mateix, però per a processos de treball paral·lels.

Resultats de

A partir de la versió 9.6, el processament paral·lel pot millorar molt el rendiment de les consultes complexes que escanegen moltes files o índexs. A PostgreSQL 10, el processament paral·lel està habilitat per defecte. No us oblideu de desactivar-lo en servidors amb una gran càrrega de treball OLTP. Les exploracions seqüencials o d'índex requereixen molt recursos. Si no esteu informant sobre tot el conjunt de dades, les consultes es poden fer més eficients simplement afegint índexs que falten o utilitzant particions correctes.

Referències

Font: www.habr.com

Afegeix comentari