Párhuzamos lekérdezések a PostgreSQL-ben

Párhuzamos lekérdezések a PostgreSQL-ben
A modern CPU-k sok maggal rendelkeznek. Az alkalmazások évek óta párhuzamosan küldenek lekérdezéseket adatbázisokba. Ha ez egy jelentéslekérdezés egy táblázat több sorában, akkor gyorsabban fut több CPU használatakor, és a PostgreSQL a 9.6-os verzió óta képes erre.

A párhuzamos lekérdezés funkció megvalósítása 3 évig tartott – a lekérdezés végrehajtásának különböző szakaszaiban át kellett írnunk a kódot. A PostgreSQL 9.6 infrastruktúrát vezetett be a kód továbbfejlesztése érdekében. A következő verziókban más típusú lekérdezések is párhuzamosan futnak.

Korlátozások

  • Ne engedélyezze a párhuzamos végrehajtást, ha már minden mag foglalt, különben a többi kérés lelassul.
  • A legfontosabb, hogy a párhuzamos feldolgozás magas WORK_MEM értékekkel sok memóriát használ – minden egyes hash csatlakozás vagy rendezés a work_mem memóriát foglalja el.
  • Az alacsony késleltetésű OLTP-lekérdezések nem gyorsíthatók fel párhuzamos végrehajtással. És ha a lekérdezés egy sort ad vissza, a párhuzamos feldolgozás csak lelassítja azt.
  • A fejlesztők előszeretettel használják a TPC-H benchmarkot. Talán hasonló lekérdezései vannak a tökéletes párhuzamos végrehajtáshoz.
  • Csak a predikátumzárolás nélküli SELECT lekérdezések futnak párhuzamosan.
  • Néha a megfelelő indexelés jobb, mint a szekvenciális táblakeresés párhuzamos módban.
  • A lekérdezések és kurzorok szüneteltetése nem támogatott.
  • Az ablakfüggvények és a rendezett halmazösszesítő függvények nem párhuzamosak.
  • Nem nyer semmit az I/O munkaterhelésben.
  • Nincsenek párhuzamos rendezési algoritmusok. De a rendezéssel rendelkező lekérdezések bizonyos szempontból párhuzamosan is végrehajthatók.
  • A párhuzamos feldolgozás engedélyezéséhez cserélje ki a CTE-t (WITH ...) egy beágyazott SELECT-re.
  • A harmadik féltől származó adatcsomagolók még nem támogatják a párhuzamos feldolgozást (de tudnák!)
  • A FULL OUTER JOIN nem támogatott.
  • A max_rows letiltja a párhuzamos feldolgozást.
  • Ha egy lekérdezésnek van olyan funkciója, amely nem PÁRHUZAMOS BIZTONSÁGOS jelöléssel rendelkezik, akkor egyszálú lesz.
  • A SERIALIZABLE tranzakció elkülönítési szint letiltja a párhuzamos feldolgozást.

Tesztkörnyezet

A PostgreSQL fejlesztői megpróbálták csökkenteni a TPC-H benchmark lekérdezések válaszidejét. Töltse le a benchmarkot és adaptálja a PostgreSQL-hez. Ez a TPC-H benchmark nem hivatalos használata – nem adatbázis- vagy hardver-összehasonlításra.

  1. Töltse le a TPC-H_Tools_v2.17.3.zip fájlt (vagy újabb verziót) a TPC kihelyezett oldaláról.
  2. Nevezze át a makefile.suite programot Makefile-ra, és módosítsa az itt leírtak szerint: https://github.com/tvondra/pg_tpch . Fordítsa le a kódot a make paranccsal.
  3. Adatok generálása: ./dbgen -s 10 létrehoz egy 23 GB-os adatbázist. Ez elég ahhoz, hogy lássa a különbséget a párhuzamos és nem párhuzamos lekérdezések teljesítményében.
  4. Fájlok konvertálása tbl в csv с for и sed.
  5. A tár klónozása pg_tpch és másolja a fájlokat csv в pg_tpch/dss/data.
  6. Hozzon létre lekérdezéseket egy paranccsal qgen.
  7. Töltse be az adatokat az adatbázisba a paranccsal ./tpch.sh.

