Zapytania równoległe w PostgreSQL

Zapytania równoległe w PostgreSQL
Nowoczesne procesory mają wiele rdzeni. Od lat aplikacje równolegle wysyłają zapytania do baz danych. Jeśli jest to zapytanie raportujące dotyczące wielu wierszy tabeli, działa ono szybciej przy użyciu wielu procesorów, a PostgreSQL jest w stanie to zrobić od wersji 9.6.

Wdrożenie funkcji zapytań równoległych zajęło 3 lata – na różnych etapach wykonywania zapytań musieliśmy przepisać kod. PostgreSQL 9.6 wprowadził infrastrukturę w celu dalszego ulepszania kodu. W kolejnych wersjach równolegle wykonywane są zapytania innych typów.

Ograniczenia

  • Nie włączaj wykonywania równoległego, jeśli wszystkie rdzenie są już zajęte, w przeciwnym razie inne żądania będą spowolnione.
  • Co najważniejsze, przetwarzanie równoległe z dużymi wartościami WORK_MEM zużywa dużo pamięci - każde złączenie skrótu lub sortowanie zajmuje pamięć work_mem.
  • Zapytań OLTP o niskim opóźnieniu nie można przyspieszyć przez wykonanie równoległe. A jeśli zapytanie zwróci jeden wiersz, przetwarzanie równoległe tylko go spowolni.
  • Programiści uwielbiają korzystać z testu porównawczego TPC-H. Być może masz podobne zapytania dotyczące idealnego wykonywania równoległego.
  • Równolegle wykonywane są tylko zapytania SELECT bez blokowania predykatów.
  • Czasami właściwe indeksowanie jest lepsze niż sekwencyjne skanowanie tabel w trybie równoległym.
  • Wstrzymywanie zapytań i kursorów nie jest obsługiwane.
  • Funkcje okna i funkcje agregujące o uporządkowanym zbiorze nie są równoległe.
  • Nic nie zyskujesz w obciążeniu we/wy.
  • Nie ma algorytmów sortowania równoległego. Jednak w niektórych aspektach zapytania z sortowaniem mogą być wykonywane równolegle.
  • Zastąp CTE (WITH ...) zagnieżdżonym SELECT, aby umożliwić przetwarzanie równoległe.
  • Opakowania danych innych firm nie obsługują jeszcze przetwarzania równoległego (ale mogłyby!)
  • FULL OUTER JOIN nie jest obsługiwany.
  • max_rows wyłącza przetwarzanie równoległe.
  • Jeśli zapytanie ma funkcję, która nie jest oznaczona jako PARALLEL SAFE, będzie ono jednowątkowe.
  • Poziom izolacji transakcji SERIALIZABLE wyłącza przetwarzanie równoległe.

Środowisko testowe

Programiści PostgreSQL próbowali skrócić czas odpowiedzi na zapytania testowe TPC-H. Pobierz test porównawczy i dostosuj go do PostgreSQL. Jest to nieoficjalne wykorzystanie testu porównawczego TPC-H - nie do porównywania baz danych lub sprzętu.

  1. Pobierz TPC-H_Tools_v2.17.3.zip (lub nowszą wersję) z zewnętrznej siedziby TPC.
  2. Zmień nazwę makefile.suite na Makefile i zmień zgodnie z opisem tutaj: https://github.com/tvondra/pg_tpch . Skompiluj kod za pomocą polecenia make.
  3. Wygeneruj dane: ./dbgen -s 10 tworzy bazę danych o pojemności 23 GB. To wystarczy, aby zobaczyć różnicę w wydajności zapytań równoległych i nierównoległych.
  4. Skonwertuj pliki tbl в csv с for и sed.
  5. Sklonuj repozytorium pg_tpch i skopiuj pliki csv в pg_tpch/dss/data.
  6. Twórz zapytania za pomocą polecenia qgen.
  7. Załaduj dane do bazy danych za pomocą polecenia ./tpch.sh.

Równoległe skanowanie sekwencyjne

