Consultas paralelas en PostgreSQL

Consultas paralelas en PostgreSQL
Las CPU modernas tienen muchos núcleos. Durante años, las aplicaciones envían consultas a bases de datos en paralelo. Si se trata de una consulta de informe en varias filas de una tabla, se ejecuta más rápido cuando se utilizan varias CPU, y PostgreSQL ha podido hacer esto desde la versión 9.6.

Se necesitaron 3 años para implementar la función de consulta paralela; tuvimos que reescribir el código en diferentes etapas de ejecución de la consulta. PostgreSQL 9.6 introdujo infraestructura para mejorar aún más el código. En versiones posteriores se ejecutan otros tipos de consultas en paralelo.

restricciones

  • No habilite la ejecución paralela si todos los núcleos ya están ocupados; de lo contrario, otras solicitudes se ralentizarán.
  • Lo más importante es que el procesamiento paralelo con valores altos de WORK_MEM utiliza mucha memoria: cada unión u ordenación hash ocupa memoria work_mem.
  • Las consultas OLTP de baja latencia no se pueden acelerar mediante la ejecución paralela. Y si la consulta devuelve una fila, el procesamiento paralelo sólo la ralentizará.
  • A los desarrolladores les encanta utilizar el punto de referencia TPC-H. Quizás tenga consultas similares para una ejecución paralela perfecta.
  • Sólo las consultas SELECT sin bloqueo de predicado se ejecutan en paralelo.
  • A veces, la indexación adecuada es mejor que el escaneo secuencial de tablas en modo paralelo.
  • No se admiten pausas de consultas ni cursores.
  • Las funciones de ventana y las funciones agregadas de conjuntos ordenados no son paralelas.
  • No gana nada en la carga de trabajo de E/S.
  • No existen algoritmos de clasificación paralelos. Pero las consultas con tipos se pueden ejecutar en paralelo en algunos aspectos.
  • Reemplace CTE (CON...) con un SELECT anidado para habilitar el procesamiento paralelo.
  • Los contenedores de datos de terceros aún no admiten el procesamiento paralelo (¡pero podrían hacerlo!)
  • No se admite la UNIÓN EXTERIOR COMPLETA.
  • max_rows deshabilita el procesamiento paralelo.
  • Si una consulta tiene una función que no está marcada PARALLEL SAFE, será de un solo subproceso.
  • El nivel de aislamiento de transacciones SERIALIZABLE deshabilita el procesamiento paralelo.

Entorno de prueba

Los desarrolladores de PostgreSQL intentaron reducir el tiempo de respuesta de las consultas de referencia de TPC-H. Descargue el punto de referencia y adaptarlo a PostgreSQL. Este es un uso no oficial del punto de referencia TPC-H, no para comparar bases de datos o hardware.

  1. Descargue TPC-H_Tools_v2.17.3.zip (o una versión más reciente) desde TPC fuera del sitio.
  2. Cambie el nombre de makefile.suite a Makefile y cámbielo como se describe aquí: https://github.com/tvondra/pg_tpch . Compile el código con el comando make.
  3. Generar datos: ./dbgen -s 10 crea una base de datos de 23 GB. Esto es suficiente para ver la diferencia en el rendimiento de consultas paralelas y no paralelas.
  4. Convertir archivos tbl в csv с for и sed.
  5. Clonar el repositorio pg_tpch y copiar los archivos csv в pg_tpch/dss/data.
  6. Crear consultas con un comando qgen.
  7. Cargar datos en la base de datos con el comando. ./tpch.sh.

Escaneo secuencial paralelo

Puede que sea más rápido no debido a la lectura paralela, sino porque los datos se distribuyen entre muchos núcleos de CPU. En los sistemas operativos modernos, los archivos de datos PostgreSQL se almacenan bien en caché. Con la lectura anticipada, es posible obtener un bloque más grande del almacenamiento que el que solicita el demonio PG. Por lo tanto, el rendimiento de las consultas no está limitado por la E/S del disco. Consume ciclos de CPU para:

  • leer filas una por una desde las páginas de la tabla;
  • comparar valores y condiciones de cadena WHERE.

Ejecutemos una consulta simple select:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

El escaneo secuencial produce demasiadas filas sin agregación, por lo que la consulta se ejecuta mediante un único núcleo de CPU.

si agregas SUM(), puede ver que dos flujos de trabajo ayudarán a acelerar la consulta:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Agregación paralela

El nodo Parallel Seq Scan produce filas para agregación parcial. El nodo "Agregado parcial" recorta estas líneas usando SUM(). Al final, el nodo "Reunir" recopila el contador SUMA de cada proceso de trabajo.

El resultado final lo calcula el nodo "Finalizar agregado". Si tiene sus propias funciones de agregación, no olvide marcarlas como “paralelas seguras”.

Número de procesos de trabajo

La cantidad de procesos de trabajo se puede aumentar sin reiniciar el servidor:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

¿Que está pasando aqui? Hubo 2 veces más procesos de trabajo y la solicitud se volvió solo 1,6599 veces más rápida. Los cálculos son interesantes. Teníamos 2 procesos de trabajo y 1 líder. Tras el cambio quedó 4+1.

Nuestra aceleración máxima del procesamiento paralelo: 5/3 = 1,66(6) veces.

Como funciona?

Процессы

