Consultas paralelas en PostgreSQL

Consultas paralelas en PostgreSQL
As CPU modernas teñen moitos núcleos. Durante anos, as aplicacións envían consultas ás bases de datos en paralelo. Se se trata dunha consulta de informe en varias filas dunha táboa, execútase máis rápido cando se usan varias CPU, e PostgreSQL foi capaz de facelo desde a versión 9.6.

Levou 3 anos implementar a función de consulta paralela; tivemos que reescribir o código en diferentes etapas da execución da consulta. PostgreSQL 9.6 introduciu infraestrutura para mellorar aínda máis o código. En versións posteriores, outros tipos de consultas execútanse en paralelo.

Restricións

  • Non habilite a execución paralela se todos os núcleos xa están ocupados, se non, outras solicitudes ralentizarán.
  • O máis importante é que o procesamento paralelo con valores altos de WORK_MEM usa moita memoria; cada unión ou ordenación hash ocupa memoria de work_mem.
  • As consultas OLTP de baixa latencia non se poden acelerar mediante a execución paralela. E se a consulta devolve unha fila, o procesamento paralelo só a ralentizará.
  • Aos desenvolvedores encántalles usar o benchmark TPC-H. Quizais teñas consultas similares para unha execución paralela perfecta.
  • Só as consultas SELECT sen bloqueo de predicados se executan en paralelo.
  • Ás veces é mellor a indexación adecuada que a exploración de táboas secuenciais en modo paralelo.
  • Non se admite a pausa de consultas e cursores.
  • As funcións de fiestra e as funcións agregadas de conxuntos ordenados non son paralelas.
  • Non gaña nada na carga de traballo de E/S.
  • Non hai algoritmos de clasificación paralelos. Pero as consultas con ordenación pódense executar paralelamente nalgúns aspectos.
  • Substitúe CTE (WITH...) por un SELECT anidado para habilitar o procesamento paralelo.
  • Os envoltorios de datos de terceiros aínda non admiten o procesamento paralelo (pero poderían!)
  • FULL OUTER JOIN non é compatible.
  • max_rows desactiva o procesamento paralelo.
  • Se unha consulta ten unha función que non está marcada PARALLEL SAFE, terá un único fío.
  • O nivel de illamento da transacción SERIALIZABLE desactiva o procesamento paralelo.

Entorno de proba

Os desenvolvedores de PostgreSQL intentaron reducir o tempo de resposta das consultas de referencia de TPC-H. Descarga o benchmark e adaptalo a PostgreSQL. Este é un uso non oficial do benchmark TPC-H, non para comparación de bases de datos ou hardware.

  1. Descarga TPC-H_Tools_v2.17.3.zip (ou versión máis recente) de TPC fóra do sitio.
  2. Cambia o nome de makefile.suite a Makefile e cambia como se describe aquí: https://github.com/tvondra/pg_tpch . Compile o código co comando make.
  3. Xerar datos: ./dbgen -s 10 crea unha base de datos de 23 GB. Isto é suficiente para ver a diferenza no rendemento das consultas paralelas e non paralelas.
  4. Converter ficheiros tbl в csv с for и sed.
  5. Clonar o repositorio pg_tpch e copia os ficheiros csv в pg_tpch/dss/data.
  6. Crea consultas cun comando qgen.
  7. Carga datos na base de datos co comando ./tpch.sh.

Exploración secuencial paralela

Pode ser máis rápido non pola lectura paralela, senón porque os datos están repartidos por moitos núcleos de CPU. Nos sistemas operativos modernos, os ficheiros de datos PostgreSQL almacénanse ben na caché. Coa lectura adiante, é posible obter un bloque máis grande do almacenamento que o que solicita o daemon PG. Polo tanto, o rendemento das consultas non está limitado pola E/S do disco. Consume ciclos de CPU para:

  • ler as filas unha a unha das páxinas da táboa;
  • comparar valores e condicións das cadeas WHERE.

Imos realizar unha consulta sinxela select:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

A exploración secuencial produce demasiadas filas sen agregación, polo que a consulta é executada por un único núcleo de CPU.

Se engades SUM(), podes ver que dous fluxos de traballo axudarán a acelerar a consulta:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Agregación paralela

O nodo Parallel Seq Scan produce filas para a agregación parcial. O nodo "Agregado parcial" recorta estas liñas usando SUM(). Ao final, o contador SUMA de cada proceso de traballo é recollido polo nodo "Reunir".

O resultado final calcúlase polo nodo "Finalizar agregado". Se tes as túas propias funcións de agregación, non esquezas marcalas como "seguro paralelo".

Número de procesos obreiros

O número de procesos de traballo pódese aumentar sen reiniciar o servidor:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Que está pasando aquí? Houbo dúas veces máis procesos de traballo e a solicitude fíxose só 2 veces máis rápida. Os cálculos son interesantes. Tiñamos 1,6599 procesos obreiros e 2 líder. Despois do cambio pasou a ser 1+4.

A nosa máxima aceleración do procesamento paralelo: 5/3 = 1,66(6) veces.

Como funciona isto?

Os procesos