Może być szybszy nie ze względu na odczyt równoległy, ale dlatego, że dane są rozproszone pomiędzy wieloma rdzeniami procesora. W nowoczesnych systemach operacyjnych pliki danych PostgreSQL są dobrze buforowane. Dzięki funkcji odczytu z wyprzedzeniem możliwe jest pobranie z pamięci większego bloku niż żąda demon PG. Dlatego wydajność zapytań nie jest ograniczona przez operacje we/wy dysku. Zużywa cykle procesora, aby:

  • czytaj wiersze pojedynczo ze stron tabeli;
  • porównaj wartości ciągów i warunki WHERE.

Uruchommy proste zapytanie select:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

Skanowanie sekwencyjne generuje zbyt wiele wierszy bez agregacji, więc zapytanie jest wykonywane przez pojedynczy rdzeń procesora.

Jeśli dodasz SUM()widać, że dwa przepływy pracy pomogą przyspieszyć zapytanie:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Agregacja równoległa

Węzeł Parallel Seq Scan tworzy wiersze do częściowej agregacji. Węzeł „Częściowe agregatowanie” przycina te linie za pomocą SUM(). Na koniec licznik SUM z każdego procesu roboczego jest zbierany przez węzeł „Zbierz”.

Ostateczny wynik jest obliczany przez węzeł „Finalize Aggregate”. Jeśli masz własne funkcje agregujące, nie zapomnij oznaczyć ich jako „bezpieczne równolegle”.

Liczba procesów roboczych

Liczbę procesów roboczych można zwiększyć bez ponownego uruchamiania serwera:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Co tu się dzieje? Było 2 razy więcej procesów roboczych, a żądanie stało się tylko 1,6599 razy szybsze. Obliczenia są interesujące. Mieliśmy 2 procesy robocze i 1 lidera. Po zmianie powstało 4+1.

Nasze maksymalne przyspieszenie z przetwarzania równoległego: 5/3 = 1,66(6) razy.

Jak to działa?

Procesy

Realizacja żądania zawsze rozpoczyna się od procesu wiodącego. Lider robi wszystko nierównolegle i w pewnym stopniu równolegle. Inne procesy wykonujące te same żądania nazywane są procesami roboczymi. Przetwarzanie równoległe wykorzystuje infrastrukturę dynamiczne procesy robocze w tle (od wersji 9.4). Ponieważ inne części PostgreSQL używają procesów, a nie wątków, zapytanie z 3 procesami roboczymi może być 4 razy szybsze niż tradycyjne przetwarzanie.

Wzajemne oddziaływanie

Procesy robocze komunikują się z liderem poprzez kolejkę komunikatów (w oparciu o pamięć współdzieloną). Każdy proces ma 2 kolejki: na błędy i na krotki.

Ile przepływów pracy jest potrzebnych?

Minimalny limit jest określony przez parametr max_parallel_workers_per_gather. Następnie moduł uruchamiający żądania pobiera procesy robocze z puli ograniczonej parametrem max_parallel_workers size. Ostatnie ograniczenie to max_worker_processes, czyli całkowita liczba procesów w tle.

Jeżeli nie było możliwości przydzielenia procesu roboczego, przetwarzanie będzie jednoprocesowe.

Planista zapytań może zredukować przepływy pracy w zależności od rozmiaru tabeli lub indeksu. Są do tego parametry min_parallel_table_scan_size и min_parallel_index_scan_size.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Za każdym razem stół jest 3 razy większy niż min_parallel_(index|table)_scan_size, Postgres dodaje proces roboczy. Liczba przepływów pracy nie jest oparta na kosztach. Zależność cykliczna utrudnia złożone implementacje. Zamiast tego planista stosuje proste zasady.

W praktyce reguły te nie zawsze sprawdzają się na produkcji, dlatego można zmienić ilość procesów roboczych dla konkretnej tabeli: ALTER TABLE ... SET (parallel_workers = N).

Dlaczego nie stosuje się przetwarzania równoległego?

