Паралельныя запыты ў PostgreSQL

Паралельныя запыты ў PostgreSQL
У сучасных ЦП вельмі шмат ядраў. Гадамі прыкладанні пасылалі запыты ў базы дадзеных раўналежна. Калі гэта справаздачны запыт да мноства радкоў у табліцы, ён выконваецца хутчэй, калі задзейнічае некалькі ЦП, і ў PostgreSQL гэта магчыма, пачынальна з версіі 9.6.

Спатрэбілася 3 гады, каб рэалізаваць функцыю паралельных запытаў - прыйшлося перапісаць код на розных этапах выканання запытаў. У PostgreSQL 9.6 з'явілася інфраструктура для наступнага паляпшэння кода. У наступных версіях і іншыя тыпы запытаў выконваюцца раўналежна.

Абмежаванні

  • Не ўключайце раўналежнае выкананне, калі ўсе ядры ўжо занятыя, інакш іншыя запыты будуць тармазіць.
  • Самае галоўнае, паралельная апрацоўка з высокімі значэннямі WORK_MEM задзейнічае шмат памяці – кожнае хэш-злучэнне або сартаванне займаюць памяць у аб'ёме work_mem.
  • Запыты OLTP з нізкай затрымкай немагчыма паскорыць раўналежным выкананнем. А калі запыт вяртае адзін радок, раўналежная апрацоўка яго толькі запаволіць.
  • Распрацоўнікі любяць выкарыстоўваць бенчмарк TPC-H. Можа, у вас ёсць падобныя запыты для ідэальнага паралельнага выканання.
  • Толькі запыты SELECT без прэдыкатнай блакіроўкі выконваюцца паралельна.
  • Часам правільная індэксацыя лепш паслядоўнага сканавання табліцы ў раўналежным рэжыме.
  • Прыпыненне запытаў і курсоры не падтрымліваюцца.
  • Аконныя функцыі і агрэгатныя функцыі ўпарадкаваных набораў не раўналежныя.
  • Вы нічога не выйграваеце ў працоўнай нагрузцы ўводу-вываду.
  • Паралельных алгарытмаў сартавання не бывае. Але запыты з сартаваннямі могуць выконвацца паралельна ў некаторых аспектах.
  • Заменіце CTE (WITH …) на ўкладзены SELECT, каб уключыць паралельную апрацоўку.
  • Абгорткі іншых дадзеных пакуль не падтрымліваюць раўналежную апрацоўку (а маглі б!)
  • FULL OUTER JOIN не падтрымліваецца.
  • max_rows адключае паралельную апрацоўку.
  • Калі ў запыце ёсць функцыя, не пазначаная як PARALLEL SAFE, ён будзе аднаструменным.
  • Узровень ізаляцыі транзакцыі SERIALIZABLE адключае раўналежную апрацоўку.

Тэставое асяроддзе

Распрацоўнікі PostgreSQL паспрабавалі зрэзаць час водгуку запытаў бенчмарку TPC-H. Загрузіце бенчмарк і адаптуйце яго да PostgreSQL. Гэта неафіцыйнае выкарыстанне бенчмарку TPC-H - не для параўнання баз дадзеных або абсталявання.

  1. Загрузіце TPC-H_Tools_v2.17.3.zip (або версію навей) з афсайта TPC.
  2. Перайменуйце makefile.suite у Makefile і зменіце, як апісана тут: https://github.com/tvondra/pg_tpch . Скампілюйце код камандай make.
  3. Згенеруйце дадзеныя: ./dbgen -s 10 стварае базу даных на 23 ГБ. Гэтага хопіць, каб убачыць розніцу ў прадукцыйнасці паралельных і непаралельных запытаў.
  4. Канвертуйце файлы tbl в csv с for и sed.
  5. Клануйце рэпазітар pg_tpch і скапіруйце файлы csv в pg_tpch/dss/data.
  6. Стварыце запыты камандай qgen.
  7. Загрузіце дадзеныя ў базу камандай ./tpch.sh.

Паралельнае паслядоўнае сканіраванне