A execución da solicitude comeza sempre co proceso principal. O líder fai todo o procesamento non paralelo e algún procesamento paralelo. Outros procesos que realizan as mesmas solicitudes chámanse procesos de traballo. O procesamento paralelo utiliza infraestrutura procesos de traballo de fondo dinámicos (a partir da versión 9.4). Dado que outras partes de PostgreSQL usan procesos en lugar de fíos, unha consulta con 3 procesos de traballo pode ser catro veces máis rápida que o procesamento tradicional.

Interacción

Os procesos de traballo comunícanse co líder a través dunha fila de mensaxes (baseada na memoria compartida). Cada proceso ten 2 colas: para erros e para tuplas.

Cantos fluxos de traballo son necesarios?

O límite mínimo é especificado polo parámetro max_parallel_workers_per_gather. A continuación, o administrador de solicitudes toma os procesos de traballo do grupo limitado polo parámetro max_parallel_workers size. A última limitación é max_worker_processes, é dicir, o número total de procesos en segundo plano.

Se non fose posible asignar un proceso laboral, a tramitación será dun único proceso.

O planificador de consultas pode reducir os fluxos de traballo dependendo do tamaño da táboa ou do índice. Hai parámetros para iso min_parallel_table_scan_size и min_parallel_index_scan_size.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Cada vez que a mesa é 3 veces maior que min_parallel_(index|table)_scan_size, Postgres engade un proceso de traballo. O número de fluxos de traballo non se basea nos custos. A dependencia circular dificulta as implementacións complexas. Pola contra, o planificador usa regras simples.

Na práctica, estas regras non sempre son adecuadas para a produción, polo que pode cambiar o número de procesos de traballo para unha táboa específica: ALTER TABLE ... SET (parallel_workers = N).

Por que non se utiliza o procesamento paralelo?

Ademais da longa lista de restricións, tamén hai controis de custos:

parallel_setup_cost - para evitar o procesamento paralelo de solicitudes curtas. Este parámetro estima o tempo para preparar a memoria, iniciar o proceso e o intercambio inicial de datos.

parallel_tuple_cost: a comunicación entre o líder e os traballadores pódese atrasar en proporción ao número de tuplas dos procesos de traballo. Este parámetro calcula o custo do intercambio de datos.

Unións de bucles anidados

PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

A recollida prodúcese na última etapa, polo que a unión á esquerda do bucle anidado é unha operación paralela. A exploración de só índice paralelo só se introduciu na versión 10. Funciona de xeito similar á dixitalización en serie paralela. Condición c_custkey = o_custkey le un pedido por cadea de cliente. Polo tanto, non é paralelo.

Hash Join

Cada proceso de traballo crea a súa propia táboa hash ata PostgreSQL 11. E se hai máis de catro destes procesos, o rendemento non mellorará. Na nova versión, a táboa hash compártese. Cada proceso de traballo pode usar WORK_MEM para crear unha táboa hash.

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

A consulta 12 de TPC-H mostra claramente unha conexión hash paralela. Cada proceso de traballo contribúe á creación dunha táboa hash común.

Unir Únete

Unha combinación de combinación non é de natureza paralela. Non te preocupes se este é o último paso da consulta, aínda pode executarse en paralelo.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

O nodo "Merge Join" está situado encima da "Gather Merge". Polo tanto, a fusión non usa procesamento paralelo. Pero o nodo "Escaneo de índice paralelo" aínda axuda co segmento part_pkey.

Conexión por tramos

En PostgreSQL 11 conexión por seccións desactivado por defecto: ten unha programación moi cara. As táboas con particións similares pódense unir partición por partición. Deste xeito, Postgres usará táboas hash máis pequenas. Cada conexión de seccións pode ser paralela.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

O principal é que a conexión en seccións é paralela só se estas seccións son o suficientemente grandes.

Anexo paralelo

Anexo paralelo pódese usar en lugar de diferentes bloques en diferentes fluxos de traballo. Isto adoita ocorrer coas consultas UNION ALL. A desvantaxe é menor paralelismo, porque cada proceso de traballo só procesa 1 solicitude.

Hai 2 procesos de traballo en execución aquí, aínda que 4 están activados.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

As variables máis importantes

  • WORK_MEM limita a memoria por proceso, non só consultas: work_mem procesos conexións = moita memoria.
  • max_parallel_workers_per_gather — cantos procesos de traballador utilizará o programa en execución para o procesamento paralelo do plan.
  • max_worker_processes — axusta o número total de procesos de traballo ao número de núcleos de CPU do servidor.
  • max_parallel_workers - o mesmo, pero para procesos de traballo paralelos.

Resultados de

A partir da versión 9.6, o procesamento paralelo pode mellorar moito o rendemento de consultas complexas que exploran moitas filas ou índices. En PostgreSQL 10, o procesamento paralelo está habilitado por defecto. Lembra desactivalo en servidores cunha gran carga de traballo OLTP. As exploracións secuenciais ou de índice consumen moitos recursos. Se non está a executar un informe sobre todo o conxunto de datos, pode mellorar o rendemento das consultas simplemente engadindo os índices que faltan ou utilizando a partición adecuada.

referencias

Fonte: www.habr.com

Engadir un comentario