Paralēli vaicājumi programmā PostgreSQL

Paralēli vaicājumi programmā PostgreSQL
Mūsdienu CPU ir daudz kodolu. Jau gadiem lietojumprogrammas paralēli sūta vaicājumus uz datu bāzēm. Ja tas ir pārskata vaicājums vairākās tabulas rindās, tas darbojas ātrāk, ja tiek izmantoti vairāki centrālie procesori, un PostgreSQL to ir spējis izdarīt kopš versijas 9.6.

Paralēlās vaicājuma funkcijas ieviešana aizņēma 3 gadus – mums bija jāpārraksta kods dažādos vaicājuma izpildes posmos. PostgreSQL 9.6 ieviesa infrastruktūru, lai vēl vairāk uzlabotu kodu. Nākamajās versijās paralēli tiek izpildīti cita veida vaicājumi.

Ierobežojumi

  • Neiespējojiet paralēlo izpildi, ja visi kodoli jau ir aizņemti, pretējā gadījumā citi pieprasījumi palēnināsies.
  • Vissvarīgākais ir tas, ka paralēlā apstrāde ar augstām WORK_MEM vērtībām patērē daudz atmiņas — katra jaucējkrāna pievienošana vai kārtošana aizņem work_mem atmiņu.
  • Zema latentuma OLTP vaicājumus nevar paātrināt ar paralēlu izpildi. Un, ja vaicājums atgriež vienu rindu, paralēlā apstrāde to tikai palēninās.
  • Izstrādātājiem patīk izmantot TPC-H etalonu. Varbūt jums ir līdzīgi vaicājumi perfektai paralēlai izpildei.
  • Paralēli tiek izpildīti tikai SELECT vaicājumi bez predikātu bloķēšanas.
  • Dažreiz pareiza indeksēšana ir labāka nekā secīga tabulas skenēšana paralēlā režīmā.
  • Vaicājumu un kursoru apturēšana netiek atbalstīta.
  • Logu funkcijas un sakārtotās kopas apkopotās funkcijas nav paralēlas.
  • I/O darba slodzē jūs neko neiegūsit.
  • Nav paralēlu šķirošanas algoritmu. Bet vaicājumus ar šķirošanu dažos aspektos var izpildīt paralēli.
  • Aizstājiet CTE (WITH ...) ar ligzdotu SELECT, lai iespējotu paralēlo apstrādi.
  • Trešo pušu datu iesaiņotāji vēl neatbalsta paralēlo apstrādi (bet varētu!)
  • FULL OUTTER JOIN netiek atbalstīta.
  • max_rows atspējo paralēlo apstrādi.
  • Ja vaicājumam ir funkcija, kas nav atzīmēta PARALĒLI DROŠA, tas būs viens pavediens.
  • Darījuma izolācijas līmenis SERIALIZABLE atspējo paralēlo apstrādi.

Testa vide

PostgreSQL izstrādātāji mēģināja samazināt TPC-H etalona vaicājumu reakcijas laiku. Lejupielādējiet etalonu un pielāgot to PostgreSQL. Tas ir neoficiāls TPC-H etalona lietojums — nevis datu bāzes vai aparatūras salīdzināšanai.

  1. Lejupielādēt TPC-H_Tools_v2.17.3.zip (vai jaunāku versiju) no TPC izbraukuma.
  2. Pārdēvējiet makefile.suite par Makefile un mainiet, kā aprakstīts šeit: https://github.com/tvondra/pg_tpch . Apkopojiet kodu ar komandu make.
  3. Ģenerēt datus: ./dbgen -s 10 izveido 23 GB datu bāzi. Tas ir pietiekami, lai redzētu atšķirību paralēlo un neparalēlo vaicājumu veiktspējā.
  4. Konvertēt failus tbl в csv с for и sed.
  5. Klonējiet repozitoriju pg_tpch un kopējiet failus csv в pg_tpch/dss/data.
  6. Izveidojiet vaicājumus ar komandu qgen.
  7. Ielādējiet datus datu bāzē ar komandu ./tpch.sh.

Paralēlā secīgā skenēšana

