Paralelni upiti u PostgreSQL-u

Paralelni upiti u PostgreSQL-u
Moderni procesori imaju mnogo jezgri. Već godinama aplikacije paralelno šalju upite bazama podataka. Ako se radi o upitu izvješća o više redaka u tablici, radi brže kada se koristi više CPU-a, a PostgreSQL to može činiti od verzije 9.6.

Bile su potrebne 3 godine za implementaciju značajke paralelnog upita - morali smo ponovno pisati kod u različitim fazama izvršavanja upita. PostgreSQL 9.6 uveo je infrastrukturu za daljnje poboljšanje koda. U sljedećim verzijama paralelno se izvršavaju druge vrste upita.

Ograničenja

  • Nemojte omogućiti paralelno izvođenje ako su sve jezgre već zauzete, inače će se drugi zahtjevi usporiti.
  • Ono što je najvažnije, paralelna obrada s visokim vrijednostima WORK_MEM koristi puno memorije - svako hash pridruživanje ili sortiranje zauzima work_mem memoriju.
  • OLTP upiti niske latencije ne mogu se ubrzati paralelnim izvođenjem. A ako upit vrati jedan redak, paralelna obrada samo će ga usporiti.
  • Programeri vole koristiti TPC-H benchmark. Možda imate slične upite za savršeno paralelno izvođenje.
  • Paralelno se izvode samo SELECT upiti bez zaključavanja predikata.
  • Ponekad je ispravno indeksiranje bolje od sekvencijalnog skeniranja tablice u paralelnom načinu rada.
  • Pauziranje upita i kursora nije podržano.
  • Prozorske funkcije i agregatne funkcije uređenog skupa nisu paralelne.
  • Ne dobivate ništa na I/O opterećenju.
  • Ne postoje algoritmi paralelnog sortiranja. Ali upiti s sortiranjem mogu se izvršavati paralelno u nekim aspektima.
  • Zamijenite CTE (WITH ...) s ugniježđenim SELECT kako biste omogućili paralelnu obradu.
  • Omotači podataka trećih strana još ne podržavaju paralelnu obradu (ali bi mogli!)
  • FULL OUTER JOIN nije podržan.
  • max_rows onemogućuje paralelnu obradu.
  • Ako upit ima funkciju koja nije označena kao PARALLEL SAFE, bit će jednonit.
  • Razina izolacije transakcije SERIALIZABLE onemogućuje paralelnu obradu.

Testno okruženje

PostgreSQL programeri pokušali su smanjiti vrijeme odgovora TPC-H benchmark upita. Preuzmite referentnu vrijednost i prilagoditi ga PostgreSQL-u. Ovo je neslužbena upotreba referentne vrijednosti TPC-H - ne za usporedbu baze podataka ili hardvera.

  1. Preuzmite TPC-H_Tools_v2.17.3.zip (ili noviju verziju) iz TPC-a izvan mjesta.
  2. Preimenujte makefile.suite u Makefile i promijenite kako je ovdje opisano: https://github.com/tvondra/pg_tpch . Prevedite kod pomoću naredbe make.
  3. Generiraj podatke: ./dbgen -s 10 stvara bazu podataka od 23 GB. Ovo je dovoljno da vidite razliku u izvedbi paralelnih i neparalelnih upita.
  4. Pretvorite datoteke tbl в csv с for и sed.
  5. Klonirajte spremište pg_tpch i kopirajte datoteke csv в pg_tpch/dss/data.
  6. Napravite upite pomoću naredbe qgen.
  7. Učitaj podatke u bazu pomoću naredbe ./tpch.sh.

Paralelno sekvencijalno skeniranje

Možda je brži ne zbog paralelnog čitanja, već zato što su podaci raspoređeni kroz mnogo CPU jezgri. U modernim operativnim sustavima, datoteke s podacima PostgreSQL dobro su predmemorirane. S čitanjem unaprijed, moguće je dobiti veći blok iz pohrane nego što zahtijeva PG daemon. Stoga izvedba upita nije ograničena I/O diska. Troši CPU cikluse za:

  • čitati redove jedan po jedan sa stranica tablice;
  • usporedite vrijednosti niza i uvjete WHERE.

Pokrenimo jednostavan upit select:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

Sekvencijalno skeniranje proizvodi previše redaka bez agregacije, tako da upit izvršava jedna CPU jezgra.

Ako dodate SUM(), možete vidjeti da će dva tijeka rada ubrzati upit:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Paralelna agregacija

Čvor Parallel Seq Scan proizvodi retke za djelomičnu agregaciju. Čvor "Djelomični agregat" skraćuje ove retke pomoću SUM(). Na kraju, SUM brojač iz svakog radnog procesa prikuplja čvor "Gather".

Konačni rezultat izračunava čvor "Finalize Aggregate". Ako imate vlastite funkcije agregacije, ne zaboravite ih označiti kao "paralelno sigurne".

Broj radnih procesa

Broj radnih procesa može se povećati bez ponovnog pokretanja poslužitelja:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Što se ovdje događa? Bilo je 2 puta više radnih procesa, a zahtjev je postao samo 1,6599 puta brži. Zanimljivi su izračuni. Imali smo 2 radnička procesa i 1 voditelja. Nakon izmjene postalo je 4+1.

Naše maksimalno ubrzanje od paralelne obrade: 5/3 = 1,66(6) puta.

Kako se to radi?

procesi

