Paralelné dotazy v PostgreSQL

Paralelné dotazy v PostgreSQL
Moderné CPU majú veľa jadier. Aplikácie už roky paralelne posielajú dopyty do databáz. Ak je to dotaz na zostavu na viacerých riadkoch v tabuľke, beží rýchlejšie pri použití viacerých CPU a PostgreSQL to dokáže od verzie 9.6.

Implementácia funkcie paralelného dotazu trvala 3 roky – museli sme prepísať kód v rôznych fázach vykonávania dotazu. PostgreSQL 9.6 zaviedol infraštruktúru na ďalšie vylepšenie kódu. V ďalších verziách sa paralelne vykonávajú iné typy dotazov.

Obmedzenie

  • Nepovoľujte paralelné vykonávanie, ak sú všetky jadrá už zaneprázdnené, inak sa spomalia ďalšie požiadavky.
  • Najdôležitejšie je, že paralelné spracovanie s vysokými hodnotami WORK_MEM využíva veľa pamäte – každé spojenie alebo triedenie hash zaberá pamäť work_mem.
  • Dotazy OLTP s nízkou latenciou nemožno urýchliť paralelným vykonávaním. A ak dotaz vráti jeden riadok, paralelné spracovanie ho iba spomalí.
  • Vývojári radi používajú benchmark TPC-H. Možno máte podobné otázky na dokonalé paralelné vykonávanie.
  • Paralelne sa vykonávajú iba SELECT dotazy bez predikátového uzamknutia.
  • Niekedy je správne indexovanie lepšie ako sekvenčné skenovanie tabuliek v paralelnom režime.
  • Pozastavenie dotazov a kurzorov nie je podporované.
  • Funkcie okien a usporiadané množinové agregačné funkcie nie sú paralelné.
  • V pracovnom zaťažení I/O nič nezískate.
  • Neexistujú žiadne paralelné triediace algoritmy. Dotazy s triedením však možno v niektorých aspektoch vykonávať paralelne.
  • Nahraďte CTE (WITH ...) vnoreným SELECT, aby ste umožnili paralelné spracovanie.
  • Obaly údajov tretích strán zatiaľ nepodporujú paralelné spracovanie (ale mohli by!)
  • FULL OUTER JOIN nie je podporované.
  • max_rows zakáže paralelné spracovanie.
  • Ak má dotaz funkciu, ktorá nie je označená ako PARALELNÁ BEZPEČNOSŤ, bude mať jedno vlákno.
  • Úroveň izolácie transakcií SERIALIZABLE zakáže paralelné spracovanie.

Testovacie prostredie

Vývojári PostgreSQL sa pokúsili skrátiť čas odozvy benchmarkových dotazov TPC-H. Stiahnite si benchmark a prispôsobiť ho PostgreSQL. Toto je neoficiálne použitie benchmarku TPC-H – nie na porovnanie databázy alebo hardvéru.

  1. Stiahnite si TPC-H_Tools_v2.17.3.zip (alebo novšiu verziu) z offsite TPC.
  2. Premenujte makefile.suite na Makefile a zmeňte podľa popisu tu: https://github.com/tvondra/pg_tpch . Kompilujte kód pomocou príkazu make.
  3. Generovať údaje: ./dbgen -s 10 vytvorí 23 GB databázu. To stačí na to, aby ste videli rozdiel vo výkone paralelných a neparalelných dopytov.
  4. Konvertovať súbory tbl в csv с for и sed.
  5. Naklonujte úložisko pg_tpch a skopírujte súbory csv в pg_tpch/dss/data.
  6. Vytvorte dopyty pomocou príkazu qgen.
  7. Načítajte údaje do databázy pomocou príkazu ./tpch.sh.

Paralelné sekvenčné skenovanie

Môže to byť rýchlejšie nie kvôli paralelnému čítaniu, ale kvôli tomu, že dáta sú rozložené medzi mnohými jadrami CPU. V moderných operačných systémoch sa dátové súbory PostgreSQL dobre ukladajú do vyrovnávacej pamäte. S čítaním dopredu je možné získať väčší blok z úložiska, ako požaduje démon PG. Výkon dotazov teda nie je obmedzený diskovými I/O. Spotrebúva cykly CPU na:

  • čítať riadky jeden po druhom zo stránok tabuľky;
  • porovnajte hodnoty reťazca a podmienky WHERE.

Spustíme jednoduchý dotaz select:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

Sekvenčné skenovanie vytvára príliš veľa riadkov bez agregácie, takže dotaz je vykonávaný jedným jadrom CPU.

Ak sa pridá SUM(), môžete vidieť, že dva pracovné postupy pomôžu urýchliť dotaz:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Paralelná agregácia

Uzol Parallel Seq Scan vytvára riadky na čiastočnú agregáciu. Uzol "Partial Aggregate" orezáva tieto riadky pomocou SUM(). Nakoniec počítadlo SUM z každého pracovného procesu zhromažďuje uzol „Zhromažďovať“.

Konečný výsledok vypočíta uzol „Finalize Aggregate“. Ak máte vlastné agregačné funkcie, nezabudnite ich označiť ako „paralelné bezpečné“.

Počet pracovných procesov

Počet pracovných procesov je možné zvýšiť bez reštartovania servera:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Čo sa tu deje? Pracovných procesov bolo 2-krát viac a požiadavka sa zrýchlila iba 1,6599-krát. Zaujímavé sú výpočty. Mali sme 2 pracovné procesy a 1 vedúceho. Po zmene to bolo 4+1.

Naše maximálne zrýchlenie z paralelného spracovania: 5/3 = 1,66 (6) krát.

Ako to funguje?

procesy