Tas var būt ātrāks nevis paralēlās lasīšanas dēļ, bet gan tāpēc, ka dati tiek izplatīti daudzos CPU kodolos. Mūsdienu operētājsistēmās PostgreSQL datu faili tiek labi saglabāti kešatmiņā. Ar lasīšanu uz priekšu no krātuves ir iespējams iegūt lielāku bloku, nekā pieprasa PG dēmons. Tāpēc vaicājuma veiktspēju neierobežo diska I/O. Tas patērē CPU ciklus, lai:

  • lasīt rindas pa vienai no tabulas lapām;
  • salīdziniet virknes vērtības un nosacījumus WHERE.

Izpildīsim vienkāršu vaicājumu select:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

Secīgā skenēšana rada pārāk daudz rindu bez apkopošanas, tāpēc vaicājumu izpilda viens CPU kodols.

Ja pievienojat SUM(), varat redzēt, ka divas darbplūsmas palīdzēs paātrināt vaicājumu:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Paralēlā agregācija

Parallel Seq Scan mezgls veido rindas daļējai apkopošanai. Mezgls "Daļējs apkopojums" apgriež šīs līnijas, izmantojot SUM(). Beigās SUM skaitītājs no katra darbinieka procesa tiek savākts mezglā “Savākt”.

Gala rezultātu aprēķina, izmantojot mezglu “Finalize Aggregate”. Ja jums ir savas apkopošanas funkcijas, neaizmirstiet tās atzīmēt kā “paralēli drošas”.

Darbinieku procesu skaits

Darbinieku procesu skaitu var palielināt, nerestartējot serveri:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Kas šeit notiek? Darba procesu bija 2 reizes vairāk, un pieprasījums kļuva tikai 1,6599 reizes ātrāks. Aprēķini ir interesanti. Mums bija 2 strādnieku procesi un 1 vadītājs. Pēc maiņas kļuva 4+1.

Mūsu maksimālais paātrinājums no paralēlās apstrādes: 5/3 = 1,66 (6) reizes.

Kā tas strādā?

Procesi

Pieprasījuma izpilde vienmēr sākas ar vadošo procesu. Vadītājs dara visu, kas nav paralēls un kaut kāda paralēla apstrāde. Citus procesus, kas izpilda tos pašus pieprasījumus, sauc par darbinieku procesiem. Paralēlā apstrāde izmanto infrastruktūru dinamiski fona darbinieka procesi (no versijas 9.4). Tā kā citas PostgreSQL daļas izmanto procesus, nevis pavedienus, vaicājums ar 3 darbinieku procesiem varētu būt 4 reizes ātrāks nekā tradicionālā apstrāde.

Mijiedarbība

Darbinieku procesi sazinās ar vadītāju, izmantojot ziņojumu rindu (pamatojoties uz koplietojamo atmiņu). Katram procesam ir 2 rindas: kļūdām un kortežām.

Cik darbplūsmu ir nepieciešams?

Minimālo robežu nosaka parametrs max_parallel_workers_per_gather. Pēc tam pieprasījuma izpildītājs ņem darbinieku procesus no pūla, ko ierobežo parametrs max_parallel_workers size. Pēdējais ierobežojums ir max_worker_processes, tas ir, kopējais fona procesu skaits.

Ja nebija iespējams piešķirt darbinieka procesu, apstrāde būs viena procesa.

Vaicājumu plānotājs var samazināt darbplūsmas atkarībā no tabulas vai indeksa lieluma. Tam ir noteikti parametri min_parallel_table_scan_size и min_parallel_index_scan_size.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Katru reizi tabula ir 3 reizes lielāka nekā min_parallel_(index|table)_scan_size, Postgres pievieno darbinieka procesu. Darbplūsmu skaits nav balstīts uz izmaksām. Cirkulārā atkarība sarežģī sarežģītas ieviešanas iespējas. Tā vietā plānotājs izmanto vienkāršus noteikumus.

Praksē šie noteikumi ne vienmēr ir piemēroti ražošanai, tāpēc jūs varat mainīt darbinieku procesu skaitu konkrētai tabulai: ALTER TABLE ... SET (parallel_workers = N).

Kāpēc paralēlā apstrāde netiek izmantota?