La ejecución de la solicitud siempre comienza con el proceso principal. El líder hace todo el procesamiento no paralelo y algo de procesamiento paralelo. Otros procesos que realizan las mismas solicitudes se denominan procesos de trabajo. El procesamiento paralelo utiliza infraestructura. procesos dinámicos de trabajo en segundo plano (a partir de la versión 9.4). Dado que otras partes de PostgreSQL utilizan procesos en lugar de subprocesos, una consulta con 3 procesos de trabajo podría ser 4 veces más rápida que el procesamiento tradicional.

Interacción

Los procesos de trabajo se comunican con el líder a través de una cola de mensajes (basada en memoria compartida). Cada proceso tiene 2 colas: de errores y de tuplas.

¿Cuántos flujos de trabajo se necesitan?

El límite mínimo está especificado por el parámetro. max_parallel_workers_per_gather. Luego, el corredor de solicitudes toma procesos de trabajo del grupo limitado por el parámetro max_parallel_workers size. La última limitación es max_worker_processes, es decir, el número total de procesos en segundo plano.

Si no fue posible asignar un proceso de trabajo, el procesamiento será de proceso único.

El planificador de consultas puede reducir los flujos de trabajo según el tamaño de la tabla o índice. Hay parámetros para esto. min_parallel_table_scan_size и min_parallel_index_scan_size.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Cada vez que la mesa es 3 veces más grande que min_parallel_(index|table)_scan_size, Postgres agrega un proceso de trabajo. La cantidad de flujos de trabajo no se basa en los costos. La dependencia circular dificulta las implementaciones complejas. En cambio, el planificador utiliza reglas simples.

En la práctica, estas reglas no siempre son adecuadas para la producción, por lo que puede cambiar el número de procesos de trabajo para una tabla específica: ALTER TABLE... SET (parallel_workers = N).

¿Por qué no se utiliza el procesamiento paralelo?

Además de la larga lista de restricciones, también existen controles de costes:

parallel_setup_cost - evitar el procesamiento paralelo de solicitudes breves. Este parámetro estima el tiempo para preparar la memoria, iniciar el proceso y el intercambio de datos inicial.

parallel_tuple_cost: la comunicación entre el líder y los trabajadores puede retrasarse en proporción al número de tuplas de los procesos de trabajo. Este parámetro calcula el costo del intercambio de datos.

Uniones de bucle anidados

PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

La recopilación se produce en la última etapa, por lo que Nested Loop Left Join es una operación paralela. El escaneo paralelo de solo índice se introdujo solo en la versión 10. Funciona de manera similar al escaneo en serie paralelo. Condición c_custkey = o_custkey lee un pedido por cadena de cliente. Entonces no es paralelo.

Unión hash

Cada proceso de trabajo crea su propia tabla hash hasta PostgreSQL 11. Y si hay más de cuatro de estos procesos, el rendimiento no mejorará. En la nueva versión, la tabla hash se comparte. Cada proceso de trabajo puede usar WORK_MEM para crear una tabla hash.

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

La consulta 12 de TPC-H muestra claramente una conexión hash paralela. Cada proceso de trabajo contribuye a la creación de una tabla hash común.

Fusionar Unirse

Una unión de fusión no es de naturaleza paralela. No se preocupe si este es el último paso de la consulta: aún puede ejecutarse en paralelo.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

El nodo "Merge Join" se encuentra encima de "Gather Merge". Por lo tanto, la fusión no utiliza procesamiento paralelo. Pero el nodo "Escaneo de índice paralelo" todavía ayuda con el segmento part_pkey.

Conexión por tramos

En PostgreSQL 11 conexión por tramos deshabilitado por defecto: tiene una programación muy costosa. Las tablas con particiones similares se pueden unir partición por partición. De esta forma, Postgres utilizará tablas hash más pequeñas. Cada conexión de secciones puede ser paralela.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

Lo principal es que la conexión en secciones es paralela solo si estas secciones son lo suficientemente grandes.

Anexar en paralelo

Anexar en paralelo se puede utilizar en lugar de diferentes bloques en diferentes flujos de trabajo. Esto suele suceder con las consultas UNION ALL. La desventaja es un menor paralelismo, porque cada proceso de trabajo solo procesa 1 solicitud.

Aquí se ejecutan 2 procesos de trabajo, aunque 4 están habilitados.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

Las variables más importantes

  • WORK_MEM limita la memoria por proceso, no solo por consultas: work_mem procesos conexiones = mucha memoria.
  • max_parallel_workers_per_gather — cuántos procesos de trabajo utilizará el programa en ejecución para el procesamiento paralelo del plan.
  • max_worker_processes — ajusta el número total de procesos de trabajo al número de núcleos de CPU en el servidor.
  • max_parallel_workers - Lo mismo, pero para procesos de trabajo paralelos.

resultados

A partir de la versión 9.6, el procesamiento paralelo puede mejorar enormemente el rendimiento de consultas complejas que escanean muchas filas o índices. En PostgreSQL 10, el procesamiento paralelo está habilitado de forma predeterminada. Recuerde desactivarlo en servidores con una gran carga de trabajo OLTP. Los escaneos secuenciales o escaneos de índice consumen muchos recursos. Si no está ejecutando un informe sobre todo el conjunto de datos, puede mejorar el rendimiento de las consultas simplemente agregando índices faltantes o utilizando la partición adecuada.

referencias

Fuente: habr.com

Añadir un comentario