Requêtes parallèles dans PostgreSQL

Requêtes parallèles dans PostgreSQL
Les processeurs modernes ont beaucoup de cœurs. Depuis des années, les applications envoient des requêtes aux bases de données en parallèle. S'il s'agit d'une requête de rapport sur plusieurs lignes d'une table, elle s'exécute plus rapidement lors de l'utilisation de plusieurs processeurs, et PostgreSQL est capable de le faire depuis la version 9.6.

Il a fallu 3 ans pour implémenter la fonctionnalité de requête parallèle - nous avons dû réécrire le code à différentes étapes de l'exécution de la requête. PostgreSQL 9.6 a introduit une infrastructure pour améliorer encore le code. Dans les versions ultérieures, d'autres types de requêtes sont exécutés en parallèle.

Restrictions

  • N'activez pas l'exécution parallèle si tous les cœurs sont déjà occupés, sinon les autres requêtes ralentiront.
  • Plus important encore, le traitement parallèle avec des valeurs WORK_MEM élevées utilise beaucoup de mémoire - chaque jointure ou tri par hachage occupe de la mémoire work_mem.
  • Les requêtes OLTP à faible latence ne peuvent pas être accélérées par une exécution parallèle. Et si la requête renvoie une ligne, le traitement parallèle ne fera que la ralentir.
  • Les développeurs adorent utiliser le benchmark TPC-H. Peut-être avez-vous des requêtes similaires pour une exécution parallèle parfaite.
  • Seules les requêtes SELECT sans verrouillage de prédicat sont exécutées en parallèle.
  • Parfois, une indexation appropriée est préférable à une analyse séquentielle des tables en mode parallèle.
  • La suspension des requêtes et des curseurs n'est pas prise en charge.
  • Les fonctions de fenêtre et les fonctions d'agrégation d'ensembles ordonnés ne sont pas parallèles.
  • Vous ne gagnez rien en termes de charge de travail d'E/S.
  • Il n’existe pas d’algorithmes de tri parallèle. Mais les requêtes avec tris peuvent être exécutées en parallèle sous certains aspects.
  • Remplacez CTE (WITH ...) par un SELECT imbriqué pour activer le traitement parallèle.
  • Les wrappers de données tiers ne prennent pas encore en charge le traitement parallèle (mais ils le pourraient !)
  • FULL OUTER JOIN n’est pas pris en charge.
  • max_rows désactive le traitement parallèle.
  • Si une requête a une fonction qui n'est pas marquée PARALLEL SAFE, elle sera monothread.
  • Le niveau d'isolement des transactions SERIALIZABLE désactive le traitement parallèle.

Environnement de test

Les développeurs de PostgreSQL ont tenté de réduire le temps de réponse des requêtes de référence TPC-H. Téléchargez le benchmark et adaptez-le à PostgreSQL. Il s'agit d'une utilisation non officielle du benchmark TPC-H - et non pour une comparaison de bases de données ou de matériel.

  1. Téléchargez TPC-H_Tools_v2.17.3.zip (ou version plus récente) depuis TPC hors site.
  2. Renommez makefile.suite en Makefile et modifiez comme décrit ici : https://github.com/tvondra/pg_tpch . Compilez le code avec la commande make.
  3. Générer des données : ./dbgen -s 10 crée une base de données de 23 Go. Cela suffit pour voir la différence de performances des requêtes parallèles et non parallèles.
  4. Convertir des fichiers tbl в csv с for и sed.
  5. Cloner le dépôt pg_tpch et copiez les fichiers csv в pg_tpch/dss/data.
  6. Créer des requêtes avec une commande qgen.
  7. Chargez les données dans la base de données avec la commande ./tpch.sh.

Balayage séquentiel parallèle

Cela peut être plus rapide non pas à cause de la lecture parallèle, mais parce que les données sont réparties sur de nombreux cœurs de processeur. Dans les systèmes d'exploitation modernes, les fichiers de données PostgreSQL sont bien mis en cache. Avec la lecture anticipée, il est possible d'obtenir un bloc de stockage plus grand que ce que demande le démon PG. Par conséquent, les performances des requêtes ne sont pas limitées par les E/S disque. Il consomme des cycles CPU pour :

  • lire les lignes une par une à partir des pages du tableau ;
  • comparer les valeurs et les conditions des chaînes WHERE.

Lançons une requête simple select:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

L'analyse séquentielle produit trop de lignes sans agrégation, la requête est donc exécutée par un seul cœur de processeur.

Si vous ajoutez SUM(), vous pouvez voir que deux workflows permettront d'accélérer la requête :

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Agrégation parallèle

Le nœud Parallel Seq Scan produit des lignes pour une agrégation partielle. Le nœud « Partial Aggregate » supprime ces lignes à l'aide de SUM(). À la fin, le compteur SUM de chaque processus de travail est collecté par le nœud « Gather ».

Le résultat final est calculé par le nœud « Finalize Aggregate ». Si vous disposez de vos propres fonctions d’agrégation, n’oubliez pas de les marquer comme « parallèle sécurisé ».

Nombre de processus de travail

Le nombre de processus de travail peut être augmenté sans redémarrer le serveur :

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Que se passe t-il ici? Il y a eu 2 fois plus de processus de travail et la demande n'est devenue que 1,6599 fois plus rapide. Les calculs sont intéressants. Nous avions 2 processus de travail et 1 leader. Après le changement, c'est devenu 4+1.

Notre accélération maximale du traitement parallèle : 5/3 = 1,66(6) fois.

Comment ça marche?

Processus

