A quick sanity test of PostgreSQL parallel query on Arm64

Query parallelism has been supported in PostgreSQL for quite a while now. People in the PG community call it "Parallel query", but by now it is not limited to just SELECT queries: index builds leverage multiple cores, and even utilities like VACUUM make use of parallelism. Furthermore, the community is working on parallelizing COPY and INSERTs.
                                                                                
I was interested in doing a kind of "sanity" check of this capability, specifically on the ARM64 platform. Let's see how it goes. At the same time, we will try to understand a little bit about how to interpret the parallelism part of the plan output. Subqueries and partitions are not covered in this blog; I will probably take them up in another one.
                                                                                
For running the queries I generated a scale-5 TPC-H benchmark schema with the help of scripts taken from https://github.com/tvondra/pg_tpch.git. My machine is an 8-CPU VM with 15 GB of memory and Ubuntu 18.04, running on a "Kunpeng 920" 2.6 GHz host. The PostgreSQL build was from the git master branch, so you can treat it as somewhere between PostgreSQL 13 and 14. All the tests were run with max_parallel_workers_per_gather = 4. The tables were pre-warmed, so I reduced seq_page_cost and random_page_cost to as low as 0.1.
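
For reference, the session setup looked roughly like the following. This is a sketch: the settings are as stated above, but using the pg_prewarm extension for the pre-warming is my assumption, since the exact warming method isn't recorded.

CREATE EXTENSION IF NOT EXISTS pg_prewarm;  -- assumption: pre-warming done via pg_prewarm
SELECT pg_prewarm('lineitem');              -- load the table into shared buffers
SELECT pg_prewarm('orders');
SET max_parallel_workers_per_gather = 4;
SET seq_page_cost = 0.1;                    -- tables are cached, so page reads are cheap
SET random_page_cost = 0.1;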
                                                                                
The JIT-related part of the EXPLAIN output is omitted from the plans to keep the focus on the main query plan. Also, estimated costs are omitted in order to make the plan output compact.
                                                      


Parallel sequential scan
                                                                                
This is the simplest one, and the one with which query parallelism got introduced in PostgreSQL 9.6.
                                                                                
Just a plain "select * from lineitem" won't give us a parallel scan, because all the tuples would need to be transferred from the workers to the leader backend. A parallel scan is beneficial only when this tuple transfer cost is small enough. So let's reduce the number of rows selected:

tpch=# explain (analyze, costs off)
tpch-# select l_orderkey from lineitem  where l_shipmode = 'AIR' and l_shipinstruct = 'TAKE BACK RETURN';

                                            QUERY PLAN
--------------------------------------------------------------------------------------------------
 Gather (actual time=6.264..1776.956 rows=1070891 loops=1)
   Workers Planned: 4
   Workers Launched: 4
   ->  Parallel Seq Scan on lineitem (actual time=6.959..1640.647 rows=214178 loops=5)
         Filter: ((l_shipmode = 'AIR'::bpchar) AND (l_shipinstruct = 'TAKE BACK RETURN'::bpchar))
         Rows Removed by Filter: 5785781
 Planning Time: 0.205 ms
 Execution Time: 1823.987 ms

So the parallel sequential scan took 1824 ms to execute. Let's compare this with a plain sequential scan:

tpch=# set max_parallel_workers_per_gather TO  0; -- Disable parallelism

                                         QUERY PLAN
--------------------------------------------------------------------------------------------
 Seq Scan on lineitem (actual time=117.795..5077.520 rows=1070891 loops=1)
   Filter: ((l_shipmode = 'AIR'::bpchar) AND (l_shipinstruct = 'TAKE BACK RETURN'::bpchar))
   Rows Removed by Filter: 28928904
 Planning Time: 0.101 ms
 Execution Time: 5123.774 ms


So the parallel seqscan was around 2.8 times faster.

Just some background before we go on to the other queries ... Parallelism is achieved by distributing the table blocks among the workers, and each parallel worker then does its job of reading and processing tuples from the blocks it gets. But how is it ensured that no two workers scan the same block? After all, they are running in parallel, and each block should be scanned by exactly one worker, otherwise duplicate rows would be returned. To make this happen, the workers coordinate with each other. They are all *aware* that they are running in parallel, so they keep a shared "next block to read" pointer, which each worker updates once it chooses its own next block. This type of parallel plan node is called "parallel-aware"; it has the prefix "Parallel" before the plan node name in the EXPLAIN output. A plan node sitting on top of such a parallel-aware node might itself be running in a parallel worker without being aware of it: it processes only a partial set of rows simply because the underlying parallel-aware scan is handing it only its own share of table blocks. Such a plan can be called a "parallel-oblivious" plan, for the sake of naming it. We will talk about this more when we discuss parallel joins and aggregates.
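
Incidentally, it is easy to watch these workers in action. While one of the parallel queries above is running, a second session can list them, since parallel workers appear in pg_stat_activity with a backend_type of 'parallel worker':

SELECT pid, backend_type, state
FROM pg_stat_activity
WHERE backend_type = 'parallel worker';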
                                                                                
Another rule of thumb: a Gather node is the umbrella parallel node, under which all the nodes in the subtree are run in parallel by the workers. A Gather node's job is to gather the tuples returned by each worker and pass them on to the upper node. All the nodes above Gather run in the usual parent (leader) backend. Gather nodes cannot be nested.
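
As an aside, if you want to see a Gather node even where the planner would not normally choose one, there is a testing GUC that forces a Gather on top of any plan that is parallel-safe; this is a debugging aid, not something to enable in production:

SET force_parallel_mode TO on;  -- force a Gather over parallel-safe plans (testing only)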


Index Scan

The following query didn't produce a parallel index scan:

tpch=# explain (analyze, costs off)
select l_partkey from lineitem    where l_partkey < 100000;
                                                 QUERY PLAN
------------------------------------------------------------------------------------------------------------
 Index Only Scan using idx_lineitem_part_supp on lineitem (actual time=0.078..895.358 rows=2999506 loops=1)
   Index Cond: (l_partkey < 100000)
   Heap Fetches: 0
 Planning Time: 0.129 ms
 Execution Time: 1012.693 ms

So let's try reducing parallel_tuple_cost, for the sake of reproducing a parallel index scan:
tpch=# set parallel_tuple_cost TO 0.0001;                                       
tpch=# explain (analyze, costs off) select l_partkey from lineitem    where l_partkey < 100000;
                                                        QUERY PLAN                                                        
--------------------------------------------------------------------------------------------------------------------------
 Gather (actual time=0.390..387.086 rows=2999506 loops=1)
   Workers Planned: 4
   Workers Launched: 4
   ->  Parallel Index Only Scan using idx_lineitem_part_supp on lineitem (actual time=0.098..262.780 rows=599901 loops=5)
         Index Cond: (l_partkey < 100000)
         Heap Fetches: 0
 Planning Time: 0.802 ms
 Execution Time: 509.306 ms


Notes:

parallel_tuple_cost is the cost of transferring tuples from the workers to the leader backend. Note that I used a contrived value of 0.0001 just for the sake of reproducing a parallel index scan. Although setting it gives us a parallel index scan with a faster execution time, it is not recommended to change these costing parameters without gathering conclusive statistics on your system.
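
For reference, the default is 0.1, and the setting is easy to restore once you are done experimenting:

SHOW parallel_tuple_cost;   -- 0.1 by default
RESET parallel_tuple_cost;  -- go back to the server default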

Index-only scan is a special kind of index scan, where the index already contains the data required by the query, so a separate heap scan is avoided; only the index is scanned. A plain index scan or a bitmap heap scan also supports parallelism; we will see more of these in the aggregate and table join examples, where they are more commonly seen.

A parallel index scan does not produce ordered results, unlike a non-parallel index scan. Multiple workers read index blocks in parallel, so although each worker returns its own tuples sorted, the combined result set is not sorted.
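
An easy way to see this for yourself: add an ORDER BY on the index column to the query above. The planner can no longer return the workers' output as-is, and has to restore the order above the parallel scan (typically with the Gather Merge node discussed below); the exact plan you get will depend on your settings:

explain (costs off)
select l_partkey from lineitem where l_partkey < 100000 order by l_partkey;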

Parallel index scan is only supported for btree indexes.



Parallel Aggregate


An aggregate expression in a query typically returns drastically fewer rows than the number of table rows, inherently, since the values returned are aggregated over a row set. So there is very little worker-to-leader tuple transfer cost involved, and an aggregate query almost always benefits from parallelism.

tpch=# -- Check out sequential aggregate plan
tpch=# set max_parallel_workers_per_gather TO  0;
tpch=# explain (analyze, costs off) select max(l_tax) from lineitem;
                                   QUERY PLAN
--------------------------------------------------------------------------------
 Aggregate (actual time=11230.951..11230.951 rows=1 loops=1)
   ->  Seq Scan on lineitem (actual time=0.009..2767.802 rows=29999795 loops=1)
 Planning Time: 0.105 ms
 Execution Time: 11231.739 ms


tpch=# -- Check out parallel aggregate plan
tpch=# set max_parallel_workers_per_gather TO  4;
tpch=# explain (analyze, costs off) select max(l_tax) from lineitem;
                                            QUERY PLAN
---------------------------------------------------------------------------------------------------
 Finalize Aggregate (actual time=2150.383..2190.898 rows=1 loops=1)
   ->  Gather (actual time=2150.241..2190.883 rows=5 loops=1)
         Workers Planned: 4
         Workers Launched: 4
         ->  Partial Aggregate (actual time=2137.664..2137.665 rows=1 loops=5)
               ->  Parallel Seq Scan on lineitem (actual time=0.016..563.268 rows=5999959 loops=5)
 Planning Time: 0.896 ms
 Execution Time: 2202.304 ms

So it's 5 times faster; pretty neat.

Above, we can see that the usual Aggregate plan node is divided into two kinds of Aggregate plan nodes. The one below the Gather node is the Partial Aggregate node, which, as its name implies, aggregates only the values returned by its own worker; it has not yet run the final function. That is the task of the Finalize Aggregate node, which combines the partial aggregates returned by all the workers through the Gather node and produces the final result.
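
Conceptually, the two-stage aggregation behaves like the following hand-written rewrite. The GROUP BY expression is purely hypothetical; it just stands in for however the rows happen to get distributed among the five participating processes:

SELECT max(partial_max) AS final_max      -- the Finalize Aggregate step
FROM (
    SELECT max(l_tax) AS partial_max      -- the Partial Aggregate step, one row per worker
    FROM lineitem
    GROUP BY l_orderkey % 5               -- hypothetical 5-way split of the rows
) AS per_worker;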



Joins


We will analyze the three different join methods using this query:

select avg(l_discount) from orders, lineitem
where
    l_orderkey = o_orderkey
    and o_orderdate < date '1995-03-09'
    and l_shipdate > date '1995-03-09';
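
The three plans below can be reproduced (this is an assumption about the methodology, but it is the usual trick) by selectively turning off the other join methods with the planner's enable_* settings, for example:

SET enable_hashjoin TO off;  -- with enable_nestloop also off, this leaves a Merge Join
SET enable_nestloop TO off;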


Merge Join

                                                                QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate (actual time=5030.051..5241.390 rows=1 loops=1)
   ->  Gather (actual time=5029.991..5241.370 rows=5 loops=1)
         Workers Planned: 4
         Workers Launched: 4
         ->  Partial Aggregate (actual time=5015.939..5015.940 rows=1 loops=5)
               ->  Merge Join (actual time=199.287..4987.159 rows=149782 loops=5)
                     Merge Cond: (lineitem.l_orderkey = orders.o_orderkey)
                     ->  Parallel Index Scan using idx_lineitem_orderkey on lineitem (actual time=198.962..2095.747 rows=3248732 loops=5)
                           Filter: (l_shipdate > '1995-03-09'::date)
                           Rows Removed by Filter: 2751227
                     ->  Index Scan using orders_pkey on orders (actual time=0.057..2343.402 rows=3625756 loops=5)
                           Filter: (o_orderdate < '1995-03-09'::date)
                           Rows Removed by Filter: 3874054
 Planning Time: 0.290 ms
 Execution Time: 5243.194 ms


As you can see above, the Merge Join is under a Gather node. That means the Merge Join is being executed in parallel workers. Is the merge join executed with some coordination among the workers? In other words, is the Merge Join parallel-aware? No. As you can see, it does not have a "Parallel" prefix. A Merge Join needs sorted input on both the outer and the inner side, hence the index scans on both sides. When a Merge Join is executed in a worker, a subset of the outer table is joined with the full inner table. This works because the outer side is scanned using a Parallel Index Scan, while the inner side is a normal Index Scan, which means each worker does a full index scan of the inner side. Effectively, the Merge Join's work is divided, thanks to the data divided up by the underlying Parallel Index Scan, and the Merge Join does not even know that it is being run in parallel! The caveat is that the inner side has to be redundantly scanned in full by each worker, followed by a sort if required. In our case the sort was not necessary because of the index.

There is scope for improvement here: the Merge Join could be made parallel-aware by appropriately partitioning the sorted data of both tables and merge-joining the pairs of partitioned data sets in parallel. But that discussion would need a separate blog.


Parallel-aware Hash Join

                                                 QUERY PLAN                                                  
-------------------------------------------------------------------------------------------------------------
 Finalize Aggregate (actual time=3054.189..3099.784 rows=1 loops=1)
   ->  Gather (actual time=3022.810..3099.755 rows=5 loops=1)
         Workers Planned: 4
         Workers Launched: 4
         ->  Partial Aggregate (actual time=3007.931..3007.934 rows=1 loops=5)
               ->  Parallel Hash Join (actual time=643.552..2980.305 rows=149782 loops=5)
                     Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                     ->  Parallel Seq Scan on lineitem (actual time=0.030..1685.258 rows=3248732 loops=5)
                           Filter: (l_shipdate > '1995-03-09'::date)
                           Rows Removed by Filter: 2751227
                     ->  Parallel Hash (actual time=639.508..639.508 rows=725169 loops=5)
                           Buckets: 4194304  Batches: 1  Memory Usage: 174688kB
                           ->  Parallel Seq Scan on orders (actual time=14.083..384.196 rows=725169 loops=5)
                                 Filter: (o_orderdate < '1995-03-09'::date)
                                 Rows Removed by Filter: 774831
 Planning Time: 0.300 ms
 Execution Time: 3101.937 ms

The inner side of this Hash Join is a "Parallel Hash" node. This plan builds a shared hash table by dividing the work among the coordinating parallel workers. As with a sequential Hash Join, the outer side waits until the hash table is built. Once it is built, the same workers start scanning the outer table and doing the join using the shared hash table. The outer scan is essentially a partial scan, because each worker scans only its share of the blocks; in our case, it's a Parallel Seq Scan.


Parallel-oblivious Hash Join

If the inner side is just a Hash node rather than a "Parallel Hash" node, it means a separate full hash table will be built by each of the workers, rather than having a single shared hash table. This is obviously more expensive than the parallel hash, since the hash-building work is not divided:

                                                            QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate (actual time=5908.032..5971.214 rows=1 loops=1)
   ->  Gather (actual time=5852.417..5971.167 rows=5 loops=1)
         Workers Planned: 4
         Workers Launched: 4
         ->  Partial Aggregate (actual time=5850.930..5850.933 rows=1 loops=5)
               ->  Hash Join (actual time=2309.307..5826.753 rows=149782 loops=5)
                     Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                     ->  Parallel Seq Scan on lineitem (actual time=12.631..1712.443 rows=3248732 loops=5)
                           Filter: (l_shipdate > '1995-03-09'::date)
                           Rows Removed by Filter: 2751227
                     ->  Hash (actual time=2290.063..2290.065 rows=3625845 loops=5)
                           Buckets: 2097152  Batches: 4  Memory Usage: 48222kB
                           ->  Bitmap Heap Scan on orders (actual time=502.264..1512.424 rows=3625845 loops=5)
                                 Recheck Cond: (o_orderdate < '1995-03-09'::date)
                                 Heap Blocks: exact=138113
                                 ->  Bitmap Index Scan on idx_orders_orderdate (actual time=451.552..451.552 rows=3625845 loops=5)
                                       Index Cond: (o_orderdate < '1995-03-09'::date)
 Planning Time: 0.291 ms
 Execution Time: 5977.966 ms
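
One way to reproduce such a parallel-oblivious hash join for comparison (again, an assumption about how this particular plan was obtained) is to disable the shared hash table:

SET enable_parallel_hash TO off;  -- each worker then builds its own private hash table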




Nested Loop Join

                                                      QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate (actual time=7211.122..7258.289 rows=1 loops=1)
   ->  Gather (actual time=7193.150..7258.259 rows=5 loops=1)
         Workers Planned: 4
         Workers Launched: 4
         ->  Partial Aggregate (actual time=7129.209..7129.210 rows=1 loops=5)
               ->  Nested Loop (actual time=13.924..7100.095 rows=149782 loops=5)
                     ->  Parallel Seq Scan on lineitem (actual time=13.621..1919.712 rows=3248732 loops=5)
                           Filter: (l_shipdate > '1995-03-09'::date)
                           Rows Removed by Filter: 2751227
                     ->  Result Cache (actual time=0.001..0.001 rows=0 loops=16243662)
                           Cache Key: lineitem.l_orderkey
                           Hits: 2450631  Misses: 844081  Evictions: 0  Overflows: 0  Memory Usage: 61379kB
                           Worker 0:  Hits: 2443189  Misses: 841050  Evictions: 0  Overflows: 0  Memory Usage: 61158kB
                           Worker 1:  Hits: 2350093  Misses: 808929  Evictions: 0  Overflows: 0  Memory Usage: 58824kB
                           Worker 2:  Hits: 2424018  Misses: 833681  Evictions: 0  Overflows: 0  Memory Usage: 60615kB
                           Worker 3:  Hits: 2417114  Misses: 830876  Evictions: 0  Overflows: 0  Memory Usage: 60407kB
                           ->  Index Scan using orders_pkey on orders (actual time=0.004..0.004 rows=0 loops=4158617)
                                 Index Cond: (o_orderkey = lineitem.l_orderkey)
                                 Filter: (o_orderdate < '1995-03-09'::date)
                                 Rows Removed by Filter: 1
 Planning Time: 0.294 ms
 Execution Time: 7268.857 ms


By nature, a nested loop join has to scan the whole inner side for each outer tuple. So we can divide the outer scan among the workers and have each worker scan the complete inner table, which gives us a parallel-oblivious Nested Loop Join; there is no need to make it parallel-aware. Also note the Result Cache node on the inner side: it caches the inner-side rows for each distinct value of the join key, so repeated lookups of the same l_orderkey avoid re-executing the inner index scan.
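
If you want to compare against a plain parallel nested loop without the cache, the node can be disabled. Note that the GUC name here is an assumption: on a git snapshot from this period it was presumably enable_resultcache; in the released PostgreSQL 14, the node and the GUC were renamed to Memoize and enable_memoize:

SET enable_resultcache TO off;  -- assumption: GUC name on this development snapshot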



Sequential Join 

If we disable parallelism, we get a sequential hash join. Note that all of the above parallel joins are considerably faster than the sequential join below ...

tpch=# set max_parallel_workers_per_gather TO  0;

                                                       QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------
 Aggregate (actual time=15714.776..15714.779 rows=1 loops=1)
   ->  Hash Join (actual time=5134.219..15603.861 rows=748912 loops=1)
         Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
         ->  Bitmap Heap Scan on lineitem (actual time=2837.938..7162.214 rows=16243662 loops=1)
               Recheck Cond: (l_shipdate > '1995-03-09'::date)
               Heap Blocks: exact=607593
               ->  Bitmap Index Scan on idx_lineitem_shipdate (actual time=2556.845..2556.845 rows=16243662 loops=1)
                     Index Cond: (l_shipdate > '1995-03-09'::date)
         ->  Hash (actual time=2290.201..2290.202 rows=3625845 loops=1)
               Buckets: 2097152  Batches: 4  Memory Usage: 48222kB
               ->  Bitmap Heap Scan on orders (actual time=563.536..1548.176 rows=3625845 loops=1)
                     Recheck Cond: (o_orderdate < '1995-03-09'::date)
                     Heap Blocks: exact=138113
                     ->  Bitmap Index Scan on idx_orders_orderdate (actual time=333.284..333.285 rows=3625845 loops=1)
                           Index Cond: (o_orderdate < '1995-03-09'::date)
 Planning Time: 0.267 ms
 Execution Time: 15727.275 ms




Gather Merge


tpch=# explain (analyze, costs off)
select  l_orderkey from lineitem where l_suppkey > 10000 order by l_suppkey ;
                                          QUERY PLAN
----------------------------------------------------------------------------------------------
 Gather Merge (actual time=3351.705..8310.367 rows=23998124 loops=1)
   Workers Planned: 4
   Workers Launched: 4
   ->  Sort (actual time=3181.446..3896.115 rows=4799625 loops=5)
         Sort Key: l_suppkey
         Sort Method: external merge  Disk: 136216kB
         Worker 0:  Sort Method: external merge  Disk: 120208kB
         Worker 1:  Sort Method: external merge  Disk: 116392kB
         Worker 2:  Sort Method: external merge  Disk: 123520kB
         Worker 3:  Sort Method: external merge  Disk: 114264kB
         ->  Parallel Seq Scan on lineitem (actual time=55.688..915.160 rows=4799625 loops=5)
               Filter: (l_suppkey > 10000)
               Rows Removed by Filter: 1200334
 Planning Time: 0.102 ms
 Execution Time: 9654.078 ms


Gather Merge is a modified version of the Gather node; it effectively parallelizes a sort. Each worker sorts its share of the rows, and Gather Merge merges the sorted streams coming from the workers, returning fully sorted output.


A sequential sort took about twice as long:

                                   QUERY PLAN                                    
---------------------------------------------------------------------------------
 Sort (actual time=14399.200..18068.514 rows=23998124 loops=1)
   Sort Key: l_suppkey
   Sort Method: external merge  Disk: 610560kB
   ->  Seq Scan on lineitem (actual time=16.346..4320.823 rows=23998124 loops=1)
         Filter: (l_suppkey > 10000)
         Rows Removed by Filter: 6001671
 Planning Time: 0.086 ms
 Execution Time: 20015.980 ms



There are a lot of other scenarios where parallelism can be observed, but I will probably take those up in a later blog ...


