Parallelle forespørgsler i PostgreSQL

Parallelle forespørgsler i PostgreSQL
Moderne CPU'er har mange kerner. I årevis har applikationer sendt forespørgsler til databaser parallelt. Hvis det er en rapporteringsforespørgsel mod flere rækker i en tabel, er den hurtigere, når den bruger flere CPU'er, og PostgreSQL har været i stand til at gøre dette siden version 9.6.

Det tog 3 år at implementere den parallelle forespørgselsfunktion - jeg var nødt til at omskrive koden på forskellige stadier af udførelse af forespørgsler. PostgreSQL 9.6 introducerede infrastrukturen for at forbedre koden yderligere. I efterfølgende versioner udføres andre typer forespørgsler parallelt.

Begrænsninger

  • Aktiver ikke parallel eksekvering, hvis alle kerner allerede er optaget, ellers vil andre anmodninger blive langsommere.
  • Vigtigst er det, at parallel bearbejdning med høje WORK_MEM-værdier bruger meget hukommelse - hver hash-join eller sortering bruger hukommelse i mængden af ​​work_mem.
  • OLTP-forespørgsler med lav latens kan ikke accelereres ved parallel udførelse. Og hvis forespørgslen returnerer en enkelt række, vil parallel behandling kun bremse den.
  • Udviklere elsker at bruge TPC-H benchmark. Måske har du lignende forespørgsler til perfekt parallel eksekvering.
  • Kun SELECT-forespørgsler uden prædikatlås udføres parallelt.
  • Nogle gange er korrekt indeksering bedre end sekventielle tabelscanninger parallelt.
  • Forespørgselspause og markører understøttes ikke.
  • Vinduesfunktioner og ordnede aggregatfunktioner er ikke parallelle.
  • Du vinder intet i I/O-arbejdsbyrden.
  • Der er ingen parallelle sorteringsalgoritmer. Men forespørgsler med sorteringer kan køre parallelt i nogle aspekter.
  • Erstat CTE (MED ...) med en indlejret SELECT for at aktivere parallel behandling.
  • Udenlandske dataindpakninger understøtter endnu ikke parallel behandling (men de kunne!)
  • FULD YDRE JOIN understøttes ikke.
  • max_rows deaktiverer parallel behandling.
  • Hvis anmodningen har en funktion, der ikke er markeret som PARALLEL SIKKER, vil den være enkelt-trådet.
  • Transaktionsisolationsniveauet SERIALIZABLE deaktiverer parallel behandling.

Test miljø

PostgreSQL-udviklere har forsøgt at reducere responstiden for TPC-H benchmark-forespørgsler. Download benchmark og tilpasse det til PostgreSQL. Dette er en uofficiel brug af TPC-H benchmark - ikke til at sammenligne databaser eller hardware.

  1. Download TPC-H_Tools_v2.17.3.zip (eller nyere) fra offsite TPC.
  2. Omdøb makefile.suite til Makefile og rediger som beskrevet her: https://github.com/tvondra/pg_tpch . Kompiler koden med make.
  3. Generer data: ./dbgen -s 10 opretter en 23 GB database. Dette er nok til at se ydeevneforskellen mellem parallelle og ikke-parallelle forespørgsler.
  4. Konverter filer tbl в csv с for и sed.
  5. Klon depotet pg_tpch og kopier filerne csv в pg_tpch/dss/data.
  6. Opret anmodninger med en kommando qgen.
  7. Indlæs dataene i databasen med kommandoen ./tpch.sh.

Parallel sekventiel scanning

Det kan være hurtigere, ikke på grund af parallel læsning, men fordi dataene er spredt over mange CPU-kerner. I moderne operativsystemer er PostgreSQL-datafiler godt cachelagrede. Med read-ahead er det muligt at få en blok større fra lager end PG-dæmonen anmoder om. Derfor er forespørgselsydeevne ikke begrænset af disk I/O. Det bruger CPU-cyklusser til:

  • læs linjer en efter en fra tabellens sider;
  • sammenligne strengværdier og betingelser WHERE.

Lad os køre en simpel forespørgsel select:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

Sekventiel scanning giver for mange rækker uden aggregering, så forespørgslen udføres af én CPU-kerne.

Hvis tilføje SUM(), kan du se, at to arbejdsgange vil hjælpe med at fremskynde forespørgslen:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Parallel Aggregation

Noden "Parallel Seq Scan" producerer rækker til delvis aggregering. "Delvis aggregeret"-knudepunktet afkorter disse rækker med SUM(). I slutningen indsamles SUM-tælleren fra hver arbejdsgang af indsamlingsknuden.

Det endelige resultat beregnes af "Finalize Aggregate" noden. Hvis du har dine egne aggregeringsfunktioner, så glem ikke at markere dem som "parallel sikre".

Antal arbejdsprocesser

Antallet af arbejdsprocesser kan øges uden at genstarte serveren:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Hvad sker der her? Der er 2 gange flere arbejdsprocesser, og forespørgslen er kun 1,6599 gange hurtigere. Beregningerne er interessante. Vi havde 2 arbejdsprocesser og 1 leder. Efter ændringen blev det 4+1.

Vores maksimale fremskyndelse fra parallel behandling: 5/3 = 1,66(6) gange.

Hvordan fungerer det?

processer

