DEV Community

Franck Pachot
Franck Pachot

Posted on • Updated on

Most recent record (of many items) with YugabyteDB πŸš€

I like when a blog post covers all aspects of a problem and show alternatives. Ryan Booz from TimescaleDB detailed clearly how to Select the most recent record (of many items) with PostgreSQL in PostgreSQL and Timescale, with different solutions and their performance considerations.

YugabyteDB has the same query layer (PostgreSQL), but different implementation with tables and indexes sharded and stored as LSM trees. So let's look at the performance configuration here. Better read Ryan's blog post first, it has great explanations.

Tables "trucks" and "trucks_readings"

The example, from the original blog post, stores trucks (identified by truck_id) information and their readings (like milage) per timestamp (ts).

Here are my tables:

drop table truck_readings;
drop table trucks;

create table trucks (
 truck_id bigint
 ,primary key(truck_id)
);

create table truck_readings (
 truck_id bigint --references trucks
 ,ts timestamptz
 ,milage int
 ,primary key(truck_id hash, ts desc)
) partition by range(ts);
Enter fullscreen mode Exit fullscreen mode

You have the choice to define the referential integrity constraint, to guarantee consistency. But you may also disable it for faster ingest.

The primary key of truck_readings uses the default (HASH,DESC) to distribute the truck_id for scalability and cluster the readings on time (ts) for efficient range scans. We will often query the most recent ones, so defining DESC avoids an Index Scan Backward for those queries.

When storing timeseries, it makes sense to have the table partitioned by range of date, in order to purge old data. I'll create three partitions for my example:

create table truck_readings_202201 partition of truck_readings
 for values from ('2022-01-01') to ('2022-02-01');
create table truck_readings_202202 partition of truck_readings
 for values from ('2022-02-01') to ('2022-03-01');
create table truck_readings_202203 partition of truck_readings
 for values from ('2022-03-01') to ('2022-04-01');
Enter fullscreen mode Exit fullscreen mode

I add some rows, for 2000 trucks with 1000 readings each

insert into trucks select generate_series(1,2000);
insert into truck_readings(truck_id, ts, milage)
select truck_id, now() - interval '1 minute' * generate_series(1,1000), 42 from trucks;
analyze truck_readings;
analyze trucks;
Enter fullscreen mode Exit fullscreen mode

I don't need more data because I'll assess the performance from the execution plan. Understanding is better than measuring. Then it is easier to verify with more rows by changinf the generate_series bounds.

Most recent record (of one item)

Without any additional index, a query for one truck is optimal:

yugabyte=# explain analyze SELECT * FROM truck_readings
           WHERE truck_id = 1234
           ORDER BY ts DESC LIMIT 1;
                                                                               QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=0.02..0.15 rows=1 width=16) (actual time=1.614..1.616 rows=1 loops=1)
   ->  Merge Append  (cost=0.02..23780.03 rows=189500 width=16) (actual time=1.614..1.614 rows=1 loops=1)
         Sort Key: truck_readings_202201.ts DESC
         ->  Index Scan using truck_readings_202201_pkey on truck_readings_202201  (cost=0.00..15.25 rows=100 width=16) (actual time=0.579..0.579 rows=0 loops=1)
               Index Cond: (truck_id = 1234)
         ->  Index Scan using truck_readings_202202_pkey on truck_readings_202202  (cost=0.00..21300.25 rows=189300 width=16) (actual time=0.572..0.572 rows=1 loops=1)
               Index Cond: (truck_id = 1234)
         ->  Index Scan using truck_readings_202203_pkey on truck_readings_202203  (cost=0.00..15.25 rows=100 width=16) (actual time=0.461..0.461 rows=0 loops=1)
               Index Cond: (truck_id = 1234)
 Planning Time: 0.115 ms
 Execution Time: 1.666 ms
(11 rows)
Enter fullscreen mode Exit fullscreen mode

Because YugabyteDB tables are stored in the index structure, the Index Scan on the primary key is similar to an Index Only Scan even when not showing it (more about it here). This basically goes to one tablet (from the hash code of the truck_id) and seeks to the truck_id to read the first row. This is the best we can do within one partition.

