Паралельні запити у PostgreSQL

Паралельні запити у PostgreSQL
У сучасних ЦП дуже багато ядер. Роками програми надсилали запити до баз даних паралельно. Якщо це звітний запит до безлічі рядків у таблиці, він виконується швидше, коли задіяно кілька ЦП, і PostgreSQL це можливо, починаючи з версії 9.6.

Потрібно було 3 роки, щоб реалізувати функцію паралельних запитів — довелося переписати код на різних етапах виконання запитів. У PostgreSQL 9.6 з'явилася інфраструктура подальшого поліпшення коду. У наступних версіях та інші типи запитів виконуються паралельно.

Обмеження

  • Не вмикайте паралельне виконання, якщо всі ядра вже зайняті, інакше інші запити будуть гальмувати.
  • Найголовніше, паралельна обробка з високими значеннями WORK_MEM задіює багато пам'яті - кожне хеш-з'єднання або сортування займають пам'ять в обсязі work_mem.
  • Запити OLTP із низькою затримкою неможливо прискорити паралельним виконанням. А якщо запит повертає один рядок, паралельна обробка його лише уповільнить.
  • Розробники люблять використовувати бенчмарк TPC-H. Можливо, у вас є схожі запити для ідеального паралельного виконання.
  • Тільки запити SELECT без попереднього блокування виконуються паралельно.
  • Іноді правильна індексація краща за послідовне сканування таблиці в паралельному режимі.
  • Призупинення запитів та курсори не підтримуються.
  • Віконні функції та агрегатні функції впорядкованих наборів не є паралельними.
  • Ви нічого не виграєте в робочому навантаженні вводу-виводу.
  • Паралельних алгоритмів сортування немає. Але запити із сортуваннями можуть виконуватися паралельно у деяких аспектах.
  • Замініть CTE (WITH …) на вкладений SELECT, щоб увімкнути паралельну обробку.
  • Обертки сторонніх даних поки не підтримують паралельну обробку (а могли б!)
  • FULL OUTER JOIN не підтримується.
  • max_rows відключає паралельну обробку.
  • Якщо у запиті є функція, яка не позначена як PARALLEL SAFE, він буде однопоточним.
  • Рівень ізоляції транзакції SERIALIZABLE вимикає паралельну обробку.

Тестове середовище

Розробники PostgreSQL спробували урізати час відгуку запитів бенчмарку TPC-H. Завантажте бенчмарк і адаптуйте його до PostgreSQL. Це неофіційне використання бенчмарку TPC-H – не для порівняння баз даних чи обладнання.

  1. Завантажте TPC-H_Tools_v2.17.3.zip (або новішу версію) з офсайту TPC.
  2. Перейменуйте makefile.suite на Makefile і змініть, як описано тут: https://github.com/tvondra/pg_tpch . Скомпілюйте код командою make.
  3. Згенеруйте дані: ./dbgen -s 10 створює базу даних на 23 ГБ. Цього вистачить, щоб побачити різницю у продуктивності паралельних та непаралельних запитів.
  4. Конвертуйте файли tbl в csv с for и sed.
  5. Клонуйте репозиторій pg_tpch та скопіюйте файли csv в pg_tpch/dss/data.
  6. Створіть запити командою qgen.
  7. Завантажте дані до бази командою ./tpch.sh.

Паралельне послідовне сканування

Воно може бути швидше не через паралельне читання, а тому що дані розкидані по багатьох ядрах ЦП. У сучасних ОС файли даних PostgreSQL добре кешуються. З випереджаючим читанням можна отримати зі сховища блок більше, ніж запитує демон PG. Тому продуктивність запиту не обмежена введенням-виводом диска. Він споживає цикли ЦП, щоб:

  • читати рядки по одній зі сторінок таблиці;
  • порівнювати значення рядків та умови WHERE.

Виконаємо простий запит select:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

Послідовний скан дає занадто багато рядків без агрегації, тому запит виконується одним ядром ЦП.

якщо додати SUM(), видно, що два робочі процеси допоможуть прискорити запит:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Паралельна агрегація

Нода "Parallel Seq Scan" робить рядки для часткової агрегації. Нода "Partial Aggregate" урізає ці рядки за допомогою SUM(). Наприкінці лічильник SUM із кожного робочого процесу збирається нодою «Gather».

Підсумковий результат розраховується нодою "Finalize Aggregate". Якщо ви маєте свої функції агрегації, не забудьте позначити їх як «parallel safe».

Кількість робочих процесів

Кількість робочих процесів можна збільшити без перезапуску сервера:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Що тут відбувається? Робочих процесів стало вдвічі більше, а запит став лише в 2 разів швидше. Розрахунки цікаві. У нас було 1,6599 робочі процеси і 2 лідер. Після зміни стало 1+4.

Наше максимальне прискорення від паралельної обробки: 5/3 = 1,66 (6) разів.

Як це працює?

процеси