L'exécution de la demande commence toujours par le processus principal. Le leader fait tout de manière non parallèle et effectue certains traitements parallèles. Les autres processus qui exécutent les mêmes requêtes sont appelés processus de travail. Le traitement parallèle utilise l'infrastructure processus de travail en arrière-plan dynamiques (à partir de la version 9.4). Étant donné que d'autres parties de PostgreSQL utilisent des processus plutôt que des threads, une requête avec 3 processus de travail pourrait être 4 fois plus rapide qu'un traitement traditionnel.

Interaction

Les processus de travail communiquent avec le leader via une file d'attente de messages (basée sur la mémoire partagée). Chaque processus dispose de 2 files d'attente : pour les erreurs et pour les tuples.

Combien de flux de travail sont nécessaires ?

La limite minimale est spécifiée par le paramètre max_parallel_workers_per_gather. L'exécuteur de requêtes extrait ensuite les processus de travail du pool limité par le paramètre max_parallel_workers size. La dernière limite est max_worker_processes, c'est-à-dire le nombre total de processus en arrière-plan.

S'il n'a pas été possible d'attribuer un processus de travail, le traitement sera monoprocessus.

Le planificateur de requêtes peut réduire les flux de travail en fonction de la taille de la table ou de l'index. Il y a des paramètres pour cela min_parallel_table_scan_size и min_parallel_index_scan_size.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Chaque fois que la table est 3 fois plus grande que min_parallel_(index|table)_scan_size, Postgres ajoute un processus de travail. Le nombre de flux de travail n'est pas basé sur les coûts. La dépendance circulaire rend les implémentations complexes difficiles. Le planificateur utilise plutôt des règles simples.

En pratique, ces règles ne sont pas toujours adaptées à la production, vous pouvez donc modifier le nombre de processus de travail pour une table spécifique : ALTER TABLE ... SET (parallel_workers = N).

Pourquoi le traitement parallèle n’est-il pas utilisé ?

À la longue liste de restrictions s’ajoutent également des contrôles de coûts :

parallel_setup_cost - pour éviter le traitement parallèle de demandes courtes. Ce paramètre estime le temps nécessaire pour préparer la mémoire, démarrer le processus et échanger les données initiales.

parallel_tuple_cost: la communication entre le leader et les travailleurs peut être retardée proportionnellement au nombre de tuples issus des processus de travail. Ce paramètre calcule le coût de l'échange de données.

Jointures de boucles imbriquées

PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

La collecte se produit à la dernière étape, donc Nested Loop Left Join est une opération parallèle. Parallel Index Only Scan n'a été introduit que dans la version 10. Il fonctionne de la même manière que l'analyse série parallèle. Condition c_custkey = o_custkey lit une commande par chaîne client. Ce n'est donc pas parallèle.

Rejoindre par hachage

Chaque processus de travail crée sa propre table de hachage jusqu'à PostgreSQL 11. Et s'il y a plus de quatre de ces processus, les performances ne s'amélioreront pas. Dans la nouvelle version, la table de hachage est partagée. Chaque processus de travail peut utiliser WORK_MEM pour créer une table de hachage.

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

La requête 12 de TPC-H montre clairement une connexion de hachage parallèle. Chaque processus de travail contribue à la création d'une table de hachage commune.

Fusionner Rejoindre

Une jointure par fusion n’est pas de nature parallèle. Ne vous inquiétez pas s'il s'agit de la dernière étape de la requête : elle peut toujours s'exécuter en parallèle.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

Le nœud « Merge Join » est situé au-dessus du « Gather Merge ». La fusion n'utilise donc pas de traitement parallèle. Mais le nœud « Parallel Index Scan » aide toujours avec le segment part_pkey.

Connexion par sections

Dans PostgreSQL 11 connexion par sections désactivé par défaut : sa planification est très coûteuse. Les tables avec un partitionnement similaire peuvent être jointes partition par partition. De cette façon, Postgres utilisera des tables de hachage plus petites. Chaque connexion de sections peut être parallèle.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

L'essentiel est que la connexion en sections ne soit parallèle que si ces sections sont suffisamment grandes.

Ajout parallèle

Ajout parallèle peut être utilisé à la place de différents blocs dans différents flux de travail. Cela se produit généralement avec les requêtes UNION ALL. L'inconvénient est moins de parallélisme, car chaque processus de travail ne traite qu'une seule requête.

Deux processus de travail sont en cours d'exécution ici, bien que quatre soient activés.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

Les variables les plus importantes

  • WORK_MEM limite la mémoire par processus, pas seulement par requêtes : work_mem processus connexions = beaucoup de mémoire.
  • max_parallel_workers_per_gather — combien de processus de travail le programme en cours d'exécution utilisera pour le traitement parallèle à partir du plan.
  • max_worker_processes — ajuste le nombre total de processus de travail au nombre de cœurs de processeur sur le serveur.
  • max_parallel_workers - pareil, mais pour des processus de travail parallèles.

Les résultats de

Depuis la version 9.6, le traitement parallèle peut améliorer considérablement les performances des requêtes complexes qui analysent de nombreuses lignes ou index. Dans PostgreSQL 10, le traitement parallèle est activé par défaut. N'oubliez pas de le désactiver sur les serveurs avec une charge de travail OLTP importante. Les analyses séquentielles ou les analyses d'index consomment beaucoup de ressources. Si vous n'exécutez pas de rapport sur l'intégralité de l'ensemble de données, vous pouvez améliorer les performances des requêtes en ajoutant simplement les index manquants ou en utilisant un partitionnement approprié.

références

Source: habr.com

Ajouter un commentaire