Consultas paralelas no PostgreSQL

Consultas paralelas no PostgreSQL
As CPUs modernas têm muitos núcleos. Durante anos, os aplicativos enviaram consultas aos bancos de dados em paralelo. Se for uma consulta de relatório em múltiplas linhas de uma tabela, ela será executada mais rapidamente ao usar múltiplas CPUs, e o PostgreSQL tem sido capaz de fazer isso desde a versão 9.6.

Demorou 3 anos para implementar o recurso de consulta paralela - tivemos que reescrever o código em diferentes estágios de execução da consulta. O PostgreSQL 9.6 introduziu infraestrutura para melhorar ainda mais o código. Nas versões subsequentes, outros tipos de consultas são executados em paralelo.

Restrições

  • Não habilite a execução paralela se todos os núcleos já estiverem ocupados, caso contrário, outras solicitações ficarão lentas.
  • Mais importante ainda, o processamento paralelo com valores altos de WORK_MEM usa muita memória - cada hash join ou sort ocupa memória de work_mem.
  • Consultas OLTP de baixa latência não podem ser aceleradas pela execução paralela. E se a consulta retornar uma linha, o processamento paralelo apenas a deixará mais lenta.
  • Os desenvolvedores adoram usar o benchmark TPC-H. Talvez você tenha consultas semelhantes para execução paralela perfeita.
  • Somente consultas SELECT sem bloqueio de predicado são executadas em paralelo.
  • Às vezes, a indexação adequada é melhor do que a varredura sequencial de tabelas em modo paralelo.
  • Pausar consultas e cursores não são suportados.
  • Funções de janela e funções agregadas de conjuntos ordenados não são paralelas.
  • Você não ganha nada na carga de trabalho de E/S.
  • Não existem algoritmos de classificação paralela. Mas consultas com classificações podem ser executadas em paralelo em alguns aspectos.
  • Substitua CTE (WITH ...) por um SELECT aninhado para ativar o processamento paralelo.
  • Wrappers de dados de terceiros ainda não suportam processamento paralelo (mas poderiam!)
  • FULL OUTER JOIN não é suportado.
  • max_rows desativa o processamento paralelo.
  • Se uma consulta tiver uma função que não esteja marcada como PARALLEL SAFE, ela será de thread único.
  • O nível de isolamento da transação SERIALIZABLE desativa o processamento paralelo.

Ambiente de teste

Os desenvolvedores do PostgreSQL tentaram reduzir o tempo de resposta das consultas de benchmark TPC-H. Baixe o benchmark e adapte-o ao PostgreSQL. Este é um uso não oficial do benchmark TPC-H - não para comparação de banco de dados ou hardware.

  1. Baixe TPC-H_Tools_v2.17.3.zip (ou versão mais recente) do TPC externo.
  2. Renomeie makefile.suite para Makefile e altere conforme descrito aqui: https://github.com/tvondra/pg_tpch . Compile o código com o comando make.
  3. Gerar dados: ./dbgen -s 10 cria um banco de dados de 23 GB. Isso é suficiente para ver a diferença no desempenho de consultas paralelas e não paralelas.
  4. Converter arquivos tbl в csv с for и sed.
  5. Clonar o repositório pg_tpch e copie os arquivos csv в pg_tpch/dss/data.
  6. Crie consultas com um comando qgen.
  7. Carregue dados no banco de dados com o comando ./tpch.sh.

Varredura sequencial paralela

Pode ser mais rápido não por causa da leitura paralela, mas porque os dados estão espalhados por vários núcleos da CPU. Nos sistemas operacionais modernos, os arquivos de dados PostgreSQL são bem armazenados em cache. Com a leitura antecipada, é possível obter um bloco de armazenamento maior do que as solicitações do daemon PG. Portanto, o desempenho da consulta não é limitado pela E/S do disco. Ele consome ciclos de CPU para:

  • leia as linhas, uma de cada vez, nas páginas da tabela;
  • comparar valores e condições de string WHERE.

Vamos executar uma consulta simples select:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

A varredura sequencial produz muitas linhas sem agregação, portanto a consulta é executada por um único núcleo da CPU.

Se você adicionar SUM(), você pode ver que dois fluxos de trabalho ajudarão a acelerar a consulta:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Agregação paralela

O nó Parallel Seq Scan produz linhas para agregação parcial. O nó "Partial Aggregate" corta essas linhas usando SUM(). Ao final, o contador SUM de cada processo de trabalho é coletado pelo nó “Gather”.

O resultado final é calculado pelo nó “Finalize Aggregate”. Se você possui suas próprias funções de agregação, não se esqueça de marcá-las como “paralelas seguras”.

Número de processos de trabalho

O número de processos de trabalho pode ser aumentado sem reiniciar o servidor:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

O que está acontecendo aqui? Foram 2 vezes mais processos de trabalho e a solicitação ficou apenas 1,6599 vezes mais rápida. Os cálculos são interessantes. Tínhamos 2 processos de trabalho e 1 líder. Após a mudança passou a ser 4+1.

Nossa aceleração máxima do processamento paralelo: 5/3 = 1,66(6) vezes.

Como isso funciona?

Процессы