Párhuzamos szekvenciális szkennelés

Lehet, hogy nem a párhuzamos olvasás miatt gyorsabb, hanem azért, mert az adatok sok CPU magon keresztül oszlanak el. A modern operációs rendszerekben a PostgreSQL adatfájlok jól gyorsítótárazottak. Előreolvasással nagyobb blokkot kaphatunk a tárolóból, mint amennyit a PG démon kér. Ezért a lekérdezés teljesítményét nem korlátozza a lemez I/O. CPU ciklusokat fogyaszt, hogy:

  • sorokat egyenként olvasni a táblázat oldalairól;
  • Hasonlítsa össze a karakterláncok értékeit és feltételeit WHERE.

Futtassunk egy egyszerű lekérdezést select:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

A szekvenciális vizsgálat túl sok sort állít elő összesítés nélkül, így a lekérdezést egyetlen CPU mag hajtja végre.

Ha hozzáteszed SUM(), láthatja, hogy két munkafolyamat segít felgyorsítani a lekérdezést:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Párhuzamos összesítés

A Parallel Seq Scan csomópont sorokat állít elő részleges összesítéshez. A "Partial Aggregate" csomópont ezeket a sorokat a használatával vágja le SUM(). A végén az egyes dolgozói folyamatok SUM számlálóját a „Gather” csomópont gyűjti össze.

A végeredményt a „Finalize Aggregate” csomópont számítja ki. Ha saját összesítő funkciói vannak, ne felejtse el „párhuzamos biztonságosként” megjelölni azokat.

Munkavállalói folyamatok száma

A munkafolyamatok száma a szerver újraindítása nélkül növelhető:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Mi folyik itt? 2-szer több munkafolyamat volt, és a kérés mindössze 1,6599-szer gyorsabb lett. Érdekesek a számítások. 2 dolgozói folyamatunk és 1 vezetőnk volt. A csere után 4+1 lett.

Maximális gyorsulásunk párhuzamos feldolgozásból: 5/3 = 1,66(6)-szor.

Hogyan működik?

A folyamatok

A kérés végrehajtása mindig a vezető folyamattal kezdődik. A vezető mindent nem párhuzamosan és néhány párhuzamos feldolgozást végez. Más folyamatokat, amelyek ugyanazokat a kéréseket hajtják végre, munkafolyamatoknak nevezzük. A párhuzamos feldolgozás infrastruktúrát használ dinamikus háttérmunkás folyamatok (9.4-es verziótól). Mivel a PostgreSQL más részei folyamatokat használnak szálak helyett, a 3 munkafolyamatot tartalmazó lekérdezés négyszer gyorsabb lehet, mint a hagyományos feldolgozás.

Kölcsönhatás

A dolgozói folyamatok üzenetsoron keresztül kommunikálnak a vezetővel (megosztott memória alapján). Minden folyamatnak 2 sora van: a hibákra és a sorokra.

Hány munkafolyamat szükséges?

A minimális határértéket a paraméter határozza meg max_parallel_workers_per_gather. A kérés futtatója ezután feldolgozófolyamatokat vesz át a paraméter által korlátozott készletből max_parallel_workers size. Az utolsó korlátozás az max_worker_processes, vagyis a háttérfolyamatok teljes száma.

Ha nem lehetett dolgozói folyamatot hozzárendelni, a feldolgozás egyfolyamatos lesz.

A lekérdezéstervező a tábla vagy index méretétől függően csökkentheti a munkafolyamatokat. Ehhez vannak paraméterek min_parallel_table_scan_size и min_parallel_index_scan_size.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Minden alkalommal, amikor az asztal 3-szor nagyobb, mint min_parallel_(index|table)_scan_size, A Postgres egy munkafolyamatot ad hozzá. A munkafolyamatok száma nem a költségeken alapul. A körkörös függőség megnehezíti a komplex megvalósításokat. Ehelyett a tervező egyszerű szabályokat használ.

A gyakorlatban ezek a szabályok nem mindig alkalmasak a termelésre, így egy adott táblához módosíthatja a dolgozói folyamatok számát: ALTER TABLE ... SET (parallel_workers = N).

