Paralelní dotazy v PostgreSQL

Paralelní dotazy v PostgreSQL
Moderní CPU mají hodně jader. Aplikace již léta paralelně posílají dotazy do databází. Pokud se jedná o reportovací dotaz proti více řádkům v tabulce, je rychlejší, když používá více CPU a PostgreSQL to umí od verze 9.6.

Implementace funkce paralelního dotazu trvala 3 roky – musel jsem přepsat kód v různých fázích provádění dotazu. PostgreSQL 9.6 představil infrastrukturu pro další vylepšení kódu. V dalších verzích jsou paralelně prováděny další typy dotazů.

Omezení

  • Nepovolujte paralelní spouštění, pokud jsou všechna jádra již zaneprázdněna, jinak se ostatní požadavky zpomalí.
  • A co je nejdůležitější, paralelní zpracování s vysokými hodnotami WORK_MEM využívá hodně paměti – každé spojení hash nebo řazení spotřebovává paměť v množství work_mem.
  • Dotazy OLTP s nízkou latencí nelze urychlit paralelním prováděním. A pokud dotaz vrátí jeden řádek, paralelní zpracování jej pouze zpomalí.
  • Vývojáři rádi používají benchmark TPC-H. Možná máte podobné dotazy na dokonalé paralelní provedení.
  • Paralelně jsou prováděny pouze SELECT dotazy bez predikátového zamykání.
  • Někdy je správné indexování lepší než sekvenční prohledávání tabulek paralelně.
  • Pozastavení dotazu a kurzory nejsou podporovány.
  • Funkce okna a uspořádané množinové agregační funkce nejsou paralelní.
  • V pracovní zátěži I/O nic nezískáte.
  • Neexistují žádné paralelní třídicí algoritmy. Ale dotazy s řazením mohou v některých aspektech běžet paralelně.
  • Nahraďte CTE (WITH ...) vnořeným SELECT, abyste umožnili paralelní zpracování.
  • Zahraniční obaly dat zatím nepodporují paralelní zpracování (ale mohly by!)
  • FULL OUTER JOIN není podporováno.
  • max_rows zakáže paralelní zpracování.
  • Pokud má požadavek funkci, která není označena jako PARALELNÍ BEZPEČNÁ, bude jednovláknový.
  • Úroveň izolace transakcí SERIALIZABLE zakáže paralelní zpracování.

Testovací prostředí

Vývojáři PostgreSQL se pokusili zkrátit dobu odezvy na benchmarkové dotazy TPC-H. Stáhněte si benchmark a přizpůsobte jej PostgreSQL. Jedná se o neoficiální použití benchmarku TPC-H – nikoli pro porovnávání databází nebo hardwaru.

  1. Stáhnout TPC-H_Tools_v2.17.3.zip (nebo novější) z externího TPC.
  2. Přejmenujte makefile.suite na Makefile a upravte, jak je popsáno zde: https://github.com/tvondra/pg_tpch . Zkompilujte kód pomocí make.
  3. Vygenerovat data: ./dbgen -s 10 vytvoří databázi o velikosti 23 GB. To stačí k zobrazení rozdílu ve výkonu mezi paralelními a neparalelními dotazy.
  4. Převést soubory tbl в csv с for и sed.
  5. Klonujte úložiště pg_tpch a zkopírujte soubory csv в pg_tpch/dss/data.
  6. Vytvořte požadavky pomocí příkazu qgen.
  7. Pomocí příkazu načtěte data do databáze ./tpch.sh.

Paralelní sekvenční skenování

Může být rychlejší ne kvůli paralelnímu čtení, ale proto, že data jsou rozptýlena mezi mnoha jádry CPU. V moderních operačních systémech jsou datové soubory PostgreSQL dobře ukládány do mezipaměti. S předčítáním je možné získat z úložiště blok větší, než požaduje démon PG. Výkon dotazů tedy není omezen vstupem a výstupem disku. Spotřebovává cykly CPU, aby:

  • číst řádky jeden po druhém ze stránek tabulky;
  • porovnat hodnoty a podmínky řetězce WHERE.

Spusťte jednoduchý dotaz select:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

Sekvenční skenování poskytuje příliš mnoho řádků bez agregace, takže dotaz provádí jedno jádro CPU.

Pokud přidáte SUM(), můžete vidět, že dva pracovní postupy pomohou urychlit dotaz:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Paralelní agregace

Uzel "Parallel Seq Scan" vytváří řádky pro částečnou agregaci. Uzel "Partial Aggregate" zkrátí tyto řádky pomocí SUM(). Na konci je počítadlo SUM z každého pracovního postupu shromážděno uzlem Gather.

Konečný výsledek vypočítá uzel „Finalize Aggregate“. Pokud máte vlastní agregační funkce, nezapomeňte je označit jako „paralelně bezpečné“.

Počet pracovních procesů

Počet pracovních procesů lze zvýšit bez restartování serveru:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Co se tam děje? Existuje 2krát více pracovních procesů a dotaz je pouze 1,6599krát rychlejší. Zajímavé jsou výpočty. Měli jsme 2 pracovní procesy a 1 vedoucího. Po změně to bylo 4+1.

Naše maximální zrychlení z paralelního zpracování: 5/3 = 1,66(6) krát.

Jak to funguje?

Procesy