Papildus garajam ierobežojumu sarakstam ir arī izmaksu pārbaudes:

parallel_setup_cost - lai izvairītos no īsu pieprasījumu paralēlas apstrādes. Šis parametrs nosaka laiku, kas nepieciešams atmiņas sagatavošanai, procesa sākšanai un sākotnējai datu apmaiņai.

parallel_tuple_cost: komunikācija starp vadītāju un strādniekiem var aizkavēties proporcionāli korešu skaitam no darba procesiem. Šis parametrs aprēķina datu apmaiņas izmaksas.

Ligzdotas cilpas pievienošanās

PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

Savākšana notiek pēdējā posmā, tāpēc Nested Loop Left Join ir paralēla darbība. Paralēlā indeksa skenēšana tika ieviesta tikai 10. versijā. Tā darbojas līdzīgi paralēlajai sērijas skenēšanai. Stāvoklis c_custkey = o_custkey nolasa vienu pasūtījumu katrai klienta virknei. Tātad tas nav paralēls.

Hash pievienojies

Katrs darbinieka process izveido savu jaucējtabulu līdz versijai PostgreSQL 11. Un, ja no šiem procesiem ir vairāk nekā četri, veiktspēja neuzlabosies. Jaunajā versijā hash tabula ir koplietota. Katrs darbinieka process var izmantot WORK_MEM, lai izveidotu hash tabulu.

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

Vaicājums 12 no TPC-H skaidri parāda paralēlo jaucējsavienojumu. Katrs darbinieka process veicina kopējas jaucēj tabulas izveidi.

Apvienot Join

Apvienojuma savienojums pēc būtības nav paralēls. Neuztraucieties, ja šis ir pēdējais vaicājuma solis — tas joprojām var darboties paralēli.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

Merge "Merge Join" atrodas virs "Apkopot sapludināšanu". Tātad apvienošana neizmanto paralēlu apstrādi. Bet mezgls “Paralēlā indeksa skenēšana” joprojām palīdz segmentam part_pkey.

Savienojums pa sekcijām

Programmā PostgreSQL 11 savienojums pa sekcijām pēc noklusējuma atspējots: tai ir ļoti dārga plānošana. Tabulas ar līdzīgu nodalījumu var savienot pa nodalījumiem. Tādā veidā Postgres izmantos mazākas hash tabulas. Katrs sekciju savienojums var būt paralēls.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

Galvenais ir tas, ka savienojums sekcijās ir paralēls tikai tad, ja šīs sadaļas ir pietiekami lielas.

Paralēli pievienot

Paralēli pievienot var izmantot dažādu bloku vietā dažādās darbplūsmās. Tas parasti notiek ar UNION ALL vaicājumiem. Trūkums ir mazāks paralēlisms, jo katrs darbinieka process apstrādā tikai 1 pieprasījumu.

Šeit darbojas 2 darbinieku procesi, lai gan 4 ir iespējoti.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

Svarīgākie mainīgie

  • WORK_MEM ierobežo atmiņu katram procesam, nevis tikai vaicājumiem: work_mem procesi savienojumi = daudz atmiņas.
  • max_parallel_workers_per_gather — cik darbinieku procesus izpildprogramma izmantos paralēlai apstrādei no plāna.
  • max_worker_processes — pielāgo kopējo darbinieku procesu skaitu CPU kodolu skaitam serverī.
  • max_parallel_workers - tas pats, bet paralēliem darba procesiem.

Rezultāti

Sākot ar versiju 9.6, paralēlā apstrāde var ievērojami uzlabot sarežģītu vaicājumu veiktspēju, kas skenē daudzas rindas vai indeksus. Programmā PostgreSQL 10 paralēlā apstrāde ir iespējota pēc noklusējuma. Atcerieties to atspējot serveros ar lielu OLTP darba slodzi. Secīgā skenēšana vai indeksu skenēšana patērē daudz resursu. Ja nepalaižat pārskatu par visu datu kopu, varat uzlabot vaicājuma veiktspēju, vienkārši pievienojot trūkstošos indeksus vai izmantojot pareizu sadalīšanu.

atsauces

Avots: www.habr.com

Pievieno komentāru