DEV Community

Franck Pachot for AWS Heroes


Joins, Scale, and Denormalization

In database design, denormalization is often glorified as a necessary step to improve performance, especially in complex join scenarios. This notion originates from the widespread belief that "joins don't scale", particularly in distributed systems. However, denormalization comes with trade-offs, including increased code complexity and challenges in maintaining data consistency.

I received an excellent question following my previous blog post, asking if a lot of denormalization is required in Aurora DSQL.

In this post, I'll create a typical relational schema that includes two tables: "orders" and "orderlines." I will also incorporate additional denormalized columns and compare the performance of a query using these denormalized columns versus a join on a normalized schema. My query will filter the orders based on the country and product and sort the results by the creation date.

Here are the normalized entity attributes:

  • "product" is a column of the "orderlines" table.
  • "country" and "created_at" are columns of the "orders" table.

To enhance the performance of my query, I could add the country and creation date attributes to the "orderlines" table. This way, I can perform all my filtering and sorting operations before executing the join. This approach represents a form of denormalization often used when there are concerns about the scalability of joins.
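The trade-off can be made concrete with a short sketch. It assumes the "orders" and "orderlines" tables defined in the script that follows. The UUID literal is a made-up placeholder, and these statements were not run in this post; they only illustrate the extra work that denormalization pushes to the application.

```sql
-- Hypothetical order id, for illustration only.
-- With denormalized columns, changing an order's country means
-- updating every copy of the value in the same transaction.
begin;

update orders
   set country = 'Germany'
 where order_id = '00000000-0000-0000-0000-000000000001';

update orderlines
   set denormalized_country = 'Germany'
 where order_id = '00000000-0000-0000-0000-000000000001';

commit;

-- Audit query: order lines whose copied values drifted from the parent order.
select l.order_id, l.line_id
  from orderlines l
  join orders o using (order_id)
 where l.denormalized_country    is distinct from o.country
    or l.denormalized_created_at is distinct from o.created_at;
```

The audit query joins both tables in full, so it is the kind of check to run occasionally rather than on every request.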

Here is the script I used to create the schema and generate a million orders, with ten lines each:

\timing on 
drop table if exists orders, orderlines cascade;

create table orders (
 primary key (order_id)
 , order_id uuid default gen_random_uuid()
 , country text
 , created_at timestamptz default now()
);

create index on orders(country, created_at);

create table orderlines (
 primary key (order_id, line_id)
 , order_id uuid -- references orders /* FOREIGN KEY constraint not supported in Aurora DSQL */
 , line_id int
 , product text
 , denormalized_country    text
 , denormalized_created_at timestamptz
);

create index on orderlines(denormalized_country, denormalized_created_at);

with orders(country, order_id, created_at) as (
 select
  (ARRAY['USA', 'France', 'Japan', 'Germany', 'India'])[(random()*5+1)]
  , gen_random_uuid()
  , now()
 from generate_series(1,100)
) , i1 as (
insert into orderlines select
 order_id, line_id
 , format('Product#%s',round(random()*50)::int)
 , country, created_at
 from orders,generate_series(1,10) line_id
) , i2 as (
 insert into orders(country, order_id, created_at) select * from orders
) select
\watch c=10000 0.01

analyze orders;
analyze orderlines;

select count(*) from orders;
select count(*) from orderlines;


I didn't define referential integrity because Aurora DSQL doesn't support it. Additionally, as mentioned in the previous post, I structured the insertion to avoid exceeding the limit of 10,000 rows per transaction.
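The row counts behind that structure can be checked with simple arithmetic. This sketch assumes psql's default autocommit, so that each `\watch` iteration of the script above is one transaction:

```sql
-- Each \watch iteration runs one autocommit statement that inserts:
select 100              as orders_per_iteration,
       100 * 10         as orderlines_per_iteration,
       100 + 100 * 10   as rows_per_transaction,   -- 1100, below the 10,000-row limit
       10000 * 100      as total_orders,           -- 1,000,000
       10000 * 100 * 10 as total_orderlines;       -- 10,000,000
```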

postgres=> select count(*) from orders;

  count
---------
 1000000
(1 row)

Time: 10114.990 ms (00:10.115)

postgres=> select count(*) from orderlines;

  count
----------
 10000000
(1 row)

Time: 99290.709 ms (01:39.291)

Join on Normalized tables in Aurora DSQL

Here is the query I run on the normalized columns. It retrieves the order lines for a specific product (Product#42) ordered in France, sorted by creation time (latest first):

select * from orders join orderlines using(order_id)
where country='France' and product='Product#42'
order by created_at desc
;

The execution takes nearly one minute to retrieve 39700 rows:

postgres=> explain analyze
select * from orders join orderlines using(order_id)
where country='France' and product='Product#42'
order by created_at desc
;
                                                                                                                                                                                                                                                                     QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Sort  (cost=1569969.34..1570073.39 rows=41621 width=67) (actual time=58267.313..58273.121 rows=39700 loops=1)
   Sort Key: orders.created_at DESC
   Sort Method: quicksort  Memory: 5569kB
   ->  Hash Join  (cost=1302424.49..1566775.96 rows=41621 width=67) (actual time=4458.633..58228.892 rows=39700 loops=1)
         Hash Cond: (orderlines.order_id = orders.order_id)
         ->  Custom Scan (btree-table) on orderlines  (cost=1250077.94..1513225.72 rows=209996 width=50) (actual time=0.761..53651.114 rows=199867 loops=1)
               Projected via pushdown compute engine: order_id, line_id, product, denormalized_country, denormalized_created_at
               Filters via pushdown compute engine: (product = 'Product#42'::text)
         ->  Hash  (cost=49881.23..49881.23 rows=197226 width=33) (actual time=4454.064..4454.071 rows=199785 loops=1)
               Buckets: 262144  Batches: 1  Memory Usage: 16096kB
               ->  Index Scan Backward using orders_country_created_at_idx on orders  (cost=24753.36..49881.23 rows=197226 width=33) (actual time=40.903..4397.518 rows=199785 loops=1)
                     Index Cond: (country = 'France'::text)
                     Projected via pushdown compute engine: order_id, country, created_at
 Planning Time: 58.315 ms
 Execution Time: 58275.753 ms
(15 rows)

Time: 58595.964 ms (00:58.596)


The filters were pushed down to fetch 199785 rows from "orders" (with an ordered index range scan) and 199867 rows from "orderlines" (with a full scan with pushed-down filtering). This query is slow because the join condition was not pushed down.

Only one Join method in PostgreSQL can push down a join condition: the Nested Loop.

Let's try to disable Hash Join and Merge Join:

postgres=> set enable_hashjoin=off;
SET
Time: 269.677 ms

postgres=> set enable_mergejoin=off;
SET
Time: 18.359 ms

postgres=> explain analyze
select * from orders join orderlines using(order_id)
where country='France' and product='Product#42'
order by created_at desc
;
                                                                                                                                                                                                                         QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------
 Sort  (cost=14901762.36..14901866.41 rows=41621 width=67) (actual time=103592.415..103598.680 rows=39700 loops=1)
   Sort Key: orders.created_at DESC
   Sort Method: quicksort  Memory: 5569kB
   ->  Nested Loop  (cost=1250178.12..14898568.99 rows=41621 width=67) (actual time=8.211..103538.156 rows=39700 loops=1)
         ->  Custom Scan (btree-table) on orderlines  (cost=1250077.94..1513225.72 rows=209996 width=50) (actual time=0.648..3304.011 rows=199867 loops=1)
               Projected via pushdown compute engine: order_id, line_id, product, denormalized_country, denormalized_created_at
               Filters via pushdown compute engine: (product = 'Product#42'::text)
         ->  Memoize  (cost=100.19..104.19 rows=1 width=33) (actual time=0.498..0.500 rows=0 loops=199867)
               Cache Key: orderlines.order_id
               Cache Mode: logical
               Hits: 16922  Misses: 182945  Evictions: 0  Overflows: 0  Memory Usage: 16847kB
               ->  Index Only Scan using orders_pkey on orders  (cost=100.17..104.18 rows=1 width=33) (actual time=0.542..0.545 rows=0 loops=182945)
                     Index Cond: (order_id = orderlines.order_id)
                     Projected via pushdown compute engine: order_id, country, created_at
                     Filters via pushdown compute engine: (country = 'France'::text)
                     Heap Fetches: 0
 Planning Time: 0.235 ms
 Execution Time: 103600.727 ms
(18 rows)

Time: 103627.461 ms (01:43.627)


The performance is not improved here: the response time increased to 103 seconds. The outer table returns 199867 rows, and each of them triggers a loop on the inner table (182945 index lookups once the Memoize cache hits are excluded). I will later compare this with YugabyteDB to illustrate how a Distributed SQL database can leverage the Nested Loop by pushing down the join filter, thereby avoiding one loop for each row.
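The later plans in this post show Hash Join and Merge Join again, so the two settings were presumably restored before continuing. A minimal way to do that in PostgreSQL, assuming Aurora DSQL accepts RESET for these parameters the same way it accepts SET:

```sql
-- Restore the planner defaults changed above (session-level only).
reset enable_hashjoin;
reset enable_mergejoin;

-- Verify the current values.
show enable_hashjoin;
show enable_mergejoin;
```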

First, let's answer the question: does denormalization help?

Denormalization in Aurora DSQL

In the following query, I use the denormalized columns that replicate the values from the "orders" table into the "orderlines" table, allowing all filtering to occur before the join.

postgres=> explain analyze
select * from orders join orderlines using(order_id)
where denormalized_country='France' and product='Product#42'
order by denormalized_created_at desc
;
                                                                                                                                                                                           QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Sort  (cost=1569969.34..1570073.39 rows=41621 width=67) (actual time=57639.922..57646.350 rows=39700 loops=1)
   Sort Key: orders.created_at DESC
   Sort Method: quicksort  Memory: 5569kB
   ->  Hash Join  (cost=1302424.49..1566775.96 rows=41621 width=67) (actual time=4121.827..57596.461 rows=39700 loops=1)
         Hash Cond: (orderlines.order_id = orders.order_id)
         ->  Custom Scan (btree-table) on orderlines  (cost=1250077.94..1513225.72 rows=209996 width=50) (actual time=0.824..53345.865 rows=199867 loops=1)
               Projected via pushdown compute engine: order_id, line_id, product, denormalized_country, denormalized_created_at
               Filters via pushdown compute engine: (product = 'Product#42'::text)
         ->  Hash  (cost=49881.23..49881.23 rows=197226 width=33) (actual time=4118.756..4118.769 rows=199785 loops=1)
               Buckets: 262144  Batches: 1  Memory Usage: 16096kB
               ->  Index Scan Backward using orders_country_created_at_idx on orders  (cost=24753.36..49881.23 rows=197226 width=33) (actual time=32.584..4062.699 rows=199785 loops=1)
                     Index Cond: (country = 'France'::text)
                     Projected via pushdown compute engine: order_id, country, created_at
 Planning Time: 0.225 ms
 Execution Time: 57648.824 ms
(15 rows)

Time: 57653.252 ms (00:57.653)

The response time improves when the filtering occurs before the join and uses an appropriate index that reads a single range and returns the rows in the required order.

I have created the most effective index for this query, which retrieves all the necessary rows from a specific range using equality predicates on "denormalized_country" and "product." Additionally, it returns the results in the expected order based on "denormalized_created_at." I anticipated that the Merge Join would maintain this order and eliminate the need for an additional sorting operation. However, sorting 39700 rows does not require significant time in this case.
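The schema script above only creates an index on (denormalized_country, denormalized_created_at), while the plans below use an index named orderlines_denormalized_country_product_denormalized_create_idx. PostgreSQL builds default index names from the table and column names, so the statement was most likely the following. This is inferred from the name, not copied from the original session:

```sql
-- Inferred from the auto-generated index name in the execution plans:
-- equality columns first (country, product), then the sort column.
create index on orderlines (denormalized_country, product, denormalized_created_at);
```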

An Index Scan Backward may not be the most efficient, but I was not able to create a descending index as it's a limitation of this preview:

postgres=> create index on orderlines(denormalized_country, denormalized_created_at desc);
ERROR:  specifying sort order not supported for index keys
Time: 174.067 ms

I've run the query with an ascending ORDER BY and the response time is much faster:

postgres=> explain analyze
select * from orders join orderlines using(order_id)
where denormalized_country='France' and product='Product#42'
order by denormalized_created_at asc;
                                                                                                    QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Sort  (cost=160668.38..160774.17 rows=42314 width=67) (actual time=7710.204..7716.482 rows=39700 loops=1)
   Sort Key: orderlines.denormalized_created_at
   Sort Method: quicksort  Memory: 5569kB
   ->  Merge Join  (cost=138969.95..157416.80 rows=42314 width=67) (actual time=966.429..7691.254 rows=39700 loops=1)
         Merge Cond: (orders.order_id = orderlines.order_id)
         ->  Index Only Scan using orders_pkey on orders  (cost=124486.05..139812.58 rows=995088 width=33) (actual time=1.594..6639.377 rows=999975 loops=1)
               Projected via pushdown compute engine: order_id, country, created_at
               Heap Fetches: 0
         ->  Sort  (cost=14482.85..14588.63 rows=42314 width=50) (actual time=964.500..973.289 rows=39700 loops=1)
               Sort Key: orderlines.order_id
               Sort Method: quicksort  Memory: 4638kB
               ->  Index Scan using orderlines_denormalized_country_product_denormalized_create_idx on orderlines  (cost=5389.34..11231.26 rows=42314 width=50) (actual time=156.506..952.285 rows=39700 loops=1)
                     Index Cond: ((denormalized_country = 'France'::text) AND (product = 'Product#42'::text))
                     Projected via pushdown compute engine: order_id, line_id, product, denormalized_country, denormalized_created_at
 Planning Time: 58.170 ms
 Execution Time: 7718.411 ms
(16 rows)

Time: 7905.397 ms (00:07.905)

This doesn't provide the result that I wanted, but it suggests that if a descending index could be defined, the response time would be acceptable with some denormalization.

Finally, without the join, the Index Scan Backward is fast:

postgres=> explain analyze
select * from orderlines
where denormalized_country='France' and product='Product#42'
order by denormalized_created_at desc
;
                                                                                               QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Index Scan Backward using orderlines_denormalized_country_product_denormalized_create_idx on orderlines  (cost=5389.34..11231.26 rows=42314 width=50) (actual time=156.969..868.327 rows=39700 loops=1)
   Index Cond: ((denormalized_country = 'France'::text) AND (product = 'Product#42'::text))
   Projected via pushdown compute engine: order_id, line_id, product, denormalized_country, denormalized_created_at
 Planning Time: 34.859 ms
 Execution Time: 870.267 ms
(5 rows)

Time: 950.970 ms


This demonstrates that performance is highly dependent on the query planner selecting the best execution plan and the SQL engine processing it optimally on the distributed storage.
In this example, denormalization was necessary. However, keep in mind that Aurora DSQL is still new and in preview mode, so more features and optimizations will be introduced in the future.

To illustrate this point, I will run the same test on YugabyteDB, which has been distributing PostgreSQL for five years and has evolved to optimize join methods for a distributed SQL database.

Comparing with YugabyteDB (Batched Nested Loops)

I've created the same schema and data in YugabyteDB, with a cluster in the same regions as the Aurora DSQL cluster:

yugabyte=> select * from yb_servers();
    host     | port | num_connections | node_type | cloud |  region   |    zone    | public_ip |               uuid
-------------+------+-----------------+-----------+-------+-----------+------------+-----------+----------------------------------
 10.30.0.106 | 5433 |               0 | primary   | aws   | us-east-2 | us-east-2b |           | 990ae3e9fc494ac0b52928ae8d1a808e
 10.50.0.132 | 5433 |               0 | primary   | aws   | us-west-2 | us-west-2c |           | fb2a58472b424aa893bbc33a0e4d9ea6
 10.20.0.59  | 5433 |               0 | primary   | aws   | us-east-1 | us-east-1c |           | 92e5b27ee0b44b43832158ac1547b4e8
(3 rows)

Time: 104.778 ms

yugabyte=> select count(*) from orders;
  count
---------
 1000000
(1 row)

Time: 371.095 ms
yugabyte=> select count(*) from orderlines;
  count
----------
 10000000
(1 row)

Time: 2450.754 ms (00:02.451)

yugabyte=> explain analyze
select * from orders join orderlines using(order_id)
where country='France' and product='Product#42'
order by created_at desc
;
                                                                                                                                                                                                                                                                                         QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------
 Sort  (cost=4727067.60..4727175.36 rows=43102 width=58) (actual time=2607.205..2611.064 rows=39759 loops=1)
   Sort Key: orders.created_at DESC
   Sort Method: external merge  Disk: 2880kB
   ->  YB Batched Nested Loop Join  (cost=360.00..4723749.72 rows=43102 width=58) (actual time=22.334..2579.672 rows=39759 loops=1)
         Join Filter: (orders.order_id = orderlines.order_id)
         ->  Seq Scan on orders  (cost=180.00..1268130.74 rows=204600 width=30) (actual time=7.292..48.609 rows=199802 loops=1)
               Storage Filter: (country = 'France'::text)
         ->  Index Scan using orderlines_pkey on orderlines  (cost=180.00..17292.25 rows=219 width=44) (actual time=11.826..11.925 rows=203 loops=196)
               Index Cond: (order_id = ANY (ARRAY[orders.order_id, $1, $2, ..., $1023]))
               Storage Filter: (product = 'Product#42'::text)
 Planning Time: 0.477 ms
 Execution Time: 2614.582 ms
 Peak Memory Usage: 7331 kB
(13 rows)

Time: 2717.264 ms (00:02.717)


The execution time is about 2.6 seconds without any denormalization. YugabyteDB optimizes the process by pushing down the join condition as an array of 1,024 values from the outer table. It employs a Batched Nested Loop Join to minimize the number of iterations needed. This efficiency is possible because YugabyteDB stores indexes as LSM trees, which facilitate Index Skip Scan, allowing multiple ranges to be read in a single scan.
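Conceptually, each iteration of the batched loop behaves like one query against the inner table with a whole batch of join keys, instead of one lookup per key. The sketch below only illustrates the Index Cond shown in the plan: the three UUIDs are made-up placeholders standing in for up to 1,024 values from "orders", and YugabyteDB builds this condition internally rather than running such a statement.

```sql
-- Illustration only: one inner-table call covering a batch of outer rows.
-- The UUIDs are hypothetical; a real batch carries up to yb_bnl_batch_size keys.
select *
  from orderlines
 where order_id = any (array[
         '00000000-0000-0000-0000-000000000001',
         '00000000-0000-0000-0000-000000000002',
         '00000000-0000-0000-0000-000000000003'
       ]::uuid[])
   and product = 'Product#42';
```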

The default for the batched nested loop is an array of 1024 values but we can increase the batch size to do fewer loops:

yugabyte=> set yb_bnl_batch_size to 10240;
SET
Time: 102.098 ms

yugabyte=> explain analyze
select * from orders join orderlines using(order_id)
where country='France' and product='Product#42'
order by created_at
;
                                                                                                                                              QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------
 Sort  (cost=4729721.18..4729828.93 rows=43102 width=58) (actual time=2153.855..2157.834 rows=39759 loops=1)
   Sort Key: orders.created_at DESC
   Sort Method: external merge  Disk: 2880kB
   ->  YB Batched Nested Loop Join  (cost=360.00..4726403.30 rows=43102 width=58) (actual time=116.579..2125.446 rows=39759 loops=1)
         Join Filter: (orders.order_id = orderlines.order_id)
         ->  Seq Scan on orders  (cost=180.00..1268130.74 rows=204600 width=30) (actual time=7.404..257.041 rows=199802 loops=1)
               Storage Filter: (country = 'France'::text)
         ->  Index Scan using orderlines_pkey on orderlines  (cost=180.00..173055.24 rows=2193 width=44) (actual time=78.630..79.631 rows=1988 loops=20)
               Index Cond: (order_id = ANY (ARRAY[orders.order_id, $1, $2, ..., $10239]))
               Storage Filter: (product = 'Product#42'::text)
 Planning Time: 3.009 ms
 Execution Time: 2162.229 ms
 Peak Memory Usage: 21179 kB
(13 rows)

Time: 2267.540 ms (00:02.268)


The difference in performance is minimal. A batch size of 1024 is usually acceptable. Reading multiple times from distributed storage is fine as long as each read fetches hundreds or thousands of rows, because the network latency then becomes insignificant compared to the work done per call.
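The loop counts in the two plans follow directly from the batch size: the outer scan returns 199802 rows, and each call to the inner table consumes one batch. A quick check of that arithmetic, which should run on any PostgreSQL-compatible database:

```sql
-- 199802 outer rows from "orders" (country = 'France'), split into batches.
select ceil(199802 / 1024.0)  as loops_with_default_batch_size,  -- 196, matching loops=196
       ceil(199802 / 10240.0) as loops_with_batch_size_10240;    -- 20, matching loops=20
```

Going from 196 to 20 calls reduced the execution time from 2614 ms to 2162 ms, which is consistent with the per-call latency being a small part of the total.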

The Batched Nested Loop Join maintains the order, so we can avoid the extra sorting step. In this case, the query planner did not leverage this opportunity because the number of rows to sort is minimal. However, if I were performing pagination and retrieving only the top 10 rows, it would choose to scan backward to bypass the sorting operation:

yugabyte=> explain analyze
select * from orders join orderlines using(order_id)
where country='France' and product='Product#42'
order by created_at desc limit 10
;
                                                                                QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=5587.50..7820.76 rows=10 width=58) (actual time=3.064..3.068 rows=10 loops=1)
   ->  YB Batched Nested Loop Join  (cost=5587.50..9631368.33 rows=43102 width=58) (actual time=3.063..3.064 rows=10 loops=1)
         Join Filter: (orders.order_id = orderlines.order_id)
         Sort Keys: orders.created_at DESC
         ->  Index Scan Backward using orders_country_created_at_idx on orders  (cost=5407.50..6175749.35 rows=204600 width=30) (actual time=1.796..1.812 rows=48 loops=1)
               Index Cond: (country = 'France'::text)
         ->  Index Scan using orderlines_pkey on orderlines  (cost=180.00..17292.25 rows=219 width=44) (actual time=1.150..1.157 rows=11 loops=1)
               Index Cond: (order_id = ANY (ARRAY[orders.order_id, $1, $2, ..., $1023]))
               Storage Filter: (product = 'Product#42'::text)
 Planning Time: 0.518 ms
 Execution Time: 3.240 ms
 Peak Memory Usage: 563 kB
(12 rows)

Time: 105.846 ms

The backward scan on the LSM tree is more costly than a forward scan, so I can enhance this further with the proper index.

yugabyte=> create index on orders(country, created_at desc);
CREATE INDEX
Time: 47401.337 ms (00:47.401)

yugabyte=> explain analyze
select * from orders join orderlines using(order_id)
where country='France' and product='Product#42'
order by created_at desc limit 10
;
                                                                            QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=5587.50..7262.58 rows=10 width=58) (actual time=2.667..2.670 rows=10 loops=1)
   ->  YB Batched Nested Loop Join  (cost=5587.50..7225523.33 rows=43102 width=58) (actual time=2.666..2.667 rows=10 loops=1)
         Join Filter: (orders.order_id = orderlines.order_id)
         Sort Keys: orders.created_at DESC
         ->  Index Scan using orders_country_created_at_idx2 on orders  (cost=5407.50..3769904.35 rows=204600 width=30) (actual time=1.340..1.373 rows=48 loops=1)
               Index Cond: (country = 'France'::text)
         ->  Index Scan using orderlines_pkey on orderlines  (cost=180.00..17292.25 rows=219 width=44) (actual time=1.184..1.194 rows=15 loops=1)
               Index Cond: (order_id = ANY (ARRAY[orders.order_id, $1, $2, ..., $1023]))
               Storage Filter: (product = 'Product#42'::text)
 Planning Time: 0.524 ms
 Execution Time: 2.881 ms
 Peak Memory Usage: 563 kB
(12 rows)

Without any denormalization, I have a typical OLTP query that joins two large tables with filters on both tables, returning a predictable response time in single-digit milliseconds.

Of course, denormalization is still possible and fast; it is just not needed when the database optimizes the joins:

yugabyte=> explain analyze
select * from orderlines
where denormalized_country='France' and product='Product#42'
order by denormalized_created_at desc
;
                                                                                              QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Sort  (cost=1009817.88..1009920.71 rows=41129 width=44) (actual time=286.485..288.479 rows=39759 loops=1)
   Sort Key: denormalized_created_at DESC
   Sort Method: quicksort  Memory: 4052kB
   ->  Index Scan using orderlines_denormalized_country_product_denormalized_create_idx on orderlines  (cost=6735.62..1006665.78 rows=41129 width=44) (actual time=8.135..276.999 rows=39759 loops=1)
         Index Cond: ((denormalized_country = 'France'::text) AND (product = 'Product#42'::text))
 Planning Time: 0.101 ms
 Execution Time: 291.618 ms
 Peak Memory Usage: 5823 kB
(8 rows)

Time: 393.763 ms


Denormalization

Denormalization is often used as a premature optimization driven by the misconception that joins don't scale. Before resorting to denormalization, ensure your database employs the most efficient join methods.

With Batched Nested Loop Join and Index Skip Scan, YugabyteDB demonstrates this by extending PostgreSQL's capabilities with an additional join method optimized for distributed environments. This allows joins to scale effectively without sacrificing consistency or burdening application code.

Aurora DSQL, currently in preview (many missing functionalities are not product limits but features not made available yet), uses PostgreSQL's row-by-row Nested Loop join algorithm. Sometimes, this might require you to denormalize your database schema to filter more rows before the join, which makes it harder to maintain data integrity in your application code. However, by the time you develop your application, there may be improvements, such as better indexing and new join methods. Remember that denormalization should be viewed more as a workaround than a permanent solution.