A execução da solicitação sempre começa com o processo principal. O líder faz tudo não paralelo e algum processamento paralelo. Outros processos que executam as mesmas solicitações são chamados de processos de trabalho. O processamento paralelo usa infraestrutura processos dinâmicos de trabalho em segundo plano (da versão 9.4). Como outras partes do PostgreSQL usam processos em vez de threads, uma consulta com 3 processos de trabalho poderia ser 4 vezes mais rápida que o processamento tradicional.

Interação

Os processos de trabalho comunicam-se com o líder através de uma fila de mensagens (baseada em memória compartilhada). Cada processo possui 2 filas: para erros e para tuplas.

Quantos fluxos de trabalho são necessários?

O limite mínimo é especificado pelo parâmetro max_parallel_workers_per_gather. O executor de solicitação então pega processos de trabalho do pool limitado pelo parâmetro max_parallel_workers size. A última limitação é max_worker_processes, ou seja, o número total de processos em segundo plano.

Caso não seja possível alocar um processo de trabalho, o processamento será de processo único.

O planejador de consultas pode reduzir fluxos de trabalho dependendo do tamanho da tabela ou índice. Existem parâmetros para isso min_parallel_table_scan_size и min_parallel_index_scan_size.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Cada vez que a mesa é 3 vezes maior que min_parallel_(index|table)_scan_size, o Postgres adiciona um processo de trabalho. O número de fluxos de trabalho não é baseado em custos. A dependência circular dificulta implementações complexas. Em vez disso, o planejador usa regras simples.

Na prática, essas regras nem sempre são adequadas para produção, portanto você pode alterar o número de processos de trabalho para uma tabela específica: ALTER TABLE ... SET (parallel_workers = N).

Por que o processamento paralelo não é usado?

Além da longa lista de restrições, há também verificações de custos:

parallel_setup_cost - evitar o processamento paralelo de pedidos curtos. Este parâmetro estima o tempo para preparar a memória, iniciar o processo e iniciar a troca de dados.

parallel_tuple_cost: a comunicação entre o líder e os trabalhadores pode ser atrasada proporcionalmente ao número de tuplas dos processos de trabalho. Este parâmetro calcula o custo da troca de dados.

Junções de loop aninhadas

PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

A coleta ocorre no último estágio, portanto Nested Loop Left Join é uma operação paralela. A varredura paralela de índice somente foi introduzida apenas na versão 10. Ela funciona de maneira semelhante à varredura serial paralela. Doença c_custkey = o_custkey lê um pedido por string do cliente. Então não é paralelo.

Junção de hash

Cada processo de trabalho cria sua própria tabela hash até o PostgreSQL 11. E se houver mais de quatro desses processos, o desempenho não melhorará. Na nova versão, a tabela hash é compartilhada. Cada processo de trabalho pode usar WORK_MEM para criar uma tabela hash.

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

A consulta 12 do TPC-H mostra claramente uma conexão hash paralela. Cada processo de trabalho contribui para a criação de uma tabela hash comum.

Mesclar Junção

Uma junção de mesclagem não é paralela por natureza. Não se preocupe se esta for a última etapa da consulta – ela ainda pode ser executada em paralelo.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

O nó "Merge Join" está localizado acima de "Gather Merge". Portanto, a mesclagem não usa processamento paralelo. Mas o nó “Parallel Index Scan” ainda ajuda no segmento part_pkey.

Conexão por seções

No PostgreSQL 11 conexão por seções desabilitado por padrão: tem agendamento muito caro. Tabelas com particionamento semelhante podem ser unidas partição por partição. Dessa forma, o Postgres usará tabelas hash menores. Cada conexão de seções pode ser paralela.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

O principal é que a conexão em seções só seja paralela se essas seções forem grandes o suficiente.

Anexar Paralelo

Anexar Paralelo pode ser usado em vez de blocos diferentes em fluxos de trabalho diferentes. Isso geralmente acontece com consultas UNION ALL. A desvantagem é menos paralelismo, pois cada processo de trabalho processa apenas 1 solicitação.

Existem 2 processos de trabalho em execução aqui, embora 4 estejam habilitados.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

As variáveis ​​mais importantes

  • WORK_MEM limita a memória por processo, não apenas por consultas: work_mem os processos conexões = muita memória.
  • max_parallel_workers_per_gather — quantos processos de trabalho o programa em execução usará para processamento paralelo do plano.
  • max_worker_processes — ajusta o número total de processos de trabalho ao número de núcleos de CPU no servidor.
  • max_parallel_workers - o mesmo, mas para processos de trabalho paralelos.

Resultados de

A partir da versão 9.6, o processamento paralelo pode melhorar significativamente o desempenho de consultas complexas que verificam muitas linhas ou índices. No PostgreSQL 10, o processamento paralelo está habilitado por padrão. Lembre-se de desativá-lo em servidores com grande carga de trabalho OLTP. As verificações sequenciais ou de índice consomem muitos recursos. Se não estiver executando um relatório sobre todo o conjunto de dados, você poderá melhorar o desempenho da consulta simplesmente adicionando índices ausentes ou usando o particionamento adequado.

referências

Fonte: habr.com

Adicionar um comentário