Modern CPUs hu vill Cores. Zënter Joeren hunn Uwendungen parallel Ufroen un Datenbanken geschéckt. Wann et eng Berichtsufro op verschidde Reihen an enger Tabell ass, leeft se méi séier wann Dir verschidde CPUs benotzt, an PostgreSQL konnt dëst zënter der Versioun 9.6 maachen.
Et huet 3 Joer gedauert fir d'Parallel Query Feature ëmzesetzen - mir hu missen de Code a verschiddene Stadien vun der Ufro Ausféierung iwwerschreiwe. PostgreSQL 9.6 huet Infrastruktur agefouert fir de Code weider ze verbesseren. A spéider Versioune ginn aner Aarte vu Ufroen parallel ausgefouert.
Restriktiounen
Aktivéiert keng parallel Ausféierung wann all Käre scho beschäftegt sinn, soss ginn aner Ufroe verlangsamt.
Virun allem, parallel Veraarbechtung mat héije WORK_MEM Wäerter benotzt vill Erënnerung - all Hash join oder Zort hëlt work_mem Erënnerung.
Niddereg latency OLTP Ufroen kënnen net duerch parallel Ausféierung beschleunegt ginn. A wann d'Ufro eng Zeil zréckkënnt, wäert d'Parallelveraarbechtung et nëmmen verlangsamen.
Entwéckler gär den TPC-H Benchmark ze benotzen. Vläicht hutt Dir ähnlech Ufroe fir perfekt parallel Ausféierung.
Nëmme SELECT Ufroen ouni Prädikatverschloss ginn parallel ausgefouert.
Heiansdo ass richteg Indexéierung besser wéi sequentiell Dëschscannen am Parallelmodus.
Pausen Ufroen a Cursoren ginn net ënnerstëtzt.
Fënsterfunktiounen a bestallt aggregéiert Funktiounen sinn net parallel.
Dir kritt näischt an der I/O Aarbechtslaascht.
Et gi keng parallel Zortéieren Algorithmen. Awer Ufroe mat Zorte kënnen an e puer Aspekter parallel ausgefouert ginn.
Ersetzen CTE (MAT ...) mat engem nestéierten SELECT fir parallel Veraarbechtung z'erméiglechen.
Drëtt Partei Datewrapper ënnerstëtzen nach keng parallel Veraarbechtung (awer si kéinten!)
FULL OUTER JOIN gëtt net ënnerstëtzt.
max_rows deaktivéiert parallel Veraarbechtung.
Wann eng Ufro eng Funktioun huet déi net PARALLEL SAFE markéiert ass, gëtt se eenzeg thread.
De SERIALIZABLE Transaktiounsisolatiounsniveau deaktivéiert parallel Veraarbechtung.
Test Ëmfeld
PostgreSQL Entwéckler hu probéiert d'Äntwertzäit vun TPC-H Benchmark Ufroen ze reduzéieren. Luet de Benchmark erof an passt et op PostgreSQL un. Dëst ass eng inoffiziell Notzung vum TPC-H Benchmark - net fir Datebank oder Hardware Verglach.
Luet TPC-H_Tools_v2.17.3.zip erof (oder méi nei Versioun) vun TPC offsite.
Den Numm makefile.suite op Makefile ëmbenennen an änneren wéi hei beschriwwen: https://github.com/tvondra/pg_tpch . Kompiléiert de Code mam Make Kommando.
Daten generéieren: ./dbgen -s 10 schaaft eng 23 GB Datebank. Dëst ass genuch fir den Ënnerscheed an der Leeschtung vu parallelen an net-parallelle Ufroen ze gesinn.
Konvertéieren Dateien tbl в csv с for и sed.
Klon de Repository pg_tpch a kopéiert d'Dateien csv в pg_tpch/dss/data.
Erstellt Ufroe mat engem Kommando qgen.
Lued Daten an d'Datebank mam Kommando ./tpch.sh.
Parallel sequenziell Scannen
Et ka méi séier sinn net wéinst parallele Liesen, awer well d'Donnéeën iwwer vill CPU-Cores verbreet sinn. A modernen Betribssystemer gi PostgreSQL Datendateien gutt cache. Mat viruliesen ass et méiglech e gréissere Block aus der Späichere ze kréien wéi de PG Daemon freet. Dofir ass d'Ufroleistung net limitéiert duerch Disk I / O. Et verbraucht CPU Zyklen fir:
liesen Zeile ee bei enger Zäit vun Dësch Säiten;
vergläichen String Wäerter a Konditiounen WHERE.
Loosst eis eng einfach Ufro lafen select:
tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms
De sequentielle Scan produzéiert ze vill Reihen ouni Aggregatioun, sou datt d'Ufro vun engem eenzegen CPU Kär ausgefouert gëtt.
Wann Dir derbäi SUM(), Dir kënnt gesinn datt zwee Workflows hëllefen d'Ufro ze beschleunegen:
explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms
Parallel Aggregatioun
De Parallel Seq Scan Node produzéiert Reihen fir deelweis Aggregatioun. Den "Partial Aggregate" Node trimt dës Linnen mat SUM(). Um Enn gëtt de SUM Konter vun all Aarbechterprozess vum "Gather" Node gesammelt.
D'Finale Resultat gëtt vum Node "Finalize Aggregate" berechent. Wann Dir Är eege Aggregatiounsfunktiounen hutt, vergiesst net se als "parallel sécher" ze markéieren.
Zuel vun Aarbechter Prozesser
D'Zuel vun den Aarbechterprozesser kann erhéicht ginn ouni de Server nei ze starten:
explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms
Wat ass hei lass? Et waren 2 Mol méi Aarbechtsprozesser, an d'Demande gouf nëmmen 1,6599 Mol méi séier. D'Berechnungen sinn interessant. Mir haten 2 Aarbechter Prozesser an 1 Leader. No der Ännerung gouf et 4+1.
Eis maximal Vitesse vun parallel Veraarbechtung: 5/3 = 1,66 (6) Mol.
Wéi heescht et schaffen?
D'Prozesser
Ufro Ausféierung fänkt ëmmer mam féierende Prozess un. De Leader mécht alles net-parallel an e puer parallel Veraarbechtung. Aner Prozesser déi déiselwecht Ufroe maachen, ginn Aarbechterprozesser genannt. Parallel Veraarbechtung benotzt Infrastruktur dynamesch Hannergrond Aarbechter Prozesser (aus Versioun 9.4). Well aner Deeler vu PostgreSQL Prozesser benotzen anstatt Threads, kann eng Ufro mat 3 Aarbechterprozesser 4 Mol méi séier sinn wéi traditionell Veraarbechtung.
Interaktioun
Aarbechter Prozesser kommunizéieren mam Leader duerch eng Message Schlaang (baséiert op gemeinsam Erënnerung). All Prozess huet 2 Schlaangen: fir Feeler a fir tuples.
All Kéier den Dësch ass 3 Mol méi grouss wéi min_parallel_(index|table)_scan_size, Postgres füügt en Aarbechterprozess un. D'Zuel vun de Workflows baséiert net op Käschten. Kreesfërmeg Ofhängegkeet mécht komplex Implementatiounen schwéier. Amplaz benotzt de Planer einfache Regelen.
An der Praxis sinn dës Reegelen net ëmmer gëeegent fir d'Produktioun, sou datt Dir d'Zuel vun den Aarbechterprozesser fir eng spezifesch Dësch ännere kënnt: ALTER TABLE ... SET (parallel_workers = N).
Firwat gëtt parallel Veraarbechtung net benotzt?
Zousätzlech zu der laanger Lëscht vu Restriktiounen, ginn et och Käschtenchecken:
parallel_setup_cost - fir parallel Veraarbechtung vu kuerzen Ufroen ze vermeiden. Dëse Parameter schätzt d'Zäit fir Erënnerung ze preparéieren, de Prozess unzefänken an den initialen Datenaustausch.
parallel_tuple_cost: Kommunikatioun tëscht dem Leader an Aarbechter kann am Verhältnis zu der Zuel vun tuples aus Aarbecht Prozesser verspéiten ginn. Dëse Parameter berechent d'Käschte vum Datenaustausch.
Nested Loop Joins
PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.
explain (costs off) select c_custkey, count(o_orderkey)
from customer left outer join orders on
c_custkey = o_custkey and o_comment not like '%special%deposits%'
group by c_custkey;
QUERY PLAN
--------------------------------------------------------------------------------------
Finalize GroupAggregate
Group Key: customer.c_custkey
-> Gather Merge
Workers Planned: 4
-> Partial GroupAggregate
Group Key: customer.c_custkey
-> Nested Loop Left Join
-> Parallel Index Only Scan using customer_pkey on customer
-> Index Scan using idx_orders_custkey on orders
Index Cond: (customer.c_custkey = o_custkey)
Filter: ((o_comment)::text !~~ '%special%deposits%'::text)
D'Kollektioun geschitt op der leschter Etapp, sou Nested Loop Left Join ass eng parallel Operatioun. Parallel Index Nëmme Scan war nëmmen an der Versioun agefouert 10. Et Wierker ähnlech ze parallel Serien Scannen. Zoustand c_custkey = o_custkey liest eng Bestellung pro Client String. Also et ass net parallel.
Hash Join
All Aarbechter Prozess schaaft seng eege Hash Dësch bis PostgreSQL 11. A wann et méi wéi véier vun dëse Prozesser, Leeschtung wäert net verbesseren. An der neier Versioun gëtt den Hash-Tabelle gedeelt. All Aarbechterprozess kann WORK_MEM benotzen fir en Hash-Table ze kreéieren.
select
l_shipmode,
sum(case
when o_orderpriority = '1-URGENT'
or o_orderpriority = '2-HIGH'
then 1
else 0
end) as high_line_count,
sum(case
when o_orderpriority <> '1-URGENT'
and o_orderpriority <> '2-HIGH'
then 1
else 0
end) as low_line_count
from
orders,
lineitem
where
o_orderkey = l_orderkey
and l_shipmode in ('MAIL', 'AIR')
and l_commitdate < l_receiptdate
and l_shipdate < l_commitdate
and l_receiptdate >= date '1996-01-01'
and l_receiptdate < date '1996-01-01' + interval '1' year
group by
l_shipmode
order by
l_shipmode
LIMIT 1;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
-> Finalize GroupAggregate (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
Group Key: lineitem.l_shipmode
-> Gather Merge (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Partial GroupAggregate (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
Group Key: lineitem.l_shipmode
-> Sort (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
Sort Key: lineitem.l_shipmode
Sort Method: external merge Disk: 2304kB
Worker 0: Sort Method: external merge Disk: 2064kB
Worker 1: Sort Method: external merge Disk: 2384kB
Worker 2: Sort Method: external merge Disk: 2264kB
Worker 3: Sort Method: external merge Disk: 2336kB
-> Parallel Hash Join (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
-> Parallel Seq Scan on lineitem (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
Rows Removed by Filter: 11934691
-> Parallel Hash (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
Buckets: 65536 Batches: 256 Memory Usage: 3840kB
-> Parallel Seq Scan on orders (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
Planning Time: 0.977 ms
Execution Time: 7923.770 ms
Query 12 vun TPC-H weist kloer eng parallel Hash Verbindung. All Aarbechterprozess dréit zur Schafung vun engem gemeinsamen Hash-Table bäi.
Merge Join
E Fusiounsverband ass net-parallell an der Natur. Maacht Iech keng Suergen, wann dëst de leschte Schrëtt vun der Ufro ass - et kann nach ëmmer parallel lafen.
-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from part, supplier, partsupp, nation, region
where
p_partkey = ps_partkey
and s_suppkey = ps_suppkey
and p_size = 36
and p_type like '%BRASS'
and s_nationkey = n_nationkey
and n_regionkey = r_regionkey
and r_name = 'AMERICA'
and ps_supplycost = (
select
min(ps_supplycost)
from partsupp, supplier, nation, region
where
p_partkey = ps_partkey
and s_suppkey = ps_suppkey
and s_nationkey = n_nationkey
and n_regionkey = r_regionkey
and r_name = 'AMERICA'
)
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
QUERY PLAN
----------------------------------------------------------------------------------------------------------
Limit
-> Sort
Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
-> Merge Join
Merge Cond: (part.p_partkey = partsupp.ps_partkey)
Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
-> Gather Merge
Workers Planned: 4
-> Parallel Index Scan using <strong>part_pkey</strong> on part
Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
-> Materialize
-> Sort
Sort Key: partsupp.ps_partkey
-> Nested Loop
-> Nested Loop
Join Filter: (nation.n_regionkey = region.r_regionkey)
-> Seq Scan on region
Filter: (r_name = 'AMERICA'::bpchar)
-> Hash Join
Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
-> Seq Scan on supplier
-> Hash
-> Seq Scan on nation
-> Index Scan using idx_partsupp_suppkey on partsupp
Index Cond: (ps_suppkey = supplier.s_suppkey)
SubPlan 1
-> Aggregate
-> Nested Loop
Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
-> Seq Scan on region region_1
Filter: (r_name = 'AMERICA'::bpchar)
-> Nested Loop
-> Nested Loop
-> Index Scan using idx_partsupp_partkey on partsupp partsupp_1
Index Cond: (part.p_partkey = ps_partkey)
-> Index Scan using supplier_pkey on supplier supplier_1
Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
-> Index Scan using nation_pkey on nation nation_1
Index Cond: (n_nationkey = supplier_1.s_nationkey)
De "Merge Join" Node läit iwwer dem "Gather Merge". Also Fusioun benotzt keng parallel Veraarbechtung. Awer de "Parallel Index Scan" Node hëlleft nach ëmmer mam Segment part_pkey.
Verbindung duerch Sektiounen
An PostgreSQL 11 Verbindung duerch Sektiounen Par défaut behënnert: et huet ganz deier Zäitplang. Dëscher mat enger ähnlecher Partitionéierung kënne Partition duerch Partition ugeschloss ginn. Op dës Manéier wäert Postgres méi kleng Hash Dëscher benotzen. All Verbindung vu Sektiounen kann parallel sinn.
tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
QUERY PLAN
---------------------------------------------------
Append
-> Hash Join
Hash Cond: (t2.b = t1.a)
-> Seq Scan on prt2_p1 t2
Filter: ((b >= 0) AND (b <= 10000))
-> Hash
-> Seq Scan on prt1_p1 t1
Filter: (b = 0)
-> Hash Join
Hash Cond: (t2_1.b = t1_1.a)
-> Seq Scan on prt2_p2 t2_1
Filter: ((b >= 0) AND (b <= 10000))
-> Hash
-> Seq Scan on prt1_p2 t1_1
Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
QUERY PLAN
-----------------------------------------------------------
Gather
Workers Planned: 4
-> Parallel Append
-> Parallel Hash Join
Hash Cond: (t2_1.b = t1_1.a)
-> Parallel Seq Scan on prt2_p2 t2_1
Filter: ((b >= 0) AND (b <= 10000))
-> Parallel Hash
-> Parallel Seq Scan on prt1_p2 t1_1
Filter: (b = 0)
-> Parallel Hash Join
Hash Cond: (t2.b = t1.a)
-> Parallel Seq Scan on prt2_p1 t2
Filter: ((b >= 0) AND (b <= 10000))
-> Parallel Hash
-> Parallel Seq Scan on prt1_p1 t1
Filter: (b = 0)
Den Haapt Saach ass datt d'Verbindung an de Sektiounen nëmmen parallel ass wann dës Sektiounen grouss genuch sinn.
Parallel Anhang
Parallel Anhang kann amplaz vu verschiddene Blocken a verschiddene Workflows benotzt ginn. Dëst geschitt normalerweis mat UNION ALL Ufroen. Den Nodeel ass manner Parallelismus, well all Aarbechterprozess nëmmen 1 Ufro veraarbecht.
Hei lafen 2 Aarbechterprozesser, obwuel 4 aktivéiert sinn.
tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
QUERY PLAN
------------------------------------------------------------------------------------------------
Gather
Workers Planned: 2
-> Parallel Append
-> Aggregate
-> Seq Scan on lineitem
Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
-> Aggregate
-> Seq Scan on lineitem lineitem_1
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Déi wichtegst Verännerlechen
WORK_MEM limitéiert Erënnerung pro Prozess, net nëmmen Ufroen: work_mem Prozesser Verbindungen = vill Erënnerung.
max_parallel_workers_per_gather - wéivill Aarbechterprozesser den ausféierende Programm fir parallel Veraarbechtung vum Plang benotzt.
max_worker_processes - passt d'Gesamtzuel vun den Aarbechterprozesser un d'Zuel vun den CPU-Kären um Server un.
Zënter der Versioun 9.6 kann d'Parallelveraarbechtung d'Performance vu komplexe Ufroen staark verbesseren, déi vill Reihen oder Indexen scannen. Am PostgreSQL 10 ass parallel Veraarbechtung als Standard aktivéiert. Denkt drun et op Serveren mat enger grousser OLTP Aarbechtslaascht auszeschalten. Sequentiell Scans oder Indexscannen verbrauchen vill Ressourcen. Wann Dir kee Bericht iwwer de ganzen Dataset leeft, kënnt Dir d'Queryleistung verbesseren andeems Dir einfach fehlend Indexen derbäigesat oder déi richteg Partitionéierung benotzt.