Interogări paralele în PostgreSQL

Interogări paralele în PostgreSQL
CPU-urile moderne au o mulțime de nuclee. De ani de zile, aplicațiile trimit interogări către bazele de date în paralel. Dacă este o interogare de raport pe mai multe rânduri dintr-un tabel, rulează mai rapid atunci când se utilizează mai multe procesoare, iar PostgreSQL a reușit să facă acest lucru începând cu versiunea 9.6.

A fost nevoie de 3 ani pentru a implementa caracteristica de interogare paralelă - a trebuit să rescriem codul în diferite etape de execuție a interogării. PostgreSQL 9.6 a introdus infrastructura pentru a îmbunătăți și mai mult codul. În versiunile ulterioare, alte tipuri de interogări sunt executate în paralel.

Restricții

  • Nu activați execuția paralelă dacă toate nucleele sunt deja ocupate, altfel alte solicitări vor încetini.
  • Cel mai important, procesarea paralelă cu valori WORK_MEM ridicate utilizează multă memorie - fiecare îmbinare sau sortare hash ocupă memoria work_mem.
  • Interogările OLTP cu latență scăzută nu pot fi accelerate prin execuție paralelă. Și dacă interogarea returnează un rând, procesarea paralelă o va încetini doar.
  • Dezvoltatorilor le place să folosească benchmark-ul TPC-H. Poate aveți interogări similare pentru o execuție paralelă perfectă.
  • Doar interogările SELECT fără blocarea predicatelor sunt executate în paralel.
  • Uneori, indexarea corectă este mai bună decât scanarea tabelului secvenţial în modul paralel.
  • Întreruperea interogărilor și cursoarelor nu este acceptată.
  • Funcțiile fereastră și funcțiile agregate ale setului ordonat nu sunt paralele.
  • Nu câștigi nimic din volumul de lucru I/O.
  • Nu există algoritmi de sortare paralel. Dar interogările cu sortări pot fi executate în paralel în unele aspecte.
  • Înlocuiți CTE (CU ...) cu un SELECT imbricat pentru a activa procesarea paralelă.
  • Wrapper-urile de date terțe nu acceptă încă procesarea paralelă (dar ar putea!)
  • FULL OUTER JOIN nu este acceptată.
  • max_rows dezactivează procesarea paralelă.
  • Dacă o interogare are o funcție care nu este marcată PARALLEL SAFE, aceasta va fi cu un singur thread.
  • Nivelul de izolare a tranzacției SERIALIZABLE dezactivează procesarea paralelă.

Mediu de testare

Dezvoltatorii PostgreSQL au încercat să reducă timpul de răspuns al interogărilor de referință TPC-H. Descărcați benchmark-ul și adaptați-l la PostgreSQL. Aceasta este o utilizare neoficială a benchmark-ului TPC-H - nu pentru compararea bazelor de date sau hardware.

  1. Descărcați TPC-H_Tools_v2.17.3.zip (sau o versiune mai nouă) de la TPC offsite.
  2. Redenumiți makefile.suite în Makefile și modificați așa cum este descris aici: https://github.com/tvondra/pg_tpch . Compilați codul cu comanda make.
  3. Generați date: ./dbgen -s 10 creează o bază de date de 23 GB. Acest lucru este suficient pentru a vedea diferența în performanța interogărilor paralele și non-paralele.
  4. Convertiți fișiere tbl в csv с for и sed.
  5. Clonează depozitul pg_tpch și copiați fișierele csv в pg_tpch/dss/data.
  6. Creați interogări cu o comandă qgen.
  7. Încărcați datele în baza de date cu comanda ./tpch.sh.

Scanare secvenţială paralelă

Poate fi mai rapid nu din cauza citirii paralele, ci pentru că datele sunt răspândite pe mai multe nuclee CPU. În sistemele de operare moderne, fișierele de date PostgreSQL sunt bine stocate în cache. Cu citirea în avans, este posibil să obțineți un bloc mai mare din stocare decât solicită demonul PG. Prin urmare, performanța interogărilor nu este limitată de I/O pe disc. Consumă cicluri CPU pentru a:

  • citiți rândurile pe rând din paginile tabelului;
  • comparați valorile și condițiile șirurilor WHERE.

Să rulăm o interogare simplă select:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

Scanarea secvenţială produce prea multe rânduri fără agregare, astfel încât interogarea este executată de un singur nucleu CPU.

Daca adaugati SUM(), puteți vedea că două fluxuri de lucru vor ajuta la accelerarea interogării:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Agregare paralelă

Nodul Parallel Seq Scan produce rânduri pentru agregare parțială. Nodul „Agregat parțial” decupează aceste linii folosind SUM(). La sfârșit, contorul SUM de la fiecare proces de lucru este colectat de nodul „Gather”.

Rezultatul final este calculat de nodul „Finalizare agregat”. Dacă aveți propriile funcții de agregare, nu uitați să le marcați ca „sigure în paralel”.

Numărul de procese de lucru

Numărul de procese de lucru poate fi crescut fără a reporni serverul:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Ce se petrece aici? Au fost de 2 ori mai multe procese de lucru, iar cererea a devenit de numai 1,6599 ori mai rapidă. Calculele sunt interesante. Am avut 2 procese de lucru și 1 lider. După schimbare a devenit 4+1.

Accelerarea noastră maximă de la procesarea paralelă: 5/3 = 1,66(6) ori.

Cum funcționează?

Procesele