Яно можа быць хутчэй не з-за раўналежнага чытання, а таму што дадзеныя раскіданыя па шматлікіх ядрах ЦП. У сучасных АС файлы дадзеных PostgreSQL добра кэшуюцца. З папераджальным чытаннем можна атрымаць са сховішча блок больш, чым запытвае дэман PG. Таму прадукцыйнасць запыту не абмежавана ўводам-вывадам дыска. Ён спажывае цыклы ЦП, каб:

  • чытаць радкі па адной са старонак табліцы;
  • параўноўваць значэння радкоў і ўмовы WHERE.

Выканаем просты запыт select:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

Паслядоўны скан дае занадта шмат радкоў без агрэгацыі, так што запыт выконваецца адным ядром ЦП.

Калі дадаць SUM(), відаць, што два працоўныя працэсы дапамогуць паскорыць запыт:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Паралельная агрэгацыя

Нода "Parallel Seq Scan" вырабляе радкі для частковай агрэгацыі. Нода “Partial Aggregate” уразае гэтыя радкі з дапамогай SUM(). У канцы лічыльнік SUM з кожнага працоўнага працэсу збіраецца нодай "Gather".

Выніковы вынік разлічваецца нодай "Finalize Aggregate". Калі ў вас свае функцыі агрэгацыі, не забудзьцеся пазначыць іх як "parallel safe".

Колькасць працоўных працэсаў

Колькасць працоўных працэсаў можна павялічыць без перазапуску сервера:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Што тут адбываецца? Рабочых працэсаў стала ў 2 разы больш, а запыт стаў усяго ў 1,6599 разоў хутчэй. Разлікі цікавыя. У нас было 2 працоўныя працэсы і 1 лідэр. Пасля змены стала 4+1.

Наша максімальнае паскарэнне ад паралельнай апрацоўкі: 5/3 = 1,66(6) разоў.

Як гэта працуе?

працэсы

Выкананне запыту заўсёды пачынаецца з лідзіруючага працэсу. Лідэр робіць усё непаралельнае і частка паралельнай апрацоўкі. Іншыя працэсы, якія выконваюць тыя ж запыты, называюцца працоўнымі працэсамі. Паралельная апрацоўка выкарыстоўвае інфраструктуру дынамічных фонавых працоўных працэсаў (з версіі 9.4). Раз іншыя часткі PostgreSQL выкарыстоўваюць працэсы, а не плыні, запыт з 3 працоўнымі працэсамі мог быць у 4 разы хутчэй традыцыйнай апрацоўкі.

Узаемадзеянне

Рабочыя працэсы маюць зносіны з лідэрам праз чаргу паведамленняў (на аснове агульнай памяці). У кожнага працэсу 2 чэргі: для памылак і для картэжаў.

Колькі трэба працоўных працэсаў?

Мінімальнае абмежаванне задае параметр max_parallel_workers_per_gather. Потым выканаўца запытаў бярэ працоўныя працэсы з пула, абмежаванага параметрам. max_parallel_workers size. Апошняе абмежаванне - гэта max_worker_processes, гэта значыць агульная колькасць фонавых працэсаў.

Калі не атрымалася вылучыць працоўны працэс, апрацоўка будзе аднапрацэснай.

Планавальнік запытаў можа скараціць працоўныя працэсы ў залежнасці ад памеру табліцы ці азначніка. Для гэтага ёсць параметры min_parallel_table_scan_size и min_parallel_index_scan_size.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Кожны раз, калі табліца ў 3 разы больш, чым min_parallel_(index|table)_scan_size, Postgres дадае працоўны працэс. Колькасць працоўных працэсаў не заснавана на выдатках. Кругавая залежнасць абцяжарвае складаныя рэалізацыі. Замест гэтага планавальнік выкарыстоўвае простыя правілы.

На практыцы гэтыя правілы не заўсёды падыходзяць для прадакшэну, так што можна змяніць колькасць працоўных працэсаў для пэўнай табліцы: ALTER TABLE … SET (parallel_workers = N).

Чаму паралельная апрацоўка не выкарыстоўваецца?

Акрамя доўгага спісу абмежаванняў ёсць яшчэ праверкі выдаткаў:

