Паралелни заявки в PostgreSQL

Паралелни заявки в PostgreSQL
Съвременните процесори имат много ядра. От години приложенията изпращат паралелно заявки към бази данни. Ако това е заявка за отчитане на множество редове в таблица, тя е по-бърза, когато използва множество процесори и PostgreSQL може да направи това от версия 9.6.

Отне 3 години, за да внедря функцията за паралелни заявки - трябваше да пренапиша кода на различни етапи от изпълнението на заявката. PostgreSQL 9.6 въведе инфраструктурата за допълнително подобряване на кода. В следващите версии други типове заявки се изпълняват паралелно.

Ограничения

  • Не разрешавайте паралелно изпълнение, ако всички ядра вече са заети, в противен случай други заявки ще се забавят.
  • Най-важното е, че паралелната обработка с високи стойности на WORK_MEM използва много памет - всяко хеш присъединяване или сортиране консумира памет в размер на work_mem.
  • OLTP заявките с ниска латентност не могат да бъдат ускорени чрез паралелно изпълнение. И ако заявката върне един ред, паралелната обработка само ще я забави.
  • Разработчиците обичат да използват бенчмарка TPC-H. Може би имате подобни заявки за перфектно паралелно изпълнение.
  • Паралелно се изпълняват само SELECT заявки без заключване на предикат.
  • Понякога правилното индексиране е по-добро от паралелното сканиране на последователни таблици.
  • Паузата на заявката и курсорите не се поддържат.
  • Прозоречните функции и подредените агрегатни функции не са паралелни.
  • Не печелите нищо от I/O работното натоварване.
  • Няма паралелни алгоритми за сортиране. Но заявките със сортиране могат да се изпълняват паралелно в някои аспекти.
  • Заменете CTE (WITH ...) с вложен SELECT, за да разрешите паралелна обработка.
  • Чуждите обвивки на данни все още не поддържат паралелна обработка (но биха могли!)
  • FULL OUTER JOIN не се поддържа.
  • max_rows забранява паралелната обработка.
  • Ако заявката има функция, която не е маркирана като PARALLEL SAFE, тя ще бъде еднопоточна.
  • Нивото на изолация на транзакцията SERIALIZABLE забранява паралелната обработка.

Тестова среда

Разработчиците на PostgreSQL се опитаха да намалят времето за отговор на заявките за сравнение на TPC-H. Изтеглете бенчмарка и адаптирайте го към PostgreSQL. Това е неофициална употреба на бенчмарка TPC-H - не за сравняване на бази данни или хардуер.

  1. Изтеглете TPC-H_Tools_v2.17.3.zip (или по-нова версия) от външен TPC.
  2. Преименувайте makefile.suite на Makefile и модифицирайте, както е описано тук: https://github.com/tvondra/pg_tpch . Компилирайте кода с make.
  3. Генериране на данни: ./dbgen -s 10 създава 23 GB база данни. Това е достатъчно, за да видите разликата в производителността между паралелни и непаралелни заявки.
  4. Конвертиране на файлове tbl в csv с for и sed.
  5. Клонирайте хранилището pg_tpch и копирайте файловете csv в pg_tpch/dss/data.
  6. Създайте заявки с команда qgen.
  7. Заредете данните в базата данни с командата ./tpch.sh.

Паралелно последователно сканиране

Може да е по-бързо не поради паралелното четене, а защото данните са разпръснати в много процесорни ядра. В съвременните операционни системи файловете с данни на PostgreSQL са добре кеширани. С предварително четене е възможно да получите блок, по-голям от хранилището, отколкото PG демонът иска. Следователно производителността на заявките не е ограничена от I/O на диска. Той изразходва процесорни цикли за:

  • чете редове един по един от страниците на таблицата;
  • сравняване на низови стойности и условия WHERE.

Нека изпълним проста заявка select:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

Последователното сканиране дава твърде много редове без агрегиране, така че заявката се изпълнява от едно процесорно ядро.

Ако добавите SUM(), можете да видите, че два работни потока ще помогнат за ускоряване на заявката:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Паралелно агрегиране

Възелът "Parallel Seq Scan" създава редове за частично агрегиране. Възелът „Частичен агрегат“ съкращава тези редове с SUM(). В края броячът SUM от всеки работен поток се събира от възела Gather.

Крайният резултат се изчислява от възела „Finalize Aggregate“. Ако имате свои собствени функции за агрегиране, не забравяйте да ги маркирате като "паралелно безопасно".

Брой работни процеси

Броят на работните процеси може да бъде увеличен без рестартиране на сървъра:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Какво става тук? Има 2 пъти повече работни процеси, а заявката е само 1,6599 пъти по-бърза. Изчисленията са интересни. Имахме 2 работни процеса и 1 лидер. След смяната стана 4+1.

Нашето максимално ускоряване от паралелна обработка: 5/3 = 1,66(6) пъти.

Как действа тя?

процеси