Executarea cererii începe întotdeauna cu procesul de conducere. Liderul face totul neparalel și unele procesări paralele. Alte procese care efectuează aceleași solicitări sunt numite procese de lucru. Procesarea paralelă utilizează infrastructura procese dinamice ale lucrătorilor de fundal (din versiunea 9.4). Deoarece alte părți ale PostgreSQL folosesc procese mai degrabă decât fire, o interogare cu 3 procese de lucru ar putea fi de 4 ori mai rapidă decât procesarea tradițională.

Interacţiune

Procesele de lucru comunică cu liderul printr-o coadă de mesaje (pe baza memoriei partajate). Fiecare proces are 2 cozi: pentru erori și pentru tupluri.

De câte fluxuri de lucru sunt necesare?

Limita minimă este specificată de parametru max_parallel_workers_per_gather. Rulerul cererii preia apoi procesele de lucru din pool limitat de parametru max_parallel_workers size. Ultima limitare este max_worker_processes, adică numărul total de procese de fundal.

Dacă nu a fost posibilă alocarea unui proces de lucru, procesarea va fi un singur proces.

Planificatorul de interogări poate reduce fluxurile de lucru în funcție de dimensiunea tabelului sau a indexului. Există parametri pentru asta min_parallel_table_scan_size и min_parallel_index_scan_size.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

De fiecare dată masa este de 3 ori mai mare decât min_parallel_(index|table)_scan_size, Postgres adaugă un proces de lucru. Numărul de fluxuri de lucru nu se bazează pe costuri. Dependența circulară face dificile implementările complexe. În schimb, planificatorul folosește reguli simple.

În practică, aceste reguli nu sunt întotdeauna potrivite pentru producție, așa că puteți modifica numărul de procese de lucru pentru un anumit tabel: ALTER TABLE ... SET (parallel_workers = N).

De ce nu se utilizează procesarea paralelă?

Pe lângă lista lungă de restricții, există și verificări ale costurilor:

parallel_setup_cost - pentru a evita procesarea paralelă a cererilor scurte. Acest parametru estimează timpul pentru pregătirea memoriei, începerea procesului și schimbul inițial de date.

parallel_tuple_cost: comunicarea dintre lider și muncitori poate fi întârziată proporțional cu numărul de tupluri din procesele de lucru. Acest parametru calculează costul schimbului de date.

Îmbinări în buclă imbricată

PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

Colectarea are loc în ultima etapă, astfel încât Imbricarea buclă stânga este o operație paralelă. Parallel Index Only Scan a fost introdus doar în versiunea 10. Funcționează similar cu scanarea în serie paralelă. Condiție c_custkey = o_custkey citește o comandă pentru fiecare șir de client. Deci nu este paralel.

Hash Join

Fiecare proces de lucru își creează propriul tabel hash până la PostgreSQL 11. Și dacă există mai mult de patru dintre aceste procese, performanța nu se va îmbunătăți. În noua versiune, tabelul hash este partajat. Fiecare proces de lucru poate folosi WORK_MEM pentru a crea un tabel hash.

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

Interogarea 12 de la TPC-H arată clar o conexiune hash paralelă. Fiecare proces de lucru contribuie la crearea unui tabel hash comun.

Merge Join

O îmbinare de îmbinare nu este de natură paralelă. Nu vă faceți griji dacă acesta este ultimul pas al interogării - poate rula în continuare în paralel.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

Nodul „Merge Join” este situat deasupra „Gather Merge”. Deci, fuzionarea nu folosește procesarea paralelă. Dar nodul „Scanare index paralel” ajută în continuare cu segmentul part_pkey.

Racordare pe secțiuni

În PostgreSQL 11 racordare pe sectiuni dezactivat implicit: are o programare foarte scumpă. Tabelele cu partiții similare pot fi unite partiție cu partiție. În acest fel, Postgres va folosi tabele hash mai mici. Fiecare conexiune de secțiuni poate fi paralelă.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

Principalul lucru este că conexiunea în secțiuni este paralelă numai dacă aceste secțiuni sunt suficient de mari.

Anexă paralelă

Anexă paralelă poate fi folosit în loc de blocuri diferite în fluxuri de lucru diferite. Acest lucru se întâmplă de obicei cu interogările UNION ALL. Dezavantajul este mai puțin paralelism, deoarece fiecare proces de lucrător procesează doar 1 cerere.

Există 2 procese de lucru care rulează aici, deși 4 sunt activate.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

Cele mai importante variabile

  • WORK_MEM limitează memoria pe proces, nu doar interogări: work_mem proceselor conexiuni = multă memorie.
  • max_parallel_workers_per_gather — câte procese de lucru va folosi programul executant pentru procesarea paralelă din plan.
  • max_worker_processes — ajustează numărul total de procese de lucru la numărul de nuclee CPU de pe server.
  • max_parallel_workers - la fel, dar pentru procese de lucru paralele.

Rezultatele

Începând cu versiunea 9.6, procesarea paralelă poate îmbunătăți considerabil performanța interogărilor complexe care scanează multe rânduri sau indecși. În PostgreSQL 10, procesarea paralelă este activată implicit. Nu uitați să îl dezactivați pe serverele cu o sarcină mare de lucru OLTP. Scanările secvențiale sau scanările indexate consumă o mulțime de resurse. Dacă nu rulați un raport pentru întregul set de date, puteți îmbunătăți performanța interogării prin simpla adăugare a indecșilor lipsă sau folosind o partiționare adecvată.

referințe

Sursa: www.habr.com

Adauga un comentariu