Виконання запиту завжди починається з провідного процесу. Лідер робить все непаралельне та частину паралельної обробки. Інші процеси, які виконують самі запити, називаються робочими процесами. Паралельна обробка використовує інфраструктуру динамічних фонових робочих процесів (З версії 9.4). Оскільки інші частини PostgreSQL використовують процеси, а не потоки, запит з 3 робочими процесами міг бути в 4 рази швидше традиційної обробки.

Взаємодія

Робочі процеси спілкуються з лідером через чергу повідомлень (з урахуванням спільної пам'яті). У кожного процесу дві черги: для помилок і для кортежів.

Скільки потрібно робочих процесів?

Мінімальне обмеження задає параметр max_parallel_workers_per_gather. Потім виконавець запитів бере робочі процеси з пулу, обмеженого параметром max_parallel_workers size. Останнє обмеження – це max_worker_processes, тобто загальна кількість фонових процесів.

Якщо не вдалося виділити робочий процес, обробка буде однопроцесною.

Планувальник запитів може скоротити робочі процеси залежно від розміру таблиці чи індексу. Для цього є параметри min_parallel_table_scan_size и min_parallel_index_scan_size.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Щоразу, коли таблиця в 3 рази більша, ніж min_parallel_(index|table)_scan_size, Postgres додає робочого процесу. Кількість робочих процесів не ґрунтується на витратах. Кругова залежність ускладнює складні реалізації. Натомість планувальник використовує прості правила.

На практиці ці правила не завжди годяться для продакшену, тому можна змінити кількість робочих процесів для конкретної таблиці: ALTER TABLE … SET (parallel_workers = N).

Чому паралельна обробка не використовується?

Крім довгого списку обмежень, є ще перевірки витрат:

parallel_setup_cost — щоб уникнути паралельної обробки коротких запитів. Цей параметр дає час на підготовку пам'яті, запуск процесу та початковий обмін даними.

parallel_tuple_cost: спілкування лідера з робітниками може затягуватися пропорційно до кількості кортежів від робочих процесів. Цей параметр рахує витрати на обмін даними.

З'єднання вкладених циклів - Nested Loop Join

PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

Збір відбувається на останньому етапі, так що Nested Loop Left Join – це паралельна операція. Parallel Index Only Scan з'явився лише у версії 10. Він працює аналогічно до паралельного послідовного сканування. Умова c_custkey = o_custkey зчитує один порядок для кожного рядка клієнтів. Тож воно не паралельне.

Хеш-з'єднання - Hash Join

Кожен робочий процес створює свою хеш-таблицю до PostgreSQL 11. І якщо цих процесів більше чотирьох, продуктивність не підвищиться. У новій версії хеш-таблиця загальна. Кожен робочий процес може використовувати WORK_MEM для створення хеш-таблицю.

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

Запит 12 TPC-H наочно показує паралельне хеш-з'єднання. Кожен робочий процес бере участь у створенні загальної хеш-таблиці.

З'єднання злиттям - Merge Join

З'єднання злиттям непаралельне за своєю природою. Не хвилюйтеся, якщо це останній етап запиту, він все одно може виконуватися паралельно.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

Нода "Merge Join" знаходиться над "Gather Merge". Тож злиття не використовує паралельну обробку. Але нода "Parallel Index Scan" все одно допомагає з сегментом part_pkey.

З'єднання по секціям

У PostgreSQL 11 з'єднання по секціям відключено за замовчуванням: має дуже затратне планування. Таблиці з подібним секціюванням можна поєднувати секція за секцією. Так Postgres буде використовувати менші хеш-таблиці. Кожне з'єднання секцій може бути паралельним.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

Головне, з'єднання по секціям буває паралельним, тільки якщо ці секції досить великі.

Паралельне доповнення - Parallel Append

Parallel Append може використовуватися замість різних блоків у різних робочих процесах. Зазвичай це буває із запитами UNION ALL. Недолік — менший за паралелізм, адже кожен робочий процес обробляє лише 1 запит.

Тут запущено 2 робочі процеси, хоча включено 4.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

Найважливіші змінні

  • WORK_MEM обмежує обсяг пам'яті для кожного процесу, не тільки для запитів: work_mem процеси з'єднання = дуже багато пам'яті.
  • max_parallel_workers_per_gather — скільки робочих процесів виконуюча програма використовуватиме паралельної обробки з плану.
  • max_worker_processes - Підлаштовує загальну кількість робочих процесів під число ядер ЦП на сервері.
  • max_parallel_workers — те саме, але для паралельних робочих процесів.

Підсумки

Починаючи з версії 9.6, паралельна обробка може серйозно покращити продуктивність складних запитів, які сканують багато рядків або індексів. У PostgreSQL 10 паралельна обробка включена за умовчанням. Не забувайте відключати її на серверах з великим робочим навантаженням OLTP. Послідовні скани чи скани індексів споживають дуже багато ресурсів. Якщо ви не виконуєте звіт по всьому набору даних, запити можна зробити продуктивнішими, просто додавши відсутні індекси або використовуючи правильне секціонування.

Посилання

Джерело: habr.com

Додати коментар або відгук