
Franck Pachot for YugabyteDB Distributed PostgreSQL Database


Understand what you run before publishing your (silly) benchmark results

Before wasting time and money on large cloud instances, a benchmark to compare database performance should start with understanding what you run:

  1. run the full workload (all queries) but from a single user in a single iteration
  2. look at the response time and the execution plan
  3. understand how to get an efficient and scalable run

Different databases, even when they are all compatible from a SQL point of view, may require different settings and indexes. The default settings are rarely good for intensive activity: they are often defined for small environments to allow a quick and easy start, and they may stay compatible with older versions without leveraging new features.

If you read a benchmark report that uses all default settings, you can safely ignore its conclusions. There's a good chance that the authors tuned only the database they were paid to advocate for.
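In any PostgreSQL-compatible database, a quick way to see what you are actually running with is to list the parameters that differ from their built-in defaults. A minimal sketch, which works the same in PostgreSQL and YugabyteDB:

-- list parameters that are not at their built-in default value
select name, setting, source
from pg_settings
where source <> 'default'
order by name;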

To show that it is not difficult to understand what you run in a PostgreSQL-compatible database, I'll look at the HammerDB benchmark connected to YugabyteDB. HammerDB has no YugabyteDB-specific code, but YugabyteDB is PostgreSQL-compatible (it runs PostgreSQL code on top of distributed storage and transactions).

Docker Compose

Here is my docker-compose.yaml to start YugabyteDB and HammerDB:


services:
  yb:
      image: yugabytedb/yugabyte:latest
      volumes:
          - .:/u
      environment:
       - YSQL_DB=tpcc
       - YSQL_USER=franck
       - YSQL_PASSWORD=Apagn1
      command: |
       bash -c '
        yugabyted start --tserver_flags=flagfile=/u/yb-tserver.flags ;
        cd /root/var/logs/tserver ;
        tail -F postgresql-*.log
        '
      healthcheck:
        test: bash -c 'postgres/bin/pg_isready -h $$(hostname)'
        interval: 5s
        timeout: 1s
        retries: 3
        start_period: 30s
      ports:
       - 7000:7000
       - 15433:15433
  hd:
      image: tpcorg/hammerdb
      volumes:
          - .:/u
      command: sleep infinity
      depends_on:
       yb:
        condition: service_healthy


Here is the yb-tserver.flags file, referenced above, to set up the YugabyteDB configuration:

# Read Committed with PostgreSQL-like locking (wait on lock)
--yb_enable_read_committed_isolation=true
--enable_deadlock_detection=true
--enable_wait_queues=true
# Allow logging statements and execution plans
--ysql_pg_conf_csv=log_statement='all',shared_preload_libraries='auto_explain'
# ignore warning when running ANALYZE
--ysql_beta_features=true


I start the containers and define aliases to run the HammerDB client and the YugabyteDB shell (equivalent to PostgreSQL psql):

docker compose up -d

alias hammerdbcli="docker compose exec hd ./hammerdbcli"
alias ysqlsh="docker compose exec -e PGPASSWORD=yugabyte yb ysqlsh -h yb -U yugabyte tpcc"

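With the containers up, a quick sanity check (a sketch, connecting with the ysqlsh alias defined above) confirms that the parameters passed through ysql_pg_conf_csv are in effect:

show shared_preload_libraries;
show log_statement;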

Setup the configuration and database

I run the HammerDB Command Line Interface with the alias above (hammerdbcli) and configure my environment.

I set the PostgreSQL connection to point to YugabyteDB:

dbset db pg
diset connection            pg_host  yb
diset connection            pg_port  5433
diset tpcc pg_superuser     yugabyte
diset tpcc pg_superuserpass yugabyte
diset tpcc pg_defaultdbase  yugabyte
diset tpcc pg_user          franck
diset tpcc pg_pass          Apagn1
diset tpcc pg_dbase         tpcc


I build the schema:

dbset bm TPC-C
buildschema

(screenshots of the HammerDB buildschema output)

I can connect to the database with my ysqlsh alias and look at the tables and functions:

tpcc=# \dt+
                      List of relations
 Schema |    Name    | Type  | Owner  |  Size   | Description
--------+------------+-------+--------+---------+-------------
 public | customer   | table | franck | 44 MB   |
 public | district   | table | franck | 1090 kB |
 public | history    | table | franck | 16 MB   |
 public | item       | table | franck | 25 MB   |
 public | new_order  | table | franck | 1024 kB |
 public | order_line | table | franck | 121 MB  |
 public | orders     | table | franck | 7193 kB |
 public | stock      | table | franck | 92 MB   |
 public | warehouse  | table | franck | 1089 kB |
(9 rows)

tpcc=# \df
                                                                       List of functions
 Schema |    Name     | Result data type |                                             Argument data types                                              | Type
--------+-------------+------------------+--------------------------------------------------------------------------------------------------------------+------
 public | dbms_random | integer          | integer, integer                                                                                             | func
 public | delivery    | integer          | integer, integer                                                                                             | func
 public | neword      | numeric          | integer, integer, integer, integer, integer, integer                                                         | func
 public | ostat       | SETOF record     | integer, integer, integer, integer, character varying                                                        | func
 public | payment     | integer          | integer, integer, integer, integer, integer, integer, numeric, character varying, character varying, numeric | func
 public | slev        | integer          | integer, integer, integer                                                                                    | func
(6 rows)

tpcc=#

Don't waste your time loading huge amounts of data. What matters is data distribution and execution plans. That's enough to understand the scalability. When you know what you are running, and you run it with a relevant configuration, you can scale with more data.
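For example, a quick look at how the rows are distributed across warehouses in the largest table (a sketch on the TPC-C schema built above) tells more than the total volume:

-- rows per warehouse in order_line, the largest TPC-C table
select ol_w_id, count(*) from order_line group by ol_w_id order by ol_w_id;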

Run the benchmark use cases

Back in hammerdbcli, I'll run only one iteration to see the SQL statements:

diset tpcc pg_total_iterations 1
diset tpcc pg_rampup 0
diset tpcc pg_duration 0
vurun

Here is the HammerDB log:
(screenshot of the HammerDB log)

Here is the PostgreSQL log in YugabyteDB:
(screenshot of the PostgreSQL log)
I have logged all statements (setting log_statement='all' in --ysql_pg_conf_csv) so that I can see what is run by the application.

I'm convinced that running a benchmark, and publishing the results, is useless without understanding what is run, and PostgreSQL, or PostgreSQL-compatible databases, make it easy to trace the SQL calls.

Extract the SQL statements

Here is how I get all SQL statements that were run by this small execution:

docker compose exec yb bash -c '
grep -E "(LOG:|DETAIL:)" /root/var/logs/tserver/postgresql-*.log
'

I see two statements that I can easily reproduce in ysqlsh:


prepare payment (INTEGER, INTEGER, INTEGER, INTEGER, INTEGER, INTEGER, NUMERIC, VARCHAR) AS select payment($1,$2,$3,$4,$5,$6,$7,'$8','0',0);
prepare neword (INTEGER, INTEGER, INTEGER, INTEGER, INTEGER) as select neword($1,$2,$3,$4,$5,0);

\timing on

explain (costs off, analyze)
execute payment ('1', '10', '1', '10', '2857',  '1',  '1478', 'BAROUGHTATION');

explain (costs off, analyze)
execute neword('1', '1', '3', '6', '5');


Here is the result when I run it with ysqlsh on my lab:

tpcc=# \timing on
Timing is on.
tpcc=# explain (costs off, analyze)
tpcc-# execute payment ('1', '10', '1', '10', '2857',  '1',  '1478', 'BAROUGHTATION');
                      QUERY PLAN
------------------------------------------------------
 Result (actual time=111.295..111.297 rows=1 loops=1)
 Planning Time: 0.127 ms
 Execution Time: 111.331 ms
 Peak Memory Usage: 8 kB
(4 rows)

Time: 116.268 ms
tpcc=# explain (costs off, analyze)
tpcc-# execute neword('1', '1', '3', '6', '5');
                      QUERY PLAN
------------------------------------------------------
 Result (actual time=179.656..179.657 rows=1 loops=1)
 Planning Time: 0.140 ms
 Execution Time: 179.726 ms
 Peak Memory Usage: 177 kB
(4 rows)

Time: 186.771 ms


At this point, it is clear that it makes no sense to run a benchmark with this as-is. 100 milliseconds per call for OLTP is probably not what we expect. But that's not because of the database; it is because we don't know what we run.

pg_stat_statements

To get an average response time over multiple runs, pg_stat_statements collects the timings:

select pg_stat_statements_reset();
execute payment ('1', '10', '1', '10', '2857',  '1',  '1478', 'BAROUGHTATION');

\watch
-- stop after a while (^C)

select
 mean_time::numeric(10,2), min_time::numeric(10,2), max_time::numeric(10,2)
 , calls, rows, query 
from pg_stat_statements;


However, this shows only the top-level statement, which doesn't help when looking at the execution plans of the nested queries it runs:

  mean_time | min_time | max_time | calls | rows |                                                                    query
-----------+----------+----------+-------+------+---------------------------------------------------------------------------------------------------------------------------------------------
      0.08 |     0.08 |     0.08 |     1 |    1 | select pg_stat_statements_reset()
      0.09 |     0.07 |     0.10 |     3 |    8 | select                                                                                                                                     +
           |          |          |       |      |  mean_time::numeric(10,2), min_time::numeric(10,2), max_time::numeric(10,2)                                                                +
           |          |          |       |      |  , calls, rows, query                                                                                                                      +
           |          |          |       |      | from pg_stat_statements
     55.93 |    25.43 |    83.04 |   101 |  101 | prepare payment (INTEGER, INTEGER, INTEGER, INTEGER, INTEGER, INTEGER, NUMERIC, VARCHAR) AS select payment($1,$2,$3,$4,$5,$6,$7,'$8','0',0)
(3 rows)
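If you only need the timings, not the execution plans, of those nested statements, pg_stat_statements can be told to track them too. A minimal sketch (requires a superuser session):

-- track statements nested inside functions, not only the top-level calls
set pg_stat_statements.track = 'all';

But to see the plans, auto_explain is the right tool, as shown next.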

Before running a benchmark with multiple threads, I want to be sure that everything is configured properly, or find out whether this average response time of 56 milliseconds can be reduced.

Auto Explain

I want to look at the queries that are run within those stored functions. For this, I log all statements and their execution plans, with the output sent to my console to avoid going to the log file, and with all nested statements to get the SQL run within the stored procedures:

-- log to the console in addition to the logfile
set client_min_messages=log;
-- log all statements
set log_statement='all';
-- explain analyze all statements
set auto_explain.log_min_duration=0;
set auto_explain.log_analyze=on;
set auto_explain.log_timing=on;
-- including recursive statements run within the procedure execution
set auto_explain.log_nested_statements=on;


This is YugabyteDB, but it works exactly as in PostgreSQL. You don't have to learn anything new, and that's the advantage of PostgreSQL compatibility.
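Here I set these parameters in my session only, to see the plans in my console. To capture the same information in the server log for every session, the same parameters could be appended to the ysql_pg_conf_csv flag shown earlier; a sketch (not needed for this walkthrough):

--ysql_pg_conf_csv=log_statement='all',shared_preload_libraries='auto_explain',auto_explain.log_min_duration=0,auto_explain.log_analyze=on,auto_explain.log_nested_statements=on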

I run the two calls again and all queries are logged:

tpcc=# execute payment ('1', '10', '1', '10', '2857',  '1',  '1478', 'BAROUGHTATION');

LOG:  statement: execute payment ('1', '10', '1', '10', '2857',  '1',  '1478', 'BAROUGHTATION');
DETAIL:  prepare: prepare payment (INTEGER, INTEGER, INTEGER, INTEGER, INTEGER, INTEGER, NUMERIC, VARCHAR) AS select payment($1,$2,$3,$4,$5,$6,$7,'$8','0',0);

LOG:  duration: 2.031 ms  plan:
Query Text: UPDATE warehouse
                SET w_ytd = w_ytd + p_h_amount
                WHERE w_id = p_w_id
                RETURNING w_street_1, w_street_2, w_city, w_state, w_zip, w_name
Update on warehouse  (cost=0.00..0.11 rows=1 width=328) (actual time=2.022..2.025 rows=1 loops=1)
  ->  Seq Scan on warehouse  (cost=0.00..0.11 rows=1 width=328) (actual time=1.700..1.702 rows=1 loops=1)
        Remote Filter: (w_id = 1)

LOG:  duration: 1.986 ms  plan:
Query Text: UPDATE district
                SET d_ytd = d_ytd + p_h_amount
                WHERE d_w_id = p_w_id AND d_id = p_d_id
                RETURNING d_street_1, d_street_2, d_city, d_state, d_zip, d_name
Update on district  (cost=0.00..1.10 rows=10 width=334) (actual time=1.969..1.975 rows=1 loops=1)
  ->  Seq Scan on district  (cost=0.00..1.10 rows=10 width=334) (actual time=1.843..1.849 rows=1 loops=1)
        Remote Filter: ((d_w_id = 1) AND (d_id = 10))

LOG:  duration: 1.454 ms  plan:
Query Text: SELECT count(c_last)                                 FROM customer
                WHERE c_last = p_c_last AND c_d_id = p_c_d_id AND c_w_id = p_c_w_id
Aggregate  (cost=364.00..364.01 rows=1 width=8) (actual time=1.447..1.447 rows=1 loops=1)
  ->  Index Only Scan using customer_i2 on customer  (cost=0.00..356.50 rows=3000 width=50) (actual time=1.400..1.400 rows=0 loops=1)
        Index Cond: ((c_w_id = 1) AND (c_d_id = 10) AND (c_last = '$8'::text))
        Heap Fetches: 0

LOG:  duration: 0.001 ms  plan:
Query Text: SELECT c_first, c_middle, c_id,
                c_street_1, c_street_2, c_city, c_state, c_zip,
                c_phone, c_credit, c_credit_lim,
                c_discount, c_balance, c_since
                FROM customer
                WHERE c_w_id = p_c_w_id AND c_d_id = p_c_d_id AND c_last = p_c_last
                ORDER BY c_first
Index Scan using customer_i2 on customer  (cost=0.00..386.50 rows=3000 width=424) (never executed)
  Index Cond: ((c_w_id = 1) AND (c_d_id = 10) AND ((c_last)::text = '$8'::text))

LOG:  duration: 12.319 ms  plan:
Query Text: UPDATE customer
                SET c_balance = p_c_balance - p_h_amount
                WHERE c_w_id = p_c_w_id AND c_d_id = p_c_d_id AND c_id = p_c_id
                RETURNING c_balance, ' '
Update on customer  (cost=0.00..4.12 rows=1 width=2088) (actual time=12.303..12.305 rows=1 loops=1)
  ->  Result  (cost=0.00..4.12 rows=1 width=2088) (actual time=0.003..0.004 rows=1 loops=1)

LOG:  duration: 10.626 ms  plan:
Query Text: INSERT INTO history (h_c_d_id, h_c_w_id, h_c_id, h_d_id,h_w_id, h_date, h_amount, h_data)
                VALUES (p_c_d_id, p_c_w_id, p_c_id, p_d_id,     p_w_id, tstamp, p_h_amount, p_w_name || ' ' || p_d_name)
Insert on history  (cost=0.00..0.01 rows=1 width=104) (actual time=0.167..0.167 rows=0 loops=1)
  ->  Result  (cost=0.00..0.01 rows=1 width=104) (actual time=0.004..0.004 rows=1 loops=1)

LOG:  duration: 93.055 ms  plan:
Query Text: prepare payment (INTEGER, INTEGER, INTEGER, INTEGER, INTEGER, INTEGER, NUMERIC, VARCHAR) AS select payment($1,$2,$3,$4,$5,$6,$7,'$8','0',0);
Result  (cost=0.00..0.26 rows=1 width=4) (actual time=93.049..93.050 rows=1 loops=1)
 payment
---------
    2857
(1 row)

Time: 96.074 ms

tpcc=# execute neword('1', '1', '3', '6', '5');

LOG:  statement: execute neword('1', '1', '3', '6', '5');
DETAIL:  prepare: prepare neword (INTEGER, INTEGER, INTEGER, INTEGER, INTEGER) as select neword($1,$2,$3,$4,$5,0);

LOG:  duration: 2.656 ms  plan:
Query Text: SELECT c_discount, c_last, c_credit, w_tax
                                                                                     FROM customer, warehouse
                WHERE warehouse.w_id = no_w_id AND customer.c_w_id = no_w_id AND customer.c_d_id = no_d_id AND customer.c_id = no_c_id
Nested Loop  (cost=0.00..4.23 rows=1 width=86) (actual time=2.650..2.650 rows=1 loops=1)
  ->  Index Scan using customer_i1 on customer  (cost=0.00..4.12 rows=1 width=74) (actual time=1.615..1.615 rows=1 loops=1)
        Index Cond: ((c_w_id = 1) AND (c_d_id = 3) AND (c_id = 6))
  ->  Seq Scan on warehouse  (cost=0.00..0.10 rows=1 width=12) (actual time=0.969..0.969 rows=1 loops=1)
        Remote Filter: (w_id = 1)

LOG:  duration: 1.143 ms  plan:
Query Text: UPDATE district SET d_next_o_id = d_next_o_id + 1 WHERE d_id = no_d_id AND d_w_id = no_w_id RETURNING d_next_o_id, d_tax
Update on district  (cost=0.00..1.07 rows=10 width=334) (actual time=1.134..1.137 rows=1 loops=1)
  ->  Seq Scan on district  (cost=0.00..1.07 rows=10 width=334) (actual time=1.087..1.089 rows=1 loops=1)
        Remote Filter: ((d_id = 3) AND (d_w_id = 1))

LOG:  duration: 0.162 ms  plan:
Query Text: INSERT INTO ORDERS (o_id, o_d_id, o_w_id, o_c_id, o_entry_d, o_ol_cnt, o_all_local) VALUES (no_d_next_o_id, no_d_id, no_w_id, no_c_id, current_timestamp, no_o_ol_cnt, no_o_all_local)
Insert on orders  (cost=0.00..0.01 rows=1 width=28) (actual time=0.159..0.160 rows=0 loops=1)
  ->  Result  (cost=0.00..0.01 rows=1 width=28) (actual time=0.004..0.004 rows=1 loops=1)

LOG:  duration: 0.068 ms  plan:
Query Text: INSERT INTO NEW_ORDER (no_o_id, no_d_id, no_w_id) VALUES (no_d_next_o_id, no_d_id, no_w_id)
Insert on new_order  (cost=0.00..0.01 rows=1 width=10) (actual time=0.066..0.066 rows=0 loops=1)
  ->  Result  (cost=0.00..0.01 rows=1 width=10) (actual time=0.003..0.003 rows=1 loops=1)

LOG:  duration: 11.019 ms  plan:
Query Text: SELECT array_agg ( i_price )
                                                 FROM UNNEST(item_id_array) item_id
                LEFT JOIN item ON i_id = item_id
Aggregate  (cost=16.26..16.27 rows=1 width=32) (actual time=11.008..11.008 rows=1 loops=1)
  ->  Nested Loop Left Join  (cost=0.00..16.00 rows=100 width=12) (actual time=1.523..10.966 rows=5 loops=1)
        ->  Function Scan on unnest item_id  (cost=0.00..1.00 rows=100 width=4) (actual time=0.014..0.025 rows=5 loops=1)
        ->  Index Scan using item_i1 on item  (cost=0.00..0.15 rows=1 width=16) (actual time=2.131..2.131 rows=1 loops=5)
              Index Cond: (i_id = item_id.item_id)

LOG:  duration: 42.221 ms  plan:
Query Text: WITH stock_update AS (
                UPDATE stock
                SET s_quantity = ( CASE WHEN s_quantity < (item_stock.quantity + 10) THEN s_quantity + 91 ELSE s_quantity END) - item_stock.quantity
                FROM UNNEST(item_id_array, supply_wid_array, quantity_array, price_array)
                AS item_stock (item_id, supply_wid, quantity, price)
                WHERE stock.s_i_id = item_stock.item_id
                AND stock.s_w_id = item_stock.supply_wid
                AND stock.s_w_id = ANY(supply_wid_array)
                RETURNING stock.s_dist_03 as s_dist, stock.s_quantity, ( item_stock.quantity + item_stock.price * ( 1 + no_w_tax + no_d_tax ) * ( 1 - no_c_discount ) ) amount
                )
                SELECT array_agg ( s_dist ), array_agg ( s_quantity ), array_agg ( amount )
                FROM stock_update
Aggregate  (cost=16.69..16.70 rows=1 width=96) (actual time=42.204..42.204 rows=1 loops=1)
  CTE stock_update
    ->  Update on stock  (cost=0.01..16.65 rows=1 width=1268) (actual time=3.549..42.075 rows=5 loops=1)
          ->  Nested Loop  (cost=0.01..16.65 rows=1 width=1268) (actual time=3.422..41.443 rows=5 loops=1)
                ->  Function Scan on item_stock  (cost=0.01..1.01 rows=100 width=108) (actual time=0.056..0.094 rows=5 loops=1)
                ->  Index Scan using stock_i1 on stock  (cost=0.00..0.16 rows=1 width=1168) (actual time=8.191..8.191 rows=1 loops=5)
                      Index Cond: ((s_i_id = item_stock.item_id) AND (s_w_id = item_stock.supply_wid) AND (s_w_id = ANY ('{1,1,1,1,1}'::integer[])))
  ->  CTE Scan on stock_update  (cost=0.00..0.02 rows=1 width=134) (actual time=3.556..42.136 rows=5 loops=1)

LOG:  duration: 0.558 ms  plan:
Query Text: INSERT INTO order_line (ol_o_id, ol_d_id, ol_w_id, ol_number, ol_i_id, ol_supply_w_id, ol_quantity, ol_amount, ol_dist_info)
                SELECT no_d_next_o_id,
                no_d_id,
                no_w_id,
                data.line_number,
                data.item_id,
                data.supply_wid,
                data.quantity,
                data.amount,
                data.stock_dist
                FROM UNNEST(order_line_array,
                item_id_array,
                supply_wid_array,
                quantity_array,
                amount_array,
                stock_dist_array)
                AS data( line_number, item_id, supply_wid, quantity, amount, stock_dist)
Insert on order_line  (cost=0.02..1.51 rows=100 width=144) (actual time=0.552..0.552 rows=0 loops=1)
  ->  Function Scan on data  (cost=0.02..1.51 rows=100 width=144) (actual time=0.058..0.074 rows=5 loops=1)

LOG:  duration: 141.211 ms  plan:
Query Text: prepare neword (INTEGER, INTEGER, INTEGER, INTEGER, INTEGER) as select neword($1,$2,$3,$4,$5,0);
Result  (cost=0.00..0.26 rows=1 width=32) (actual time=141.197..141.198 rows=1 loops=1)
 neword
--------
 173.12
(1 row)

Time: 150.271 ms


The queries above are far from optimal, and they are probably very different from the applications you run today. Even with the HammerDB parameter pg_storedprocs=false, it runs all the code in stored PL/pgSQL functions. There are also some inefficient constructs in the SQL statements. You can trace them or look at the source code, like scanning the same table twice here. There are efficient solutions for this in PostgreSQL and YugabyteDB:

Select the First Row of Each Set of Rows Grouped Using GROUP BY

See how to select the first row of each set of rows in PostgreSQL through a variety of queries, clauses, and functions.

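For example, the customer lookup pattern above (one query to count the customers with a given last name, then another to fetch them ordered by first name) could be done in a single pass with a window function. A hypothetical sketch, not what HammerDB actually runs:

-- count and ordered rows in one scan instead of two
select c_id, c_first, c_balance,
       count(*)     over ()                 as namecnt,
       row_number() over (order by c_first) as position
from customer
where c_w_id = 1 and c_d_id = 10 and c_last = 'BAROUGHTATION';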

In addition to that, even if you see prepared statements, they cover only the call to the functions, for which the planning time is not a problem. The queries within those calls are still parsed on each execution. After running those, only the top-level calls are prepared:

tpcc=# select * from pg_prepared_statements;
  name   |                                                                  statement                                                                   |         prepare_time          |                                parameter_types                                | from_sql
---------+----------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------+--------------------------------------------------------------------------------+----------
 payment | prepare payment (INTEGER, INTEGER, INTEGER, INTEGER, INTEGER, INTEGER, NUMERIC, VARCHAR) AS select payment($1,$2,$3,$4,$5,$6,$7,'$8','0',0); | 2023-07-19 18:47:02.750581+00 | {integer,integer,integer,integer,integer,integer,numeric,"character varying"}   | t
 neword  | prepare neword (INTEGER, INTEGER, INTEGER, INTEGER, INTEGER) as select neword($1,$2,$3,$4,$5,0);                                             | 2023-07-19 18:47:03.257617+00 | {integer,integer,integer,integer,integer}                                       | t
(2 rows)

The complex SQL queries will have to be parsed and planned on each execution. This may be OK because the parse time is fast (YugabyteDB caches the catalog tables on the local node) but, again, you need to know what you are measuring. Calling a stored function holding all the business logic through a prepared statement is probably not what you are doing with your application. Do you move to cloud-native databases to lift-and-shift applications designed in the 90's? That may not be a good idea.

The HammerDB queries were ported to PostgreSQL from an old benchmark written for Oracle Database (Oracle forbids the publication of such benchmarks today), probably coded more than 20 years ago. TPC-C simulates an old monolithic client/server application, which has nothing in common with modern applications. The PostgreSQL port also includes some modifications for specific PostgreSQL-compatible vendors (HammerDB has settings like pg_cituscompat for CitusDB and pg_oracompat for EDB).

The YugabyteDB default settings assume modern applications. Running this benchmark as-is, with all default configuration, makes no sense.

Run with Cost Based Optimizer and Batched Nested Loop

Those kinds of applications inherited from the past still exist. There are two solutions: review the code and make something readable and optimizable, or use the most advanced database optimizer features.

There are two features that can help in YugabyteDB:

  • The Cost Based Optimizer. For simple OLTP, the rule-based optimizer, which is still the default in this version (YugabyteDB 2.9), gives predictable performance. But for complex queries, the query planner can look at the statistics gathered with ANALYZE (see the sketch after this list) to estimate the cardinalities and then choose a better join order.
  • The Batched Nested Loop. By default, a Nested Loop Join goes row by row from the outer table to the inner table. When scaling horizontally, this may involve network calls for each row. NoSQL databases, like DynamoDB or MongoDB, avoid this by not allowing SQL joins. Sharded databases, like the Citus extension for PostgreSQL or the Oracle Sharding option, limit the distribution of data so that all joined rows are colocated. But in Distributed SQL this is not acceptable, and YugabyteDB has implemented batching to limit the network calls during a Nested Loop Join.
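
The cost-based optimizer needs those statistics to exist, so the TPC-C tables must be analyzed first. A minimal sketch (the ysql_beta_features flag set earlier silences the beta warning on ANALYZE):

-- gather optimizer statistics for all tables of the tpcc database
analyze;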

Let's try it with this HammerDB benchmark, without changing the code. I enable those two features and run the same again:

tpcc=# set yb_enable_optimizer_statistics=on;
SET
Time: 0.617 ms

tpcc=# set yb_bnl_batch_size=1000;
SET
Time: 0.524 ms


tpcc=# explain (costs off, analyze)
       execute payment ('1', '10', '1', '10', '2857',  '1',  '1478', 'BAROUGHTATION');
                    QUERY PLAN
--------------------------------------------------
 Result (actual time=6.202..6.202 rows=1 loops=1)
 Planning Time: 0.030 ms
 Execution Time: 6.228 ms
 Peak Memory Usage: 8 kB
(4 rows)

Time: 7.391 ms

tpcc=# explain (costs off, analyze)
       execute neword('1', '1', '3', '6', '5');
                     QUERY PLAN
----------------------------------------------------
 Result (actual time=17.522..17.522 rows=1 loops=1)
 Planning Time: 0.037 ms
 Execution Time: 17.552 ms
 Peak Memory Usage: 176 kB
(4 rows)

Time: 19.337 ms


Now the response time looks good, even without rewriting the old code to use modern SQL. You can test all the HammerDB TPC-C use cases like this: look at the execution plans, adapt the indexes, and when you have good response times, start to run multiple clients and look at the throughput.
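These were session-level settings. To have the HammerDB virtual users pick them up without touching its scripts, one option (a sketch, assuming they should become the default for every connection to the tpcc database) is to set them at the database level:

alter database tpcc set yb_enable_optimizer_statistics = on;
alter database tpcc set yb_bnl_batch_size = 1000;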

Colocation

There are many ways to optimize, and they depend on your context. That's why relying on database defaults for benchmarks is never good. And, often, the database vendor running a benchmark against competitors spends more time tuning its own database than learning how the others work.

When running an application that has been designed for a monolithic database, it makes sense to colocate all tables and then see which ones benefit from being distributed to all nodes. You still benefit from being on a Distributed SQL database for high availability: even when colocated, the tables are replicated and remain available if a node crashes, without running a failover or recovery.

I restart my cluster after adding the following to yb-tserver.flags:

# use colocated database
--ysql_colocate_database_by_default=true

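After the restart, a quick check from ysqlsh (a sketch using a YugabyteDB-specific function) tells whether the database I am connected to is colocated:

select yb_is_database_colocated();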

Running the same workload, without changing any session parameters, gives the following in pg_stat_statements:

 mean_time | min_time | max_time | calls | rows |                                                                    query
-----------+----------+----------+-------+------+---------------------------------------------------------------------------------------------------------------------------------------------
      0.17 |     0.17 |     0.17 |     1 |    1 | select pg_stat_statements_reset()
     37.64 |     6.59 |    63.41 |    68 |   68 | prepare payment (INTEGER, INTEGER, INTEGER, INTEGER, INTEGER, INTEGER, NUMERIC, VARCHAR) AS select payment($1,$2,$3,$4,$5,$6,$7,'$8','0',0)
(2 rows)

Here is the same with Batched Nested Loops:

 mean_time | min_time | max_time | calls | rows |                                                                    query
-----------+----------+----------+-------+------+---------------------------------------------------------------------------------------------------------------------------------------------
      0.21 |     0.21 |     0.21 |     1 |    1 | select pg_stat_statements_reset()
     37.03 |    12.06 |   134.66 |    57 |   57 | prepare payment (INTEGER, INTEGER, INTEGER, INTEGER, INTEGER, INTEGER, NUMERIC, VARCHAR) AS select payment($1,$2,$3,$4,$5,$6,$7,'$8','0',0)

The numbers make sense when you understand what you run. The non-batched Nested Loop overhead matters when there are cross-shard calls. Now, imagine that colocation was the default... the benchmarks using the defaults would show different numbers. Which one makes sense for a benchmark? It depends. What do you want to test? An application built for monolithic databases, and sharded on the application side, like TPC-C with warehouses, can be deployed without cross-shard joins. That's the only possibility in sharded databases like Citus. You can do that in YugabyteDB by partitioning the tables on the warehouse ID and colocating the partitions in tablegroups. For modern applications, you prefer the agility of Distributed SQL, where all SQL features are global and may involve cross-shard transactions, and you get the same performance with a single setting enabling Batched Nested Loops, a new feature designed to run those legacy applications.

Only when all this is understood can you start to scale the benchmark with multiple threads. Throwing numbers around without this analysis is pointless.

To summarize...

I may write more about this HammerDB benchmark if I feel the need to run it on YugabyteDB. However, I recommend running something that is relevant to your usage of the database. I was motivated to look at this because I've seen a benchmark that compares a sharded database, Citus, with Distributed SQL databases, CockroachDB and YugabyteDB. It is the best example of publishing random numbers, for marketing purposes, without understanding what is run.
They manually partitioned on Citus, but not on YugabyteDB. You see the difference in elasticity when adding or removing nodes: transparent re-sharding in YugabyteDB, painful re-deployment of tables in Citus. Moreover, they compared the throughput of Citus on Azure (branded as Cosmos DB for PostgreSQL) with High Availability off (*) to Distributed SQL databases, CockroachDB and YugabyteDB, with Replication Factor 3, resilient to zone failure. You see the difference when there is a failure: downtime with Citus, application continuity with YugabyteDB. And they used the same instance sizes. Obviously, replicas use more disk, RAM and CPU, and you need more instances for the same throughput. With Replication Factor 3 you need more resources, but you don't stop the application for upgrades or failures. And look at the CPU power: they give cores (which are two threads) to Citus but vCPUs (one thread, half a core) to the others.


(*) The article from Microsoft didn't mention that the run on Citus was without High Availability, but this is mentioned in the GigaOM article:
(screenshot from the GigaOM report)