Realizácia požiadavky vždy začína vedúcim procesom. Vedúci robí všetko neparalelné a nejaké paralelné spracovanie. Ostatné procesy, ktoré vykonávajú rovnaké požiadavky, sa nazývajú pracovné procesy. Paralelné spracovanie využíva infraštruktúru dynamické procesy pracovníkov na pozadí (od verzie 9.4). Keďže iné časti PostgreSQL používajú procesy, nie vlákna, dopyt s 3 pracovnými procesmi môže byť 4-krát rýchlejší ako tradičné spracovanie.

Interakcie

Pracovné procesy komunikujú s vedúcim prostredníctvom frontu správ (na základe zdieľanej pamäte). Každý proces má 2 fronty: pre chyby a pre n-tice.

Koľko pracovných postupov je potrebných?

Minimálna hranica je určená parametrom max_parallel_workers_per_gather. Spúšťač požiadaviek potom prevezme pracovné procesy z oblasti obmedzenej parametrom max_parallel_workers size. Posledným obmedzením je max_worker_processes, teda celkový počet procesov na pozadí.

Ak nebolo možné prideliť pracovný proces, spracovanie bude jednoprocesové.

Plánovač dotazov môže znížiť pracovné toky v závislosti od veľkosti tabuľky alebo indexu. Sú na to parametre min_parallel_table_scan_size и min_parallel_index_scan_size.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Zakaždým, keď je tabuľka 3-krát väčšia ako min_parallel_(index|table)_scan_size, Postgres pridáva pracovný proces. Počet pracovných postupov nie je založený na nákladoch. Kruhová závislosť sťažuje zložité implementácie. Namiesto toho používa plánovač jednoduché pravidlá.

V praxi tieto pravidlá nie sú vždy vhodné pre výrobu, takže počet pracovných procesov pre konkrétnu tabuľku môžete zmeniť: ALTER TABLE ... SET (parallel_workers = N).

Prečo sa nepoužíva paralelné spracovanie?

Okrem dlhého zoznamu obmedzení existujú aj kontroly nákladov:

parallel_setup_cost - vyhnúť sa paralelnému spracovaniu krátkych požiadaviek. Tento parameter odhaduje čas na prípravu pamäte, spustenie procesu a počiatočnú výmenu údajov.

parallel_tuple_cost: komunikácia medzi vedúcim a pracovníkmi môže byť oneskorená úmerne počtu n-tic z pracovných procesov. Tento parameter vypočítava náklady na výmenu údajov.

Vnorené slučkové spojenia

PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

Zhromažďovanie prebieha v poslednej fáze, takže Nested Loop Left Join je paralelná operácia. Parallel Index Only Scan bol zavedený až vo verzii 10. Funguje podobne ako paralelné sériové skenovanie. Podmienka c_custkey = o_custkey prečíta jednu objednávku na klientsky reťazec. Takže to nie je paralelné.

Hash Pripojte sa

Každý pracovný proces vytvára svoju vlastnú hašovaciu tabuľku až do PostgreSQL 11. A ak je týchto procesov viac ako štyri, výkon sa nezlepší. V novej verzii je hašovacia tabuľka zdieľaná. Každý pracovný proces môže použiť WORK_MEM na vytvorenie hašovacej tabuľky.

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

Dotaz 12 z TPC-H jasne ukazuje paralelné hash pripojenie. Každý pracovný proces prispieva k vytvoreniu spoločnej hašovacej tabuľky.

Zlúčiť Pripojiť sa

Zlúčené spojenie je svojou povahou neparalelné. Nerobte si starosti, ak je toto posledný krok dotazu – stále môže prebiehať paralelne.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

Uzol "Merge Join" sa nachádza nad "Gather Merge". Takže zlučovanie nepoužíva paralelné spracovanie. Ale uzol „Parallel Index Scan“ stále pomáha so segmentom part_pkey.

Spojenie podľa sekcií

V PostgreSQL 11 spojenie po sekciách predvolene vypnuté: má veľmi drahé plánovanie. Tabuľky s podobným rozdelením možno spájať oddiel po oddiele. Takto bude Postgres používať menšie hašovacie tabuľky. Každé spojenie sekcií môže byť paralelné.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

Hlavná vec je, že spojenie v sekciách je paralelné iba vtedy, ak sú tieto sekcie dostatočne veľké.

Paralelná príloha

Paralelná príloha možno použiť namiesto rôznych blokov v rôznych pracovných postupoch. To sa zvyčajne stáva pri dopytoch UNION ALL. Nevýhodou je menší paralelizmus, pretože každý pracovný proces spracuje iba 1 požiadavku.

Bežia tu 2 pracovné procesy, hoci 4 sú povolené.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

Najdôležitejšie premenné

  • WORK_MEM obmedzuje pamäť na proces, nielen na dotazy: work_mem procesy spojenia = veľa pamäte.
  • max_parallel_workers_per_gather — koľko pracovných procesov vykonávací program použije na paralelné spracovanie z plánu.
  • max_worker_processes — prispôsobí celkový počet pracovných procesov počtu jadier CPU na serveri.
  • max_parallel_workers - to isté, ale pre paralelné pracovné procesy.

Výsledky

Od verzie 9.6 môže paralelné spracovanie výrazne zlepšiť výkon zložitých dotazov, ktoré skenujú veľa riadkov alebo indexov. V PostgreSQL 10 je paralelné spracovanie štandardne povolené. Nezabudnite ho zakázať na serveroch s veľkým pracovným zaťažením OLTP. Sekvenčné skenovanie alebo skenovanie indexu spotrebuje veľa zdrojov. Ak nespúšťate zostavu pre celú množinu údajov, môžete zlepšiť výkon dotazu jednoduchým pridaním chýbajúcich indexov alebo použitím správneho rozdelenia.

referencie

Zdroj: hab.com

Pridať komentár