Provedení dotazu vždy začíná hlavním procesem. Vedoucí dělá vše neparalelně a některé paralelní zpracování. Ostatní procesy, které provádějí stejné požadavky, se nazývají pracovní procesy. Paralelní zpracování využívá infrastrukturu dynamické pracovní postupy na pozadí (od verze 9.4). Protože jiné části PostgreSQL používají procesy spíše než vlákna, dotaz se 3 pracovními procesy by mohl být 4krát rychlejší než tradiční zpracování.

Interakce

Pracovní procesy komunikují s vedoucím prostřednictvím fronty zpráv (na základě sdílené paměti). Každý proces má 2 fronty: pro chyby a pro n-tice.

Kolik pracovních procesů potřebujete?

Minimální limit nastavuje parametr max_parallel_workers_per_gather. Poté vykonavatel požadavku převezme pracovní procesy z fondu, omezené parametrem max_parallel_workers size. Posledním omezením je max_worker_processes, což je celkový počet procesů na pozadí.

Pokud nebylo možné alokovat pracovní proces, bude zpracování jednoprocesové.

Plánovač dotazů může omezit pracovní postupy v závislosti na velikosti tabulky nebo indexu. Existují pro to možnosti. min_parallel_table_scan_size и min_parallel_index_scan_size.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Pokaždé je tabulka 3x větší než min_parallel_(index|table)_scan_size, Postgres přidává pracovní proces. Počet pracovních postupů není založen na nákladech. Kruhová závislost ztěžuje složité implementace. Místo toho plánovač používá jednoduchá pravidla.

V praxi tato pravidla nejsou vždy vhodná pro výrobu, takže je možné změnit počet pracovních procesů u konkrétní tabulky: ALTER TABLE ... SET (parallel_workers = N).

Proč se nepoužívá paralelní zpracování?

Kromě dlouhého seznamu omezení existují také kontroly nákladů:

parallel_setup_cost - aby se zabránilo paralelnímu zpracování krátkých požadavků. Tento parametr odhaduje dobu přípravy paměti, spuštění procesu a počáteční výměnu dat.

parallel_tuple_cost: komunikace mezi vedoucím a pracovníky může být zpožděna úměrně počtu ntic z pracovních procesů. Tento parametr zohledňuje náklady na výměnu dat.

Nested Loop Joins – Nested Loop Join

PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

Shromažďování probíhá v posledním kroku, takže vnořená smyčka levého spojení je paralelní operace. Parallel Index Only Scan bylo zavedeno až ve verzi 10. Funguje podobně jako paralelní sekvenční skenování. Stav c_custkey = o_custkey přečte jednu objednávku pro každý řádek klienta. Takže to není paralelní.

Hash Join – Hash Join

Každý pracovní proces si před PostgreSQL 11 vytváří svou vlastní hashovací tabulku. A pokud je těchto procesů více než čtyři, výkon se nezlepší. V nové verzi je hash tabulka sdílená. Každý pracovní proces může použít WORK_MEM k vytvoření hash tabulky.

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

Dotaz 12 z TPC-H ilustruje paralelní spojení hash. Každý pracovní proces se podílí na vytváření sdílené hashovací tabulky.

Sloučit Připojte se

Sloučené spojení je svou podstatou neparalelní. Nebojte se, pokud je to poslední krok dotazu – stále může běžet paralelně.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

Uzel "Merge Join" je nad "Gather Merge". Sloučení tedy nepoužívá paralelní zpracování. Ale uzel „Parallel Index Scan“ stále pomáhá se segmentem part_pkey.

Spojení sekce

V PostgreSQL 11 spojení po sekcích ve výchozím nastavení zakázáno: má velmi drahé plánování. Tabulky s podobným rozdělením lze spojovat po částech. Díky tomu bude Postgres používat menší hashovací tabulky. Každé spojení sekcí může být paralelní.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

Hlavní věc je, že spojení po sekcích je paralelní pouze v případě, že jsou tyto sekce dostatečně velké.

Paralelní příloha

Paralelní příloha lze použít místo různých bloků v různých pracovních postupech. To se obvykle děje s dotazy UNION ALL. Nevýhodou je menší souběžnost, protože každý pracovní proces zpracovává pouze 1 požadavek.

Jsou zde spuštěny 2 pracovní procesy, i když jsou povoleny 4.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

Nejdůležitější proměnné

  • WORK_MEM omezuje množství paměti na proces, nejen požadavky: work_mem procesy připojení = hodně paměti.
  • max_parallel_workers_per_gather - kolik pracovních procesů spouštěný program použije pro paralelní zpracování z plánu.
  • max_worker_processes - přizpůsobí celkový počet pracovních procesů počtu jader CPU na serveru.
  • max_parallel_workers - totéž, ale pro paralelní pracovní postupy.

Výsledky

Počínaje verzí 9.6 může paralelní zpracování výrazně zlepšit výkon složitých dotazů, které skenují mnoho řádků nebo indexů. V PostgreSQL 10 je paralelní zpracování standardně povoleno. Nezapomeňte jej zakázat na serverech s velkou zátěží OLTP. Sekvenční nebo indexové prohledávání jsou velmi náročné na zdroje. Pokud nevytváříte přehledy o celé datové sadě, lze dotazy zvýšit výkon jednoduše přidáním chybějících indexů nebo použitím správného rozdělení.

reference

Zdroj: www.habr.com

Přidat komentář