Udførelse af en forespørgsel starter altid med den førende proces. Lederen gør alt ikke-parallelt og noget af den parallelle behandling. Andre processer, der udfører de samme anmodninger, kaldes arbejdsprocesser. Parallel behandling bruger infrastruktur dynamiske baggrundsarbejdsgange (siden version 9.4). Da andre dele af PostgreSQL bruger processer i stedet for tråde, kan en forespørgsel med 3 arbejdsprocesser være 4 gange hurtigere end traditionel behandling.

Interaktion

Arbejdsprocesser kommunikerer med lederen via en beskedkø (baseret på delt hukommelse). Hver proces har 2 køer: for fejl og for tuples.

Hvor mange arbejdsprocesser har du brug for?

Minimumsgrænsen sætter parameteren max_parallel_workers_per_gather. Derefter tager anmodningsudøveren arbejdsprocesser fra puljen, begrænset af parameteren max_parallel_workers size. Den sidste begrænsning er max_worker_processes, som er det samlede antal baggrundsprocesser.

Hvis det ikke var muligt at allokere en arbejdsproces, vil behandlingen være en enkelt proces.

Forespørgselsplanlæggeren kan reducere arbejdsgange afhængigt af størrelsen på tabellen eller indekset. Der er muligheder for dette. min_parallel_table_scan_size и min_parallel_index_scan_size.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Hver gang er bordet 3 gange større end min_parallel_(index|table)_scan_size, Postgres tilføjer en arbejdsproces. Antallet af arbejdsgange er ikke omkostningsbaseret. Cirkulær afhængighed gør komplekse implementeringer vanskelige. I stedet bruger skemalæggeren simple regler.

I praksis er disse regler ikke altid egnede til produktion, så det er muligt at ændre antallet af arbejdsprocesser for en bestemt tabel: ALTER TABLE ... SET (parallel_workers = N).

Hvorfor bruges parallel behandling ikke?

Ud over den lange liste af restriktioner er der også omkostningstjek:

parallel_setup_cost - at undgå parallel behandling af korte anmodninger. Denne parameter estimerer tiden for hukommelsesforberedelse, processtart og indledende dataudveksling.

parallel_tuple_cost: kommunikation mellem lederen og arbejderne kan forsinkes i forhold til antallet af tuples fra arbejdsprocesserne. Denne parameter tager hensyn til omkostningerne ved dataudveksling.

Indlejrede løkkeforbindelser - Indlejrede løkkeforbindelser

PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

Samling finder sted i det sidste trin, så Nested Loop Left Join er en parallel operation. Parallel Index Only Scan blev kun introduceret i version 10. Det fungerer på samme måde som en parallel sekventiel scanning. Tilstand c_custkey = o_custkey læser én ordre for hver kundelinje. Så det er ikke parallelt.

Hash Join - Hash Join

Hver arbejdsproces opretter sin egen hash-tabel før PostgreSQL 11. Og hvis der er mere end fire af disse processer, forbedres ydeevnen ikke. I den nye version er hash-tabellen delt. Hver arbejdsproces kan bruge WORK_MEM til at oprette en hash-tabel.

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

Forespørgsel 12 fra TPC-H illustrerer en parallel hash join. Hver arbejdsproces deltager i oprettelsen af ​​en delt hash-tabel.

Flet Deltag

En sammenføjning er ikke-parallel. Bare rolig, hvis dette er sidste trin i forespørgslen - den kan stadig køre parallelt.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

"Merge Join"-knuden er over "Gather Merge". Så fletning bruger ikke parallel behandling. Men noden "Parallel Index Scan" hjælper stadig med segmentet part_pkey.

Sektionsforbindelse

I PostgreSQL 11 forbindelse efter sektioner deaktiveret som standard: den har meget dyr planlægning. Tabeller med lignende opdeling kan samles sektion for sektion. Dette vil få Postgres til at bruge mindre hash-tabeller. Hver forbindelse af sektioner kan være parallelle.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

Det vigtigste er, at forbindelsen med sektioner kun er parallel, hvis disse sektioner er store nok.

Parallel Tillæg

Parallel Tillæg kan bruges i stedet for forskellige blokke i forskellige arbejdsgange. Dette sker normalt med UNION ALL-forespørgsler. Ulempen er mindre samtidighed, da hver arbejdsproces kun håndterer 1 anmodning.

Der kører 2 arbejdsprocesser her, selvom 4 er aktiveret.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

Vigtigste variabler

  • WORK_MEM begrænser mængden af ​​hukommelse pr. proces, ikke kun anmodninger: work_mem processer forbindelser = meget hukommelse.
  • max_parallel_workers_per_gather - hvor mange arbejdsprocesser det eksekverende program vil bruge til parallel behandling fra planen.
  • max_worker_processes - justerer det samlede antal arbejdsprocesser til antallet af CPU-kerner på serveren.
  • max_parallel_workers - det samme, men for parallelle arbejdsgange.

Resultaterne af

Fra version 9.6 kan parallel behandling i høj grad forbedre ydeevnen af ​​komplekse forespørgsler, der scanner mange rækker eller indekser. I PostgreSQL 10 er parallel behandling aktiveret som standard. Glem ikke at deaktivere det på servere med en stor OLTP-arbejdsbelastning. Sekventielle eller indeksscanninger er meget ressourcekrævende. Hvis du ikke rapporterer om hele datasættet, kan forespørgsler gøres mere effektive ved blot at tilføje manglende indekser eller ved at bruge korrekt partitionering.

RЎSЃS <P "RєRё

Kilde: www.habr.com

Tilføj en kommentar