Oprócz długiej listy ograniczeń istnieją również kontrole kosztów:

parallel_setup_cost - aby uniknąć równoległego przetwarzania krótkich wniosków. Parametr ten szacuje czas przygotowania pamięci, uruchomienia procesu i wstępnej wymiany danych.

parallel_tuple_cost: komunikacja pomiędzy liderem a pracownikami może być opóźniona proporcjonalnie do liczby krotek z procesów roboczych. Parametr ten oblicza koszt wymiany danych.

Połączenia zagnieżdżonych pętli

PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

Kolekcjonowanie odbywa się na ostatnim etapie, więc Nested Loop Left Join jest operacją równoległą. Parallel Index Only Scan zostało wprowadzone dopiero w wersji 10. Działa podobnie do równoległego skanowania szeregowego. Stan c_custkey = o_custkey odczytuje jedno zamówienie na ciąg klienta. Więc to nie jest równoległe.

Hash Dołącz

Każdy proces roboczy tworzy własną tablicę mieszającą aż do PostgreSQL 11. A jeśli będzie więcej niż cztery takie procesy, wydajność nie ulegnie poprawie. W nowej wersji tablica skrótów jest współdzielona. Każdy proces roboczy może użyć WORK_MEM do utworzenia tabeli mieszającej.

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

Zapytanie 12 z TPC-H wyraźnie pokazuje równoległe połączenie mieszające. Każdy proces roboczy przyczynia się do utworzenia wspólnej tablicy mieszającej.

Połącz Dołącz

Łączenie scalone nie ma charakteru równoległego. Nie martw się, jeśli jest to ostatni krok zapytania – może ono nadal działać równolegle.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

Węzeł „Połącz połączenie” znajduje się nad węzłem „Zbierz połączenie”. Zatem scalanie nie wykorzystuje przetwarzania równoległego. Ale węzeł „Równoległe skanowanie indeksu” nadal pomaga w segmencie part_pkey.

Połączenie przez sekcje

W PostgreSQL 11 połączenie sekcjami domyślnie wyłączone: ma bardzo kosztowne planowanie. Tabele o podobnym partycjonowaniu można łączyć partycja po partycji. W ten sposób Postgres użyje mniejszych tabel skrótów. Każde połączenie sekcji może być równoległe.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

Najważniejsze jest to, że połączenie w sekcjach jest równoległe tylko wtedy, gdy te sekcje są wystarczająco duże.

Dołączanie równoległe

Dołączanie równoległe można używać zamiast różnych bloków w różnych przepływach pracy. Zwykle dzieje się tak w przypadku zapytań UNION ALL. Wadą jest mniejsza równoległość, ponieważ każdy proces roboczy przetwarza tylko 1 żądanie.

Działają tutaj 2 procesy robocze, chociaż 4 są włączone.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

Najważniejsze zmienne

  • WORK_MEM ogranicza pamięć na proces, a nie tylko zapytania: work_mem procesy połączenia = dużo pamięci.
  • max_parallel_workers_per_gather — ile procesów roboczych wykorzysta program wykonawczy do przetwarzania równoległego z planu.
  • max_worker_processes — dostosowuje całkowitą liczbę procesów roboczych do liczby rdzeni procesora na serwerze.
  • max_parallel_workers - to samo, ale dla równoległych procesów roboczych.

Wyniki

Począwszy od wersji 9.6, przetwarzanie równoległe może znacznie poprawić wydajność złożonych zapytań, które skanują wiele wierszy lub indeksów. W PostgreSQL 10 przetwarzanie równoległe jest domyślnie włączone. Pamiętaj, aby wyłączyć tę opcję na serwerach z dużym obciążeniem OLTP. Skanowania sekwencyjne lub skanowania indeksowe zużywają dużo zasobów. Jeśli nie uruchamiasz raportu dotyczącego całego zbioru danych, możesz poprawić wydajność zapytań, po prostu dodając brakujące indeksy lub stosując odpowiedni podział.

referencje

Źródło: www.habr.com

Dodaj komentarz