Parallelle spørringer i PostgreSQL

Parallelle spørringer i PostgreSQL
Moderne CPUer har mange kjerner. I årevis har applikasjoner sendt spørringer til databaser parallelt. Hvis det er en rapportspørring på flere rader i en tabell, kjører den raskere ved bruk av flere CPUer, og PostgreSQL har vært i stand til å gjøre dette siden versjon 9.6.

Det tok 3 år å implementere funksjonen for parallelle spørringer - vi måtte skrive om koden på forskjellige stadier av utføringen av spørringen. PostgreSQL 9.6 introduserte infrastruktur for å forbedre koden ytterligere. I etterfølgende versjoner utføres andre typer spørringer parallelt.

Restriksjoner

  • Ikke aktiver parallell kjøring hvis alle kjerner allerede er opptatt, ellers vil andre forespørsler bremse.
  • Det viktigste er at parallell prosessering med høye WORK_MEM-verdier bruker mye minne - hver hash-sammenføyning eller sortering tar opp work_mem-minne.
  • OLTP-spørringer med lav ventetid kan ikke akselereres ved parallell kjøring. Og hvis spørringen returnerer én rad, vil parallell behandling bare bremse den.
  • Utviklere elsker å bruke TPC-H benchmark. Kanskje du har lignende spørsmål for perfekt parallell utførelse.
  • Bare SELECT-spørringer uten predikatlåsing utføres parallelt.
  • Noen ganger er riktig indeksering bedre enn sekvensiell tabellskanning i parallellmodus.
  • Pause spørringer og markører støttes ikke.
  • Vindusfunksjoner og aggregatfunksjoner for bestilt sett er ikke parallelle.
  • Du får ikke noe i I/O-arbeidsmengden.
  • Det finnes ingen parallelle sorteringsalgoritmer. Men spørringer med sorteringer kan utføres parallelt i noen aspekter.
  • Erstatt CTE (MED ...) med en nestet SELECT for å aktivere parallell behandling.
  • Tredjeparts datainnpakninger støtter ennå ikke parallell behandling (men de kunne!)
  • FULL YTRE JOIN støttes ikke.
  • max_rows deaktiverer parallell behandling.
  • Hvis en spørring har en funksjon som ikke er merket PARALLELL SAFE, vil den være entrådet.
  • SERIALIZABLE transaksjonsisolasjonsnivået deaktiverer parallell behandling.

Test miljø

PostgreSQL-utviklere prøvde å redusere responstiden til TPC-H benchmark-spørringer. Last ned benchmark og tilpasse den til PostgreSQL. Dette er en uoffisiell bruk av TPC-H benchmark - ikke for sammenligning av databaser eller maskinvare.

  1. Last ned TPC-H_Tools_v2.17.3.zip (eller nyere versjon) fra TPC offsite.
  2. Gi nytt navn til makefile.suite til Makefile og endre som beskrevet her: https://github.com/tvondra/pg_tpch . Kompiler koden med make-kommandoen.
  3. Generer data: ./dbgen -s 10 oppretter en 23 GB database. Dette er nok til å se forskjellen i ytelsen til parallelle og ikke-parallelle spørringer.
  4. Konverter filer tbl в csv с for и sed.
  5. Klon depotet pg_tpch og kopier filene csv в pg_tpch/dss/data.
  6. Lag spørringer med en kommando qgen.
  7. Last inn data i databasen med kommandoen ./tpch.sh.

Parallell sekvensiell skanning

Det kan være raskere ikke på grunn av parallell lesing, men fordi dataene er spredt over mange CPU-kjerner. I moderne operativsystemer bufres PostgreSQL-datafiler godt. Med lesing fremover er det mulig å få en større blokk fra lagring enn PG-demonen ber om. Derfor begrenses ikke spørringsytelsen av disk I/O. Den bruker CPU-sykluser til:

  • les rader én om gangen fra tabellsider;
  • sammenligne strengverdier og betingelser WHERE.

La oss kjøre en enkel spørring select:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

Den sekvensielle skanningen produserer for mange rader uten aggregering, så spørringen utføres av en enkelt CPU-kjerne.

Hvis du legger til SUM(), kan du se at to arbeidsflyter vil bidra til å øke hastigheten på spørringen:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Parallell aggregering

Parallell Seq Scan-noden produserer rader for delvis aggregering. "Partial Aggregate"-noden trimmer disse linjene ved hjelp av SUM(). På slutten samles SUM-telleren fra hver arbeidsprosess av "Gather"-noden.

Det endelige resultatet beregnes av "Finalize Aggregate"-noden. Hvis du har dine egne aggregeringsfunksjoner, ikke glem å merke dem som "parallell sikre".

Antall arbeidsprosesser

Antall arbeidsprosesser kan økes uten å starte serveren på nytt:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Hva foregår her? Det var 2 ganger flere arbeidsprosesser, og forespørselen ble bare 1,6599 ganger raskere. Beregningene er interessante. Vi hadde 2 arbeidsprosesser og 1 leder. Etter endringen ble det 4+1.

Vår maksimale hastighetsøkning fra parallell behandling: 5/3 = 1,66(6) ganger.

Hvordan virker det?

prosesser