However, the table is partitioned for maintenance reason. Reading from many partitions is not a big deal here because we read only one row at most from it. But it would be better with a global index. There are no global index with PostgreSQL declarative partitioning, all partitions must be accessed for this query. We may add global indexes in YugabyteDB at some point (I'm testing this on YugabyteDB 2.11.2), and you can open a git issue if you think you need it, but I'll show that we probably have a better solution here.

Most recent record (of many item)

PostgreSQL has a SQL syntax to get the last record for each truck:

explain analyze
 SELECT DISTINCT ON (truck_id) * FROM truck_readings
 ORDER BY truck_id, ts DESC;
Enter fullscreen mode Exit fullscreen mode

But, in addition to the lack of global index, there is no Index Scan to get it:

yugabyte=# explain analyze
 SELECT DISTINCT ON (truck_id) * FROM truck_readings
 ORDER BY truck_id, ts DESC;
                                                                      QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------
 Unique  (cost=461335.44..470810.44 rows=200 width=16) (actual time=8639.871..9047.342 rows=2000 loops=1)
   ->  Sort  (cost=461335.44..466072.94 rows=1895000 width=16) (actual time=8639.870..8874.270 rows=2000000 loops=1)
         Sort Key: truck_readings_202201.truck_id, truck_readings_202201.ts DESC
         Sort Method: external merge  Disk: 58720kB
         ->  Append  (cost=0.00..198975.00 rows=1895000 width=16) (actual time=5.411..7758.188 rows=2000000 loops=1)
               ->  Seq Scan on truck_readings_202201  (cost=0.00..100.00 rows=1000 width=16) (actual time=1.364..1.364 rows=0 loops=1)
               ->  Seq Scan on truck_readings_202202  (cost=0.00..189300.00 rows=1893000 width=16) (actual time=4.046..7517.417 rows=2000000 loops=1)
               ->  Seq Scan on truck_readings_202203  (cost=0.00..100.00 rows=1000 width=16) (actual time=1.124..1.125 rows=0 loops=1)
 Planning Time: 1.073 ms
 Execution Time: 9048.912 ms
(10 rows)
Enter fullscreen mode Exit fullscreen mode

Again, this could be optimized to access multiple ranges of the indexes. We have similar optimization with the Hybrid Scan (example here), and you can open a git issue if you need it here. But, once again, I don't think we need it in this case.

Driving the nested loop

In Ryan's blog, multiple options are exposed.

The "Option 1: Naive GROUP BY" is not very good as it scans all partitions:

yugabyte=# explain analyze
yugabyte-#            SELECT truck_id, max(ts) FROM truck_readings
yugabyte-#            GROUP BY truck_id;
                                                                   QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------
 HashAggregate  (cost=208450.00..208452.00 rows=200 width=12) (actual time=8792.829..8793.280 rows=2000 loops=1)
   Group Key: truck_readings_202201.truck_id
   ->  Append  (cost=0.00..198975.00 rows=1895000 width=12) (actual time=6.569..8382.785 rows=2000000 loops=1)
         ->  Seq Scan on truck_readings_202201  (cost=0.00..100.00 rows=1000 width=12) (actual time=2.141..2.142 rows=0 loops=1)
         ->  Seq Scan on truck_readings_202202  (cost=0.00..189300.00 rows=1893000 width=12) (actual time=4.427..8158.258 rows=2000000 loops=1)
         ->  Seq Scan on truck_readings_202203  (cost=0.00..100.00 rows=1000 width=12) (actual time=1.055..1.055 rows=0 loops=1)
 Planning Time: 0.774 ms
 Execution Time: 8793.759 ms
(8 rows)
Enter fullscreen mode Exit fullscreen mode

The "Option 2: LATERAL JOIN" is a good way to drive the access pattern, inlining the query from "Most recent record (of one item)" and running it in a nested loop from trucks:

yugabyte=# explain analyze SELECT * FROM trucks t
           INNER JOIN LATERAL (
            SELECT * FROM truck_readings
            WHERE truck_id = t.truck_id
            ORDER BY ts DESC
            LIMIT 1
           ) l ON TRUE
           ORDER BY t.truck_id DESC;
                                                                                      QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Sort  (cost=648.18..653.18 rows=2000 width=24) (actual time=2009.534..2009.810 rows=2000 loops=1)
   Sort Key: t.truck_id DESC
   Sort Method: quicksort  Memory: 205kB
   ->  Nested Loop  (cost=0.02..538.53 rows=2000 width=24) (actual time=4.359..2007.192 rows=2000 loops=1)
         ->  Seq Scan on trucks t  (cost=0.00..200.00 rows=2000 width=4) (actual time=2.531..5.518 rows=2000 loops=1)
         ->  Limit  (cost=0.02..0.15 rows=1 width=16) (actual time=0.998..0.999 rows=1 loops=2000)
               ->  Merge Append  (cost=0.02..23780.03 rows=189500 width=16) (actual time=0.998..0.998 rows=1 loops=2000)
                     Sort Key: truck_readings_202201.ts DESC
                     ->  Index Scan using truck_readings_202201_pkey on truck_readings_202201  (cost=0.00..15.25 rows=100 width=16) (actual time=0.300..0.300 rows=0 loops=2000)
                           Index Cond: (truck_id = t.truck_id)
                     ->  Index Scan using truck_readings_202202_pkey on truck_readings_202202  (cost=0.00..21300.25 rows=189300 width=16) (actual time=0.348..0.348 rows=1 loops=2000)
                           Index Cond: (truck_id = t.truck_id)
                     ->  Index Scan using truck_readings_202203_pkey on truck_readings_202203  (cost=0.00..15.25 rows=100 width=16) (actual time=0.313..0.313 rows=0 loops=2000)
                           Index Cond: (truck_id = t.truck_id)
 Planning Time: 0.150 ms
 Execution Time: 2038.525 ms
(16 rows)
Enter fullscreen mode Exit fullscreen mode

The "Option 3: TimescaleDB SkipScan" will work on YugabyteDB if we extend the Hybrid Scan optimizations to this case.

The "Option 4: Loose Index Scan" will do many full scans, except when having an index on time such as this one:

create index truck_readings_by_time 
on truck_readings (ts desc,truck_id);
Enter fullscreen mode Exit fullscreen mode

and maintaining such an index is probably not a good idea.

The readability of such a query is not very nice either:

yugabyte=# explain
WITH RECURSIVE t AS (
   SELECT min(ts) AS ts FROM truck_readings
   UNION ALL
   SELECT (SELECT min(ts) FROM truck_readings WHERE ts > t.ts)
   FROM t WHERE t.ts IS NOT NULL
   )
SELECT ts FROM t WHERE ts IS NOT NULL
UNION ALL
SELECT null WHERE EXISTS(SELECT 1 FROM truck_readings WHERE ts IS NULL);
                                                                              QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Append  (cost=9.32..12.87 rows=101 width=8)
   CTE t
     ->  Recursive Union  (cost=0.16..9.32 rows=101 width=8)
           ->  Result  (cost=0.16..0.17 rows=1 width=8)
                 InitPlan 3 (returns $1)
                   ->  Limit  (cost=0.02..0.16 rows=1 width=8)
                         ->  Merge Append  (cost=0.02..410.80 rows=3000 width=8)
                               Sort Key: truck_readings_202201.ts
                               ->  Index Only Scan Backward using truck_readings_202201_ts_truck_id_idx on truck_readings_202201  (cost=0.00..124.00 rows=1000 width=8)
                                     Index Cond: (ts IS NOT NULL)
                               ->  Index Only Scan Backward using truck_readings_202202_ts_truck_id_idx on truck_readings_202202  (cost=0.00..124.00 rows=1000 width=8)
                                     Index Cond: (ts IS NOT NULL)
                               ->  Index Only Scan Backward using truck_readings_202203_ts_truck_id_idx on truck_readings_202203  (cost=0.00..124.00 rows=1000 width=8)
                                     Index Cond: (ts IS NOT NULL)
           ->  WorkTable Scan on t t_1  (cost=0.00..0.71 rows=10 width=8)
                 Filter: (ts IS NOT NULL)
   ->  CTE Scan on t  (cost=0.00..2.02 rows=100 width=8)
         Filter: (ts IS NOT NULL)
   ->  Result  (cost=0.00..0.01 rows=1 width=8)
         One-Time Filter: $5
         InitPlan 5 (returns $5)
           ->  Result  (cost=0.00..0.00 rows=0 width=0)
                 One-Time Filter: false
(23 rows)
Enter fullscreen mode Exit fullscreen mode

The WHERE ts > t.ts is not pushed down to the Index Scan, so this is not better than the one with no index. Anyway, it was mentioned to show how the PostgreSQL Wiki demonstrates an alternative to MySQL Loose IndexScan.

This is not a good idea here and we have a much better solution with a trigger.
This is the "Option 5: Logging table and trigger" in Ryan's blog post. I've implemented it differently on the following points:

  • no FILLFACTOR in YugabyteDB because we don't use heap tables but store the rows in the primary key structure (LSM tree)
  • using an UPDATE ... RETURNING rather than an INSERT ... ON CONFLICT because I find it more readable as it is the most common scenario (existing truck). But both have the same performance, using a read + write operation to the distributed storage. I've put the ON CONFLICT solution in comments if you prefer it.
  • declaring an AFTER INSERT trigger because Partitioned tables cannot have BEFORE / FOR EACH ROW triggers. I guess that TimescaleDB creates the trigger on each partition to be able to do that.

So let's detail this solution which is, in my opinion, the best one for a distributed SQL database (which supports triggers and stored procedures of course).

A trigger as a purpose-built index

An index is like a table that contains redundant data, stored with a different physical organization, and maintained automatically. So that some access patterns are faster from the index, or at least their filtering part. When you don't have the possibility to define the right index, you can still declare triggers to maintain a similar structure.

Here, what I need is the last reading time for each truck. This is something easy to add, and maintain, into the truck table.

Step 1 - I add a column for it:

alter table trucks add column last_reading timestamptz;
Enter fullscreen mode Exit fullscreen mode

This is instantaneous as it is only a metadata change.

Step 2 - I create a function to update it:

create or replace function update_last_reading()
  returns trigger language plpgsql as
$trigger$
declare
 updated boolean;
begin
  -- solution with UPDATE...RETURNING:
  update trucks set last_reading=new.ts where trucks.truck_id=new.truck_id returning true into updated;
  if updated is null then insert into trucks(truck_id, last_reading) values (new.truck_id, new.ts); end if;
  -- solution with INSERT ... ON CONFLICT
  -- insert into trucks(truck_id, last_reading) values (new.truck_id, new.ts);
  --  on conflict (truck_id) do update set last_reading=new.ts;
  return new;
end
$trigger$;
Enter fullscreen mode Exit fullscreen mode

Because I've left the possibility to not declare the referential integrity, I implemented an 'upsert' logic to add the row in "trucks" if not already present.

Step 3 - I call this function from a trigger:

create trigger update_last_reading
  after insert or update on truck_readings
  for each row execute procedure update_last_reading();
Enter fullscreen mode Exit fullscreen mode

Step 4 - test it:

Nothing else to code. Now, each new reading inserted into "truck_readings" will update "trucks"."last_reading":

insert into truck_readings values(1234,now(),42);
insert into truck_readings values(4321,now(),42);
Enter fullscreen mode Exit fullscreen mode

I've inserted one reading for an existing truck_id and one for a non-existing one it order to validate both situations.

Here are they:

yugabyte=# select * from trucks
           order by last_reading desc nulls last limit 3;

 truck_id |         last_reading
----------+-------------------------------
     4321 | 2022-02-09 15:35:56.508575+00
     1234 | 2022-02-09 15:35:55.530103+00
     1361 |
(3 rows)
Enter fullscreen mode Exit fullscreen mode

This requires little more code than with an index but has the advantage to build the most efficient structures. Updating the table is the same as updating an index entry. And reading it is a simple scan of a small table.

We can ever reduce the read cost, with a little overhead in the insert, with a specific index on it:

create index trucks_last_reading 
on trucks (last_reading desc, truck_id);
Enter fullscreen mode Exit fullscreen mode

Heres the most efficient range scan to get this info on the latest readings:

yugabyte=# explain analyze select * from trucks
           where last_reading > now() - interval '1 month'
           order by last_reading;
                                                                  QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------
 Index Only Scan Backward using trucks_last_reading on trucks  (cost=0.00..29.50 rows=200 width=12) (actual time=1.060..22.984 rows=2 loops=1)
   Index Cond: (last_reading > (now() - '1 mon'::interval))
   Heap Fetches: 0
 Planning Time: 0.062 ms
 Execution Time: 23.024 ms
(5 rows)
Enter fullscreen mode Exit fullscreen mode

Thanks to the SQL trigger, I have the equivalent of a global index, with full control of the access patterns. The only difference is that I have to query this structure. There is no transparent rewrite when querying the truck_readings. But that should not be a problem when you write your application

Race condition

I'm adding this point following a comment after publication. This logic, updating a single record per truck, adds a serialization point per truck. The same truck adding a new reading at the exact same time will have one either waiting (pessimistic locking) or raising a transaction conflict (serialization error). The latter is probably the best one, and should be handled by the application logic: ignore (as they are probably duplicates, from a business point of view) or retry. Note that the trigger with insert-when-no data-found may also raise a similar situation. This is a major difference with an index that would have one entry for each reading. The benefit of having one row per truck has its consequences in concurrency. I didn't mention it initially because of the use-case (the same truck sending a different reading at the same time) should probably be processed as an error condition here.

Summary: Triggers and Stored Procedures πŸ’—

This is an example of useful usage of triggers and stored procedures in a SQL database. This is not business logic, but only data logic to optimize storage and access. And the beauty of it is that, once defined, and tested, it just works. ACID properties guarantee the correctness even in case of failures. And it is efficient, without the need for a secondary index on the large table, or for event sourcing to push data elsewhere. SQL is awesome, but you need to understand it (EXPLAIN helps).

Top comments (0)