Miért nem alkalmazzák a párhuzamos feldolgozást?

A korlátozások hosszú listája mellett költségellenőrzések is vannak:

parallel_setup_cost - a rövid kérések párhuzamos feldolgozásának elkerülése. Ez a paraméter megbecsüli a memória előkészítéséhez, a folyamat elindításához és a kezdeti adatcseréhez szükséges időt.

parallel_tuple_cost: a vezető és a dolgozók közötti kommunikáció a munkafolyamatokból származó sorok számával arányosan késhet. Ez a paraméter az adatcsere költségét számítja ki.

Beágyazott hurok csatlakozások

PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

A gyűjtés az utolsó szakaszban történik, így a beágyazott hurok bal összekapcsolása párhuzamos művelet. A Parallel Index Only Scan csak a 10-es verzióban került bevezetésre. A párhuzamos soros szkenneléshez hasonlóan működik. Feltétel c_custkey = o_custkey ügyfélkarakterlánconként egy megbízást olvas be. Tehát nem párhuzamos.

Hash Csatlakozás

Minden munkafolyamat létrehozza a saját hash tábláját a PostgreSQL 11-ig. Ha pedig négynél több ilyen folyamat van, a teljesítmény nem javul. Az új verzióban a hash tábla meg van osztva. Minden munkafolyamat használhatja a WORK_MEM-et egy hash-tábla létrehozásához.

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

A TPC-H 12. lekérdezése egyértelműen párhuzamos hash kapcsolatot mutat. Minden munkafolyamat hozzájárul egy közös hash-tábla létrehozásához.

Egyesítés Csatlakozás

Az összevonás nem párhuzamos jellegű. Ne aggódjon, ha ez a lekérdezés utolsó lépése – továbbra is futhat párhuzamosan.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

Az "Összekapcsolás" csomópont a "Gather Merge" felett található. Tehát az összevonás nem használ párhuzamos feldolgozást. De a „Parallel Index Scan” csomópont továbbra is segít a szegmensben part_pkey.

Csatlakozás szakaszok szerint

PostgreSQL 11-ben szakaszonkénti kapcsolat alapértelmezés szerint le van tiltva: nagyon drága ütemezése van. A hasonló particionálású táblák partíciónként illeszthetők össze. Így a Postgres kisebb hash táblákat fog használni. A szakaszok minden csatlakozása párhuzamos lehet.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

A lényeg az, hogy a szakaszonkénti kapcsolat csak akkor legyen párhuzamos, ha ezek a szakaszok elég nagyok.

Párhuzamos hozzáfűzés

Párhuzamos hozzáfűzés különböző blokkok helyett használhatók a különböző munkafolyamatokban. Ez általában az UNION ALL lekérdezésekkel történik. Hátránya a kisebb párhuzamosság, mert minden dolgozói folyamat csak 1 kérést dolgoz fel.

2 munkafolyamat fut itt, bár 4 engedélyezve van.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

A legfontosabb változók

  • A WORK_MEM folyamatonként korlátozza a memóriát, nem csak a lekérdezéseket: work_mem folyamatokat kapcsolatok = sok memória.
  • max_parallel_workers_per_gather — hány munkafolyamatot fog a végrehajtó program felhasználni a tervből származó párhuzamos feldolgozáshoz.
  • max_worker_processes — a munkafolyamatok teljes számát a szerveren lévő CPU magok számához igazítja.
  • max_parallel_workers - ugyanaz, de párhuzamos munkafolyamatokhoz.

Eredményei

A 9.6-os verziótól kezdve a párhuzamos feldolgozás nagymértékben javíthatja a sok sort vagy indexet vizsgáló összetett lekérdezések teljesítményét. A PostgreSQL 10-ben a párhuzamos feldolgozás alapértelmezés szerint engedélyezve van. Ne felejtse el letiltani a nagy OLTP-terhelésű szervereken. A szekvenciális vagy indexellenőrzések sok erőforrást fogyasztanak. Ha nem a teljes adatkészletről futtat jelentést, javíthatja a lekérdezés teljesítményét egyszerűen hiányzó indexek hozzáadásával vagy megfelelő particionálással.

referenciák

Forrás: will.com

Hozzászólás