Utførelse av forespørsel starter alltid med den ledende prosessen. Lederen gjør alt ikke-parallelt og noe parallell prosessering. Andre prosesser som utfører de samme forespørslene kalles arbeiderprosesser. Parallell prosessering bruker infrastruktur dynamiske arbeidsprosesser i bakgrunnen (fra versjon 9.4). Siden andre deler av PostgreSQL bruker prosesser i stedet for tråder, kan en spørring med 3 arbeidsprosesser være 4 ganger raskere enn tradisjonell behandling.

Interaction,ru

Arbeiderprosesser kommuniserer med lederen gjennom en meldingskø (basert på delt minne). Hver prosess har 2 køer: for feil og for tupler.

Hvor mange arbeidsflyter trengs?

Minimumsgrensen er spesifisert av parameteren max_parallel_workers_per_gather. Forespørselsløperen tar deretter arbeiderprosesser fra bassenget begrenset av parameteren max_parallel_workers size. Den siste begrensningen er max_worker_processes, det vil si det totale antallet bakgrunnsprosesser.

Hvis det ikke var mulig å tildele en arbeidsprosess, vil behandlingen være enkeltprosess.

Spørringsplanleggeren kan redusere arbeidsflyter avhengig av størrelsen på tabellen eller indeksen. Det er parametere for dette min_parallel_table_scan_size и min_parallel_index_scan_size.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Hver gang er bordet 3 ganger større enn min_parallel_(index|table)_scan_size, legger Postgres til en arbeidsprosess. Antall arbeidsflyter er ikke basert på kostnader. Sirkulær avhengighet gjør komplekse implementeringer vanskelig. I stedet bruker planleggeren enkle regler.

I praksis er disse reglene ikke alltid egnet for produksjon, så du kan endre antall arbeidsprosesser for en bestemt tabell: ALTER TABLE ... SET (parallel_workers = N).

Hvorfor brukes ikke parallell prosessering?

I tillegg til den lange listen over restriksjoner, er det også kostnadskontroller:

parallel_setup_cost - for å unngå parallell behandling av korte forespørsler. Denne parameteren estimerer tiden det tar å forberede minnet, starte prosessen og innledende datautveksling.

parallel_tuple_cost: kommunikasjon mellom leder og arbeidere kan bli forsinket i forhold til antall tupler fra arbeidsprosesser. Denne parameteren beregner kostnadene ved datautveksling.

Nestede løkker

PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

Samlingen skjer på siste trinn, så Nested Loop Left Join er en parallell operasjon. Parallel Index Only Scan ble introdusert kun i versjon 10. Den fungerer på samme måte som parallell seriell skanning. Betingelse c_custkey = o_custkey leser én ordre per klientstreng. Så det er ikke parallelt.

Hash Bli med

Hver arbeidsprosess lager sin egen hashtabell frem til PostgreSQL 11. Og hvis det er mer enn fire av disse prosessene, vil ikke ytelsen forbedres. I den nye versjonen er hash-tabellen delt. Hver arbeidsprosess kan bruke WORK_MEM til å lage en hashtabell.

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

Spørring 12 fra TPC-H viser tydelig en parallell hash-forbindelse. Hver arbeidsprosess bidrar til å lage en felles hashtabell.

Slå sammen Bli med

En sammenslåing er ikke-parallell. Ikke bekymre deg hvis dette er det siste trinnet i spørringen – den kan fortsatt kjøres parallelt.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

"Merge Join"-noden er plassert over "Gather Merge". Så sammenslåing bruker ikke parallell prosessering. Men "Parallell Index Scan"-noden hjelper fortsatt med segmentet part_pkey.

Forbindelse etter seksjoner

I PostgreSQL 11 kobling etter seksjoner deaktivert som standard: den har svært kostbar planlegging. Tabeller med lignende partisjonering kan slås sammen partisjon for partisjon. På denne måten vil Postgres bruke mindre hash-tabeller. Hver tilkobling av seksjoner kan være parallelle.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

Hovedsaken er at koblingen i seksjoner er parallell bare hvis disse seksjonene er store nok.

Parallell vedlegg

Parallell vedlegg kan brukes i stedet for forskjellige blokker i forskjellige arbeidsflyter. Dette skjer vanligvis med UNION ALL-spørringer. Ulempen er mindre parallellitet, fordi hver arbeidsprosess kun behandler 1 forespørsel.

Det er 2 arbeidsprosesser som kjører her, selv om 4 er aktivert.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

De viktigste variablene

  • WORK_MEM begrenser minnet per prosess, ikke bare spørringer: work_mem prosesser tilkoblinger = mye minne.
  • max_parallel_workers_per_gather — hvor mange arbeidsprosesser det utførende programmet vil bruke for parallell behandling fra planen.
  • max_worker_processes — justerer det totale antallet arbeidsprosesser til antall CPU-kjerner på serveren.
  • max_parallel_workers - det samme, men for parallelle arbeidsprosesser.

Resultater av

Fra og med versjon 9.6 kan parallell behandling forbedre ytelsen til komplekse spørringer som skanner mange rader eller indekser betraktelig. I PostgreSQL 10 er parallellbehandling aktivert som standard. Husk å deaktivere den på servere med stor OLTP-arbeidsbelastning. Sekvensielle skanninger eller indeksskanninger bruker mye ressurser. Hvis du ikke kjører en rapport på hele datasettet, kan du forbedre søkeytelsen ved å legge til manglende indekser eller bruke riktig partisjonering.

referanser

Kilde: www.habr.com

Legg til en kommentar