parallel_setup_cost - Каб абысціся без паралельнай апрацоўкі кароткіх запытаў. Гэты параметр мяркуе час на падрыхтоўку памяці, запуск працэсу і пачатковы абмен дадзенымі.

parallel_tuple_cost: зносіны лідэра з працоўнымі могуць зацягвацца прапарцыйна колькасці картэжаў ад працоўных працэсаў. Гэты параметр лічыць выдаткі на абмен дадзенымі.

Злучэнні ўкладзеных цыклаў - Nested Loop Join

PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

Збор адбываецца на апошнім этапе, так што Nested Loop Left Join – гэта паралельная аперацыя. Parallel Index Only Scan з'явіўся толькі ў версіі 10. Ён працуе аналагічна паралельнаму паслядоўнаму сканаванню. Умова c_custkey = o_custkey счытвае адзін парадак для кожнага кліенцкага радка. Так што яно не паралельна.

Хэш-злучэнне - Hash Join

Кожны працоўны працэс стварае сваю хэш-табліцу да PostgreSQL 11. І калі гэтых працэсаў больш за чатыры, прадукцыйнасць не павысіцца. У новай версіі хэш-табліца агульная. Кожны працоўны працэс можа выкарыстоўваць WORK_MEM, каб стварыць хэш-табліцу.

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

Запыт 12 з TPC-H наглядна паказвае раўналежнае хэш-злучэнне. Кожны працоўны працэс удзельнічае ў стварэнні агульнай хэш-табліцы.

Злучэнне зліццём - Merge Join

Злучэнне зліццём непаралельна па сваёй прыродзе. Не хвалюйцеся, калі гэта апошні этап запыту, - ён усё роўна можа выконвацца паралельна.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

Нода "Merge Join" знаходзіцца над "Gather Merge". Так што зліццё не выкарыстоўвае раўналежную апрацоўку. Але нода "Parallel Index Scan" усё роўна дапамагае з сегментам part_pkey.

Злучэнне па секцыях

У PostgreSQL 11 злучэнне па секцыях адключана па змаўчанні: у яго вельмі затратнае планаванне. Табліцы з падобным секцыянаваннем можна злучаць секцыя за секцыяй. Так Postgres будзе выкарыстоўваць хэш-табліцы паменш. Кожнае злучэнне секцый можа быць раўналежным.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

Галоўнае, злучэнне па секцыях бывае паралельным, толькі калі гэтыя секцыі дастаткова вялікія.

Паралельны дадатак - Parallel Append

Parallel Append можа выкарыстоўвацца замест розных блокаў у розных працоўных працэсах. Звычайна гэта бывае з запытамі UNION ALL. Недахоп - менш паралелізму, бо кожны працоўны працэс апрацоўвае толькі 1 запыт.

Тут запушчана 2 працоўныя працэсы, хоць уключана 4.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

Найважнейшыя зменныя

  • WORK_MEM абмяжоўвае аб'ём памяці для кожнага працэсу, не толькі для запытаў: work_mem працэсы злучэнні = вельмі шмат памяці.
  • max_parallel_workers_per_gather - колькі працоўных працэсаў выконваючая праграма будзе выкарыстоўваць для паралельнай апрацоўкі з плана.
  • max_worker_processes - падладжвае агульны лік працоўных працэсаў пад лік ядраў ЦП на серверы.
  • max_parallel_workers - Тое ж, але для паралельных працоўных працэсаў.

Вынікі

Пачынаючы з версіі 9.6/10 паралельная апрацоўка можа сур'ёзна палепшыць прадукцыйнасць складаных запытаў, якія скануюць шмат радкоў або індэксаў. У PostgreSQL XNUMX паралельная апрацоўка ўключана па змаўчанні. Не забывайце адключаць яе на серверах з вялікай працоўнай нагрузкай OLTP. Паслядоўныя сканы ці сканы індэксаў спажываюць вельмі шмат рэсурсаў. Калі вы не выконваеце справаздачу па ўсім наборы дадзеных, запыты можна зрабіць больш прадукцыйна, проста дадаўшы адсутнічаюць індэксы або выкарыстоўваючы правільнае секцыянаванне.

Спасылкі

Крыніца: habr.com

Дадаць каментар