Izvršenje zahtjeva uvijek počinje vodećim procesom. Voditelj sve radi neparalelno i neku paralelnu obradu. Ostali procesi koji izvršavaju iste zahtjeve nazivaju se radnički procesi. Paralelna obrada koristi infrastrukturu dinamički pozadinski radnički procesi (od verzije 9.4). Budući da drugi dijelovi PostgreSQL-a koriste procese, a ne niti, upit s 3 radna procesa mogao bi biti 4 puta brži od tradicionalne obrade.

Interakcija

Radnički procesi komuniciraju s voditeljem putem reda poruka (na temelju zajedničke memorije). Svaki proces ima 2 reda: za greške i za torke.

Koliko je radnih procesa potrebno?

Minimalna granica određena je parametrom max_parallel_workers_per_gather. Pokretač zahtjeva zatim preuzima radne procese iz skupa ograničenog parametrom max_parallel_workers size. Posljednje ograničenje je max_worker_processes, odnosno ukupan broj pozadinskih procesa.

Ako nije bilo moguće dodijeliti radni proces, obrada će biti jednoprocesna.

Planer upita može smanjiti tijek rada ovisno o veličini tablice ili indeksa. Za to postoje parametri min_parallel_table_scan_size и min_parallel_index_scan_size.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Svaki put kada je tablica 3 puta veća od min_parallel_(index|table)_scan_size, Postgres dodaje radni proces. Broj tijekova rada ne temelji se na troškovima. Kružna ovisnost otežava složene implementacije. Umjesto toga, planer koristi jednostavna pravila.

U praksi ova pravila nisu uvijek prikladna za proizvodnju, tako da možete promijeniti broj radnih procesa za određenu tablicu: ALTER TABLE ... SET (parallel_workers = N).

Zašto se ne koristi paralelna obrada?

Osim dugog popisa ograničenja, postoje i provjere troškova:

parallel_setup_cost - izbjegavanje paralelne obrade kratkih zahtjeva. Ovaj parametar procjenjuje vrijeme za pripremu memorije, pokretanje procesa i početnu razmjenu podataka.

parallel_tuple_cost: komunikacija između voditelja i radnika može se odgoditi proporcionalno broju torki iz radnih procesa. Ovaj parametar izračunava trošak razmjene podataka.

Spojevi ugniježđene petlje

PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

Prikupljanje se događa u posljednjoj fazi, tako da je lijevo spajanje ugniježđene petlje paralelna operacija. Paralelno skeniranje samo indeksa uvedeno je tek u verziji 10. Djeluje slično paralelnom serijskom skeniranju. Stanje c_custkey = o_custkey čita jednu narudžbu po klijentskom nizu. Dakle, nije paralelno.

Hash Join

Svaki radni proces stvara vlastitu hash tablicu do PostgreSQL 11. A ako postoji više od četiri ova procesa, performanse se neće poboljšati. U novoj verziji, hash tablica se dijeli. Svaki radni proces može koristiti WORK_MEM za stvaranje hash tablice.

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

Upit 12 iz TPC-H jasno pokazuje paralelnu hash vezu. Svaki radni proces pridonosi stvaranju zajedničke hash tablice.

Spajanje Pridružite se

Spajanje spajanjem je neparalelno po prirodi. Ne brinite ako je ovo zadnji korak upita - i dalje se može izvoditi paralelno.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

Čvor "Merge Join" nalazi se iznad "Gather Merge". Dakle, spajanje ne koristi paralelnu obradu. Ali čvor "Paralelno indeksno skeniranje" i dalje pomaže u segmentu part_pkey.

Povezivanje po dijelovima

U PostgreSQL 11 veza po sekcijama onemogućeno prema zadanim postavkama: ima vrlo skupo raspoređivanje. Tablice sa sličnim particioniranjem mogu se spajati particiju po particiju. Na ovaj način Postgres će koristiti manje hash tablice. Svaka veza sekcija može biti paralelna.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

Glavna stvar je da je veza u dijelovima paralelna samo ako su ti dijelovi dovoljno veliki.

Paralelno dodavanje

Paralelno dodavanje može se koristiti umjesto različitih blokova u različitim tijekovima rada. To se obično događa s UNION ALL upitima. Nedostatak je manji paralelizam, jer svaki radni proces obrađuje samo 1 zahtjev.

Ovdje su pokrenuta 2 radna procesa, iako su 4 omogućena.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

Najvažnije varijable

  • WORK_MEM ograničava memoriju po procesu, a ne samo po upitima: work_mem procesa veze = puno memorije.
  • max_parallel_workers_per_gather — koliko će radnih procesa program koji se izvršava koristiti za paralelnu obradu iz plana.
  • max_worker_processes — prilagođava ukupni broj radnih procesa broju CPU jezgri na poslužitelju.
  • max_parallel_workers - isto, ali za paralelne procese rada.

Rezultati

Od verzije 9.6, paralelna obrada može uvelike poboljšati izvedbu složenih upita koji skeniraju mnogo redaka ili indeksa. U PostgreSQL 10, paralelna obrada je omogućena prema zadanim postavkama. Ne zaboravite ga onemogućiti na poslužiteljima s velikim OLTP radnim opterećenjem. Sekvencijalna skeniranja ili skeniranja indeksa troše puno resursa. Ako ne pokrećete izvješće o cijelom skupu podataka, možete poboljšati izvedbu upita jednostavnim dodavanjem indeksa koji nedostaju ili korištenjem odgovarajućeg particioniranja.

reference

Izvor: www.habr.com

Dodajte komentar