Изпълнението на заявка винаги започва с водещия процес. Лидерът прави всичко непаралелно и част от паралелната обработка. Други процеси, които изпълняват същите заявки, се наричат ​​работни процеси. Паралелната обработка използва инфраструктура динамични фонови работни процеси (от версия 9.4). Тъй като други части на PostgreSQL използват процеси, а не нишки, заявка с 3 работни процеса може да бъде 4 пъти по-бърза от традиционната обработка.

Взаимодействие

Работните процеси комуникират с лидера чрез опашка от съобщения (на базата на споделена памет). Всеки процес има 2 опашки: за грешки и за кортежи.

Колко работни процеси са ви необходими?

Минималната граница определя параметъра max_parallel_workers_per_gather. След това изпълнителят на заявка взема работни процеси от пула, ограничен от параметъра max_parallel_workers size. Последното ограничение е max_worker_processes, което е общият брой фонови процеси.

Ако не е било възможно да се разпредели работен процес, обработката ще бъде еднопроцесна.

Планиращият заявки може да намали работните потоци в зависимост от размера на таблицата или индекса. Има варианти за това. min_parallel_table_scan_size и min_parallel_index_scan_size.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Всеки път, когато масата е 3 пъти по-голяма от min_parallel_(index|table)_scan_size, Postgres добавя работен процес. Броят на работните потоци не се основава на разходите. Циркулярната зависимост затруднява сложните реализации. Вместо това планировчикът използва прости правила.

На практика тези правила не винаги са подходящи за производство, така че е възможно да промените броя на работните процеси за определена таблица: ALTER TABLE ... SET (parallel_workers = N).

Защо не се използва паралелна обработка?

В допълнение към дългия списък от ограничения има и проверки на разходите:

parallel_setup_cost - за избягване на паралелна обработка на кратки заявки. Този параметър оценява времето за подготовка на паметта, стартиране на процеса и първоначален обмен на данни.

parallel_tuple_cost: комуникацията между лидера и работниците може да се забави пропорционално на броя на кортежите от работните процеси. Този параметър отчита разходите за обмен на данни.

Nested Loop Joins - Съединяване на вложен цикъл

PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

Събирането се извършва в последната стъпка, така че лявото присъединяване на вложен цикъл е паралелна операция. Сканирането само с паралелен индекс беше въведено едва във версия 10. Работи подобно на паралелното последователно сканиране. Състояние c_custkey = o_custkey чете една поръчка за всеки клиентски ред. Така че не е паралелно.

Хеширане - Хеширане

Всеки работен процес създава своя собствена хеш таблица преди PostgreSQL 11. И ако има повече от четири от тези процеси, производителността няма да се подобри. В новата версия хеш-таблицата е споделена. Всеки работен процес може да използва WORK_MEM за създаване на хеш таблица.

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

Заявка 12 от TPC-H илюстрира паралелно хеширане. Всеки работен процес участва в създаването на споделена хеш таблица.

Обединяване Присъединяване

Съединението за сливане е непаралелно по природа. Не се притеснявайте, ако това е последната стъпка от заявката - тя все още може да се изпълнява паралелно.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

Възелът „Merge Join“ е над „Gather Merge“. Така че сливането не използва паралелна обработка. Но възелът „Паралелно индексно сканиране“ все още помага със сегмента part_pkey.

Секция връзка

В PostgreSQL 11 връзка по секции деактивирано по подразбиране: има много скъпо планиране. Таблици с подобно разделяне могат да се обединяват секция по секция. Това ще накара Postgres да използва по-малки хеш таблици. Всяка връзка на секциите може да бъде успоредна.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

Основното е, че връзката по секции е паралелна само ако тези секции са достатъчно големи.

Паралелно добавяне

Паралелно добавяне може да се използва вместо различни блокове в различни работни процеси. Това обикновено се случва с UNION ALL заявки. Недостатъкът е по-малко паралелност, тъй като всеки работен процес обработва само 1 заявка.

Тук се изпълняват 2 работни процеса, въпреки че 4 са активирани.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

Най-важните променливи

  • WORK_MEM ограничава количеството памет на процес, а не само на заявки: work_mem процеси връзки = много памет.
  • max_parallel_workers_per_gather - колко работни процеси изпълняващата програма ще използва за паралелна обработка от плана.
  • max_worker_processes - коригира общия брой работни процеси спрямо броя на процесорните ядра на сървъра.
  • max_parallel_workers - същото, но за паралелни работни процеси.

Резултати от

Започвайки с версия 9.6, паралелната обработка може значително да подобри производителността на сложни заявки, които сканират много редове или индекси. В PostgreSQL 10 паралелната обработка е активирана по подразбиране. Не забравяйте да го деактивирате на сървъри с голямо OLTP натоварване. Последователните или индексните сканирания изискват много ресурси. Ако не отчитате целия набор от данни, заявките могат да бъдат направени по-ефективни чрез просто добавяне на липсващи индекси или чрез използване на правилно разделяне.

Позоваването

Източник: www.habr.com

Добавяне на нов коментар