Kueri paralel di PostgreSQL

Kueri paralel di PostgreSQL
CPU modern memiliki banyak inti. Selama bertahun-tahun, aplikasi telah mengirimkan pertanyaan ke database secara paralel. Jika ini adalah kueri laporan pada beberapa baris dalam tabel, ini berjalan lebih cepat saat menggunakan beberapa CPU, dan PostgreSQL telah mampu melakukan ini sejak versi 9.6.

Butuh waktu 3 tahun untuk mengimplementasikan fitur kueri paralel - kami harus menulis ulang kode pada berbagai tahap eksekusi kueri. PostgreSQL 9.6 memperkenalkan infrastruktur untuk lebih meningkatkan kodenya. Di versi berikutnya, jenis kueri lain dijalankan secara paralel.

Pembatasan

  • Jangan aktifkan eksekusi paralel jika semua inti sudah sibuk, jika tidak, permintaan lain akan melambat.
  • Yang terpenting, pemrosesan paralel dengan nilai WORK_MEM yang tinggi menggunakan banyak memori - setiap hash join atau sortir membutuhkan memori work_mem.
  • Kueri OLTP latensi rendah tidak dapat dipercepat dengan eksekusi paralel. Dan jika kueri mengembalikan satu baris, pemrosesan paralel hanya akan memperlambatnya.
  • Pengembang senang menggunakan benchmark TPC-H. Mungkin Anda memiliki pertanyaan serupa untuk eksekusi paralel yang sempurna.
  • Hanya kueri SELECT tanpa penguncian predikat yang dijalankan secara paralel.
  • Terkadang pengindeksan yang tepat lebih baik daripada pemindaian tabel berurutan dalam mode paralel.
  • Menjeda kueri dan kursor tidak didukung.
  • Fungsi jendela dan fungsi agregat himpunan terurut tidak paralel.
  • Anda tidak mendapatkan apa pun dalam beban kerja I/O.
  • Tidak ada algoritma pengurutan paralel. Namun kueri dengan sortir dapat dijalankan secara paralel dalam beberapa aspek.
  • Ganti CTE (DENGAN ...) dengan SELECT bersarang untuk mengaktifkan pemrosesan paralel.
  • Pembungkus data pihak ketiga belum mendukung pemrosesan paralel (tetapi bisa!)
  • GABUNG LUAR LENGKAP tidak didukung.
  • max_rows menonaktifkan pemrosesan paralel.
  • Jika kueri memiliki fungsi yang tidak ditandai PARALLEL SAFE, kueri tersebut akan berupa rangkaian tunggal.
  • Tingkat isolasi transaksi SERIALIZABLE menonaktifkan pemrosesan paralel.

Lingkungan Uji

Pengembang PostgreSQL mencoba mengurangi waktu respons kueri benchmark TPC-H. Unduh benchmark dan sesuaikan dengan PostgreSQL. Ini adalah penggunaan tidak resmi dari benchmark TPC-H - bukan untuk perbandingan database atau perangkat keras.

  1. Unduh TPC-H_Tools_v2.17.3.zip (atau versi yang lebih baru) dari TPC di luar lokasi.
  2. Ganti nama makefile.suite menjadi Makefile dan ubah seperti yang dijelaskan di sini: https://github.com/tvondra/pg_tpch . Kompilasi kode dengan perintah make.
  3. Hasilkan data: ./dbgen -s 10 membuat database 23 GB. Ini cukup untuk melihat perbedaan performa kueri paralel dan non-paralel.
  4. Konversi file tbl в csv с for и sed.
  5. Kloning repositori pg_tpch dan salin filenya csv в pg_tpch/dss/data.
  6. Buat kueri dengan perintah qgen.
  7. Memuat data ke dalam database dengan perintah ./tpch.sh.

Pemindaian sekuensial paralel

Mungkin lebih cepat bukan karena pembacaan paralel, namun karena data tersebar di banyak inti CPU. Dalam sistem operasi modern, file data PostgreSQL disimpan dalam cache dengan baik. Dengan membaca terlebih dahulu, dimungkinkan untuk mendapatkan blok penyimpanan yang lebih besar daripada permintaan daemon PG. Oleh karena itu, kinerja kueri tidak dibatasi oleh I/O disk. Ini menghabiskan siklus CPU untuk:

  • membaca baris satu per satu dari halaman tabel;
  • membandingkan nilai dan kondisi string WHERE.

Mari kita jalankan kueri sederhana select:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

Pemindaian berurutan menghasilkan terlalu banyak baris tanpa agregasi, sehingga kueri dijalankan oleh satu inti CPU.

Jika Anda menambahkan SUM(), Anda dapat melihat bahwa dua alur kerja akan membantu mempercepat kueri:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Agregasi paralel

Node Parallel Seq Scan menghasilkan baris untuk agregasi parsial. Node "Agregat Parsial" memotong garis-garis ini menggunakan SUM(). Pada akhirnya, penghitung SUM dari setiap proses pekerja dikumpulkan oleh node “Gather”.

Hasil akhir dihitung oleh node “Finalisasi Agregat”. Jika Anda memiliki fungsi agregasi sendiri, jangan lupa menandainya sebagai “parallel safe”.

Jumlah proses pekerja

Jumlah proses pekerja dapat ditingkatkan tanpa me-restart server:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

Apa yang terjadi di sini? Ada proses kerja 2 kali lebih banyak, dan permintaan hanya menjadi 1,6599 kali lebih cepat. Perhitungannya menarik. Kami memiliki 2 proses pekerja dan 1 pemimpin. Setelah diubah menjadi 4+1.

Kecepatan maksimum kami dari pemrosesan paralel: 5/3 = 1,66(6) kali.

Bagaimana cara kerjanya?

Процессы

Eksekusi permintaan selalu dimulai dengan proses utama. Pemimpin melakukan segala sesuatu secara non-paralel dan beberapa proses paralel. Proses lain yang melakukan permintaan yang sama disebut proses pekerja. Pemrosesan paralel menggunakan infrastruktur proses pekerja latar belakang yang dinamis (dari versi 9.4). Karena bagian lain dari PostgreSQL menggunakan proses, bukan thread, kueri dengan 3 proses pekerja bisa 4 kali lebih cepat dibandingkan pemrosesan tradisional.

Interaksi

Proses pekerja berkomunikasi dengan pemimpin melalui antrian pesan (berdasarkan memori bersama). Setiap proses memiliki 2 antrian: untuk kesalahan dan untuk tupel.

Berapa banyak alur kerja yang dibutuhkan?

Batas minimum ditentukan oleh parameter max_parallel_workers_per_gather. Pelari permintaan kemudian mengambil proses pekerja dari kumpulan yang dibatasi oleh parameter max_parallel_workers size. Batasan terakhir adalah max_worker_processes, yaitu jumlah total proses latar belakang.

Jika proses pekerja tidak dapat dialokasikan, pemrosesan akan menjadi proses tunggal.

Perencana kueri bisa mengurangi alur kerja bergantung pada ukuran tabel atau indeks. Ada parameter untuk ini min_parallel_table_scan_size и min_parallel_index_scan_size.

set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker

Setiap kali tabelnya 3 kali lebih besar dari min_parallel_(index|table)_scan_size, Postgres menambahkan proses pekerja. Jumlah alur kerja tidak berdasarkan biaya. Ketergantungan melingkar membuat implementasi yang kompleks menjadi sulit. Sebaliknya, perencana menggunakan aturan sederhana.

Dalam praktiknya, aturan ini tidak selalu cocok untuk produksi, sehingga Anda dapat mengubah jumlah proses pekerja untuk tabel tertentu: ALTER TABLE ... SET (parallel_workers = N).

Mengapa pemrosesan paralel tidak digunakan?

Selain daftar panjang batasan, ada juga pemeriksaan biaya:

parallel_setup_cost - untuk menghindari pemrosesan paralel permintaan singkat. Parameter ini memperkirakan waktu untuk menyiapkan memori, memulai proses, dan pertukaran data awal.

parallel_tuple_cost: komunikasi antara pemimpin dan pekerja dapat tertunda sebanding dengan jumlah tupel dari proses kerja. Parameter ini menghitung biaya pertukaran data.

Gabungan Loop Bersarang

PostgreSQL 9.6+ может выполнять вложенные циклы параллельно — это простая операция.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

Pengumpulan terjadi pada tahap terakhir, jadi Nested Loop Left Join adalah operasi paralel. Pemindaian Hanya Indeks Paralel diperkenalkan hanya di versi 10. Cara kerjanya mirip dengan pemindaian serial paralel. Kondisi c_custkey = o_custkey membaca satu pesanan per string klien. Jadi tidak paralel.

Gabung Hash

Setiap proses pekerja membuat tabel hashnya sendiri hingga PostgreSQL 11. Dan jika ada lebih dari empat proses ini, kinerja tidak akan meningkat. Di versi baru, tabel hash dibagikan. Setiap proses pekerja dapat menggunakan WORK_MEM untuk membuat tabel hash.

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

Kueri 12 dari TPC-H dengan jelas menunjukkan koneksi hash paralel. Setiap proses pekerja berkontribusi pada pembuatan tabel hash umum.

Gabung Gabung

Gabungan gabungan bersifat non-paralel. Jangan khawatir jika ini adalah langkah terakhir kueri - kueri masih dapat berjalan secara paralel.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using <strong>part_pkey</strong> on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

Node "Gabung Gabung" terletak di atas "Gabung Gabung". Jadi penggabungan tidak menggunakan proses paralel. Namun node “Parallel Index Scan” masih membantu segmen tersebut part_pkey.

Koneksi berdasarkan bagian

Di PostgreSQL 11 koneksi berdasarkan bagian dinonaktifkan secara default: penjadwalannya sangat mahal. Tabel dengan partisi serupa dapat digabungkan partisi demi partisi. Dengan cara ini Postgres akan menggunakan tabel hash yang lebih kecil. Setiap sambungan bagian bisa paralel.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

Hal utama adalah bahwa sambungan di bagian-bagian itu paralel hanya jika bagian-bagian ini cukup besar.

Lampiran Paralel

Lampiran Paralel dapat digunakan sebagai pengganti blok yang berbeda dalam alur kerja yang berbeda. Ini biasanya terjadi dengan pertanyaan UNION ALL. Kerugiannya adalah kurang paralelnya, karena setiap proses pekerja hanya memproses 1 permintaan.

Ada 2 proses pekerja yang berjalan di sini, meskipun 4 diaktifkan.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

Variabel yang paling penting

  • WORK_MEM membatasi memori per proses, bukan hanya kueri: work_mem proses koneksi = banyak memori.
  • max_parallel_workers_per_gather — berapa banyak proses pekerja yang akan digunakan oleh program pelaksana untuk pemrosesan paralel dari rencana.
  • max_worker_processes — menyesuaikan jumlah total proses pekerja dengan jumlah inti CPU di server.
  • max_parallel_workers - sama, tetapi untuk proses kerja paralel.

Hasil

Pada versi 9.6, pemrosesan paralel dapat sangat meningkatkan kinerja kueri kompleks yang memindai banyak baris atau indeks. Di PostgreSQL 10, pemrosesan paralel diaktifkan secara default. Ingatlah untuk menonaktifkannya di server dengan beban kerja OLTP yang besar. Pemindaian berurutan atau pemindaian indeks menghabiskan banyak sumber daya. Jika Anda tidak menjalankan laporan pada keseluruhan kumpulan data, Anda dapat meningkatkan kinerja kueri hanya dengan menambahkan indeks yang hilang atau menggunakan partisi yang tepat.

referensi

Sumber: www.habr.com

Tambah komentar