Kasey Speakman

Event Storage in Postgres, Multi-tenant

I previously wrote about Event Storage in Postgres. One thing I did not address at that time was multi-tenant scenarios. Multiple tenants add the potential for a lot of data. In particular, indexes can eventually get so large that they start affecting performance. (That's the hope, right? 🤑). The way we dealt with that in our previous product was by isolating each tenant's data within its own schema. However, this causes issues when we want to process events across tenants, for example to provide rollup reports when multiple tenants belong to the same parent organization, or to gauge activity on the system right now. When tenants are separated by schema, it is a pain to generate a query which grabs events across all tenants.

Partitioning

One way to keep tenant data logically separated while retaining the convenience of a single Event table is to use Postgres's table partitioning. A lot has been added to this feature in recent releases, and in Postgres 11 it is especially strong. Here is a version of the Event table partitioned by Tenant ID. It looks almost the same!

Main Table

CREATE TABLE IF NOT EXISTS Event
(
    SequenceNum bigserial NOT NULL,
    TenantId uuid NOT NULL,
    StreamId uuid NOT NULL,
    Version int NOT NULL,
    Type text NOT NULL,
    Meta jsonb NOT NULL,
    Data jsonb,
    LogDate timestamptz NOT NULL DEFAULT now(),
    CONSTRAINT pk_event_sequencenum PRIMARY KEY (TenantId, SequenceNum),
    CONSTRAINT uk_event_streamid_version UNIQUE (TenantId, StreamId, Version)
) PARTITION BY LIST (TenantId);

Primary key and unique constraints on the main table are required to include the partition key, which is TenantId here.

Tenant Partitions

Unfortunately Postgres does not yet have auto creation of partitions. So before you insert data for a tenant, you first have to create a partition for it. Typically there is a provisioning process when adding a new tenant, so creation of the partition can simply be part of that process. Here is an example of creating a tenant with TenantId = 847ED1889E8B4D238EB49126EBD77A4D.

CREATE TABLE IF NOT EXISTS Event_847ED1889E8B4D238EB49126EBD77A4D
PARTITION OF Event FOR VALUES IN ('847ED1889E8B4D238EB49126EBD77A4D')
;
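
Since partition creation is usually part of tenant provisioning, the statement can be generated from the tenant's ID. Below is a minimal Python sketch of that idea. The helper name `partition_ddl` is made up for illustration, and it only builds the SQL text; running it against Postgres is left to whatever driver you use. Parsing the ID as a UUID first keeps arbitrary strings out of the generated table name.

```python
import uuid


def partition_ddl(tenant_id: str) -> str:
    """Build the CREATE TABLE statement for one tenant's partition.

    Hypothetical helper: the name and shape are illustrative only.
    """
    # uuid.UUID raises ValueError for anything that is not a valid UUID,
    # so only hex digits can end up in the table name.
    tenant_hex = uuid.UUID(tenant_id).hex.upper()
    return (
        f"CREATE TABLE IF NOT EXISTS Event_{tenant_hex}\n"
        f"PARTITION OF Event FOR VALUES IN ('{tenant_hex}');"
    )


print(partition_ddl("847ED1889E8B4D238EB49126EBD77A4D"))
```

The printed statement matches the hand-written one above, apart from the semicolon's position.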

Under the covers, each partition has its own index. So you don't end up with a single giant index. When you query the data across tenants, Postgres can run queries on the partitions in parallel, then aggregate them.

Inserting events

You insert data just like you would insert it to a single table, and Postgres automatically routes it to the appropriate partition.

INSERT
  INTO Event
     ( TenantId
     , StreamId
     , Version
     , Type
     , Meta
     , Data
     )
VALUES
     ( '847ED1889E8B4D238EB49126EBD77A4D'
     , 'A88F94DB6E7A439E9861485F63CC8A13'
     , 1
     , 'EmptyEvent'
     , '{}'
     , NULL
     )
;

Query by sequence

To support reading events across tenants in order of occurrence, you can run a query like this.

SELECT *
  FROM Event
 WHERE SequenceNum > 0
 ORDER
    BY SequenceNum
 LIMIT 1000
;

This query supports reading batches of up to 1000. I avoided using OFFSET since it is inefficient once the offset value gets large. And each listener usually keeps track of the last SequenceNum that it processed anyway.

I could have added another condition to the WHERE clause like AND SequenceNum <= 1000 instead of using LIMIT. But there could be skipped sequence numbers due to concurrency (see below). Although this is a minor point.
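
To make the batching behavior concrete, here is a small Python sketch of a listener's read loop. It is only an illustration: an in-memory list stands in for the Event table, and the names `EVENTS`, `read_batch`, and `process_all` are invented for this example. It shows that tracking the last SequenceNum and using LIMIT copes with a gap in the sequence.

```python
from typing import List, Tuple

# Stand-in for the Event table: (SequenceNum, Type) pairs, already committed.
# Note the gap at 3, as a failed insert would leave behind.
EVENTS: List[Tuple[int, str]] = [(1, "A"), (2, "B"), (4, "C"), (5, "D"), (6, "E")]


def read_batch(after: int, limit: int) -> List[Tuple[int, str]]:
    """Mimic: WHERE SequenceNum > after ORDER BY SequenceNum LIMIT limit."""
    newer = sorted(e for e in EVENTS if e[0] > after)
    return newer[:limit]


def process_all(batch_size: int) -> List[int]:
    """Read batches until none are left, tracking the last SequenceNum seen."""
    last_seen = 0
    processed = []
    while True:
        batch = read_batch(last_seen, batch_size)
        if not batch:
            return processed
        for sequence_num, _event_type in batch:
            processed.append(sequence_num)
        last_seen = batch[-1][0]


print(process_all(batch_size=2))  # [1, 2, 4, 5, 6]
```

With a range condition such as `SequenceNum <= 2` followed by `SequenceNum <= 4`, the second batch would have contained only one event because of the gap; LIMIT always fills the batch when enough events exist.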

Query by stream

Query by stream is the same as before except we now also need to provide the TenantId.

SELECT *
  FROM Event
 WHERE TenantId = '847ED1889E8B4D238EB49126EBD77A4D'
   AND StreamId = 'A88F94DB6E7A439E9861485F63CC8A13'
 ORDER
    BY Version
;

Other Goodies

Notification of events

You can trigger Postgres notifications whenever an event is added to any partition of the Event table. Here is what I use for that currently.

DROP TRIGGER IF EXISTS trg_EventRecorded ON Event;
DROP FUNCTION IF EXISTS NotifyEvent();

CREATE FUNCTION NotifyEvent() RETURNS trigger AS $$

    DECLARE
        payload text;

    BEGIN
        -- { sequencenum }/{ tenant id }/{ stream id }/{ version }/{ event type }
        SELECT CONCAT_WS( '/'
                        , NEW.SequenceNum
                        , REPLACE(CAST(NEW.TenantId AS text), '-', '')
                        , REPLACE(CAST(NEW.StreamId AS text), '-', '')
                        , NEW.Version
                        , NEW.Type
                        )
          INTO payload
        ;

        PERFORM pg_notify('eventrecorded', payload);

        RETURN NULL;

    END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_EventRecorded
    AFTER INSERT ON Event
    FOR EACH ROW
    EXECUTE PROCEDURE NotifyEvent()
;

When an event is inserted, it will trigger a notification to the channel eventrecorded. Here is an example payload.

# sequence num
#  / tenant id
#                                   / stream id
#                                                                    / version
#                                                                      / type
  2/847ed1889e8b4d238eb49126ebd77a4d/a88f94db6e7a439e9861485f63cc8a13/2/EmptyEvent

This payload format is very much inspired by MQTT topic paths.

The event itself might be too large to fit in the notification payload. So instead, I give enough data about the event for the listener to know if they want to go to the trouble of loading it. Usually listeners only care about the type of event and the sequence number. Then they typically load batches of events starting from the last sequence number they processed. So the middle bits might be a YAGNI violation. But it felt right to include in case a listener wants to load specific events instead of batching.
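
On the listener side, the payload is easy to take apart. The following is a hedged Python sketch, not the code I actually run (mine is F#); `EventNotice` and `parse_payload` are names made up for this example.

```python
import uuid
from typing import NamedTuple


class EventNotice(NamedTuple):
    sequence_num: int
    tenant_id: uuid.UUID
    stream_id: uuid.UUID
    version: int
    event_type: str


def parse_payload(payload: str) -> EventNotice:
    # maxsplit=4 keeps the event type intact even if it contains a slash.
    seq, tenant, stream, version, event_type = payload.split("/", 4)
    return EventNotice(
        int(seq), uuid.UUID(tenant), uuid.UUID(stream), int(version), event_type
    )


notice = parse_payload(
    "2/847ed1889e8b4d238eb49126ebd77a4d/a88f94db6e7a439e9861485f63cc8a13/2/EmptyEvent"
)
print(notice.sequence_num, notice.event_type)  # 2 EmptyEvent
```

A listener that only cares about certain event types can check `event_type` and skip the database round trip for everything else.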

To listen for event notifications, the SQL part is simple:

LISTEN eventrecorded;

But the code part will vary depending on your language. Personally, the coding pattern I need to use with the Npgsql library feels a bit painful. I think it has more to do with the database details leaking through the library's abstractions. I ended up with a very imperative producer/consumer queue. It was not a great look in F# after all the error handling was added. But it'll do.

Concurrent Events

The unique constraint we added to the Event table, called uk_event_streamid_version, will actually catch concurrency violations for us via Optimistic Concurrency.

For example, let's say a user tries to perform an operation on a stream. The logic loads the stream and it has 4 events. The last event's Version is 4. We run our code and decide, based on the current state of this stream, we should generate 2 new events. Since the last Version was 4, those events should have Version = 5 and Version = 6 respectively. Then we insert those events to Event table.

Simultaneously, another user tries to perform a different operation on the same stream. This request is being processed on another thread (or another computer) and is unaware of the first user's request. It reads the same 4 events, and decides to generate 1 new event with Version = 5. However, the first user's operation committed its DB transaction just before this one. So when the second request tries to save its event with Version = 5, it will fail. Postgres will raise a unique constraint violation.

As long as we appropriately calculate the expected Version for each new event before saving, the unique constraint will prevent concurrency conflicts.
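
The scenario above can be played out in a few lines of Python. This is only a sketch under stated assumptions: an in-memory SQLite database stands in for Postgres, the table is trimmed to the columns that matter, and the `append` helper is invented for illustration. The unique constraint does the same job in both databases.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE Event
    ( TenantId text NOT NULL
    , StreamId text NOT NULL
    , Version  int  NOT NULL
    , Type     text NOT NULL
    , UNIQUE (TenantId, StreamId, Version)
    )
    """
)


def append(tenant_id, stream_id, last_version, event_types):
    """Append events as Version last_version+1, +2, ... in one transaction.

    Returns True on success, False if another writer got there first.
    """
    rows = [
        (tenant_id, stream_id, last_version + i, event_type)
        for i, event_type in enumerate(event_types, start=1)
    ]
    try:
        with conn:  # commits on success, rolls back on exception
            conn.executemany("INSERT INTO Event VALUES (?, ?, ?, ?)", rows)
        return True
    except sqlite3.IntegrityError:
        return False


# Both users loaded the stream when its last Version was 4.
first = append("tenant-1", "stream-1", 4, ["X", "Y"])  # writes 5 and 6
second = append("tenant-1", "stream-1", 4, ["Z"])      # tries 5 again
print(first, second)  # True False
```

When `append` returns False, the caller would typically reload the stream and retry the operation against the new state, or report the conflict to the user.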

Sequence Gap

When there is a concurrency violation, it will create a gap in SequenceNum. That's just the way auto increment sequences work in Postgres. Even if the transaction is aborted, the sequence is still incremented. Otherwise, managing rollbacks for sequences would complicate inserts and slow down performance. Don't obsess over sequence gaps -- it is only for ordering.

⚠️ Concurrent Writers

As pointed out in comments below (Thanks Alexander Langer!), this implementation is suitable for a "single writer". But due to the way Postgres sequences work, multiple concurrent writers can introduce a subtle bug: events can sometimes be committed in a different order from the SequenceNum order. Since listeners depend on loading events in SequenceNum order, this can cause listeners to miss events.

See below for an implementation which supports concurrent writers. Consequently, it will also generate gapless sequence numbers. It will likely have an impact on insert performance, but I haven't measured.
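
The missed-event bug is easier to see in a simulation. The Python sketch below is a simplified model, not real listener code: `run_listener` is a made-up function that replays commits in a given order and applies the "fetch everything after my last SequenceNum" rule.

```python
def run_listener(commit_order):
    """Replay commits in order and return the SequenceNums the listener processed.

    commit_order lists SequenceNum values in the order their transactions commit.
    """
    committed = set()  # rows visible to readers
    last = 0           # listener's last processed SequenceNum
    processed = []
    for sequence_num in commit_order:
        committed.add(sequence_num)  # transaction commits, notification fires
        if sequence_num > last:      # listener only reacts to newer numbers
            batch = sorted(n for n in committed if n > last)
            processed.extend(batch)
            last = batch[-1]
    return processed


print(run_listener([1, 2, 3]))  # [1, 2, 3]
print(run_listener([1, 3, 2]))  # [1, 3]
```

In the second run, event 2 commits after event 3 and is never processed. Reacting to every notification would not help either, because the listener's query asks for SequenceNum > 3 by then.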

Data Management

Tenant data is now also pretty easy to individually remove or back up since it is actually in its own separate table. For example, assuming that TenantId = 847ED1889E8B4D238EB49126EBD77A4D requested that we remove their data, it is pretty easy to get rid of it.

-- DANGER DANGER
DROP TABLE Event_847ED1889E8B4D238EB49126EBD77A4D CASCADE;

Assuming you have some sort of rolling backup in place, eventually this will roll off of backups into oblivion too.

Conclusion

This method of partitioning the event streams by tenant can help performance as the dataset grows large. At some point we may have to go a step further and partition our data onto separate database nodes. This basic table structure also seems suited for node-based partitioning using something like Citus Data (recently acquired by Microsoft). But the above method should cover that awkward first scaling hurdle when one overarching index becomes a performance bottleneck. And it isn't much more effort than a single table!

/∞

Top comments (10)

Alexander Langer

The way you are writing events (multiple concurrent writers, as mentioned in your previous post) combined with the way you are reading events ("And each listener usually keeps track of the last SequenceNum that it processed anyway") means you'll lose data in your listeners.

Kasey Speakman • Edited

I can see why you would think that. But I'm not using notifications in the way that most event stores use them -- to push events to listeners. For one thing, Postgres NOTIFY payloads are too small for that to work. For another I don't like the push model for listeners; I prefer a pull model.

Using these notifications, you can set up the listener either way -- to process all events, or to only listen for the latest happenings.

To process all events, the listener uses notifications simply to be informed of a new event and its position (SequenceNum). If the sequence number is higher than its last processed one, it will fetch all new events starting from its last processed SequenceNum. The listener tracks its own last processed position and loads events sequentially. It doesn't depend on notifications for tracking. No events are missed.

But there are cases where the listener doesn't care about processing every event, only the most recent. In that case, the listener loads the specific event referenced in the notification. Some events will be missed (e.g. listener node failure), but this kind of listener isn't concerned about that.

P.S. Regarding notifications and concurrency: with Postgres NOTIFY, only committed events trigger notifications. They are never triggered when the transaction aborts, such as when there is a unique key constraint (concurrency) violation.

Alexander Langer • Edited

Yes, I understood that. The error is in here:

To process all events, the listener uses notifications simply to be informed of a new event and its position (SequenceNum). If the sequence number is higher than its last processed one, it will fetch all new events starting from its last processed SequenceNum.

Consider two transactions T1 and T2:

T1 starts, obtains next sequence number N from Postgres.
T1 stalls for whatever reason.
T2 starts, obtains next sequence number N+1 from Postgres.
T2 commits, sends notification.
Listener sees N+1 is higher than $last, retrieves all events since $last including N+1, sets $last := N+1.
T1 revives, commits, sends notification.
Listener sees N, not higher than $last (= N+1), ignores.

=> missed event.

(AFAIK the only way to avoid this with this architecture is to have a single writer per partition...).

Kasey Speakman • Edited

Ah, I see. Excellent point.

The case of "T1 stalls for whatever reason." is certainly possible, but unlikely for us so far. Aside from not yet reaching the IOPS to make this likely, transactions are literally BEGIN ... INSERT ... INSERT ... COMMIT, all sent as a single (maybe even prepared) statement. Although I'm not an expert in what effects autovacuum pauses can have.

If I run into this problem, I will have to give this more of a think. A temporary workaround that would work for us is to rebuild read models if this happens.

Anyway, I'm glad you mentioned this. Thank you!

Alexander Langer

Yes, aside from that issue, Postgres is a great choice and we are using a similar setup to the one you are describing here. We are currently overcoming this by having a single-threaded writer, which is still fast enough given that transactions are really fast for us, as you mentioned, and the single writer is certainly not the bottleneck :)

Keep in mind, however, that even if it's very, very unlikely, given that you might store tens of millions of events per year (which is not uncommon even for a moderate business software application with a couple of users), even a 0.0001% chance per event means you are likely to encounter that race condition about 10,000,000 * 0.0001% = 10 times a year. From my own negative experience, I can tell you that a) this will happen, unfortunately, and b) this is one of those bugs that are very hard to find :-)
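
The arithmetic above is quick to check. Here is a short Python sketch using the same illustrative figures; treating each event as an independent trial is an added assumption, used only to estimate the chance of hitting the race at least once.

```python
events_per_year = 10_000_000
chance_per_event = 0.0001 / 100  # 0.0001% as a fraction

expected_per_year = events_per_year * chance_per_event
# Probability that the race happens at least once in the year,
# treating each event as an independent trial (an assumption).
at_least_once = 1 - (1 - chance_per_event) ** events_per_year

print(round(expected_per_year, 6))  # 10.0
print(at_least_once > 0.9999)       # True
```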

Anyways, keep up the good work, this is a very nice usage of the notify feature in postgres I hadn't considered before.

Kasey Speakman • Edited

I really appreciate your feedback!

On quick search, I found this article on gapless sequences. Using what was described there would also enforce a totally ordered sequence number. This will have a performance impact, not sure how much. Perhaps a single writer is a better solution overall for now.

I am concerned that business logic (command processing) will have to scale compute resources. And currently I planned to colocate the writer with the command processing, hence the multiple concurrent writers as command processing scales. I guess good problems to have and figure out when I get there. But I don't like the idea of the single writer being its own separate service.

Kasey Speakman • Edited

Here is a quick sketch of what a solution supporting multiple concurrent writers would look like, using the strategy from that link. This is accomplished by making sequence number generation part of the parent transaction. I even added some rules to make the Event table append-only and the sequence counter update-only, to prevent maintenance mistakes. Those are also inspired by that link.

Here, I also renamed SequenceNum to Position.

--DROP TABLE IF EXISTS PositionCounter CASCADE;
--DROP FUNCTION IF EXISTS NextPosition();
--DROP TABLE IF EXISTS Event CASCADE;
--DROP TRIGGER IF EXISTS trg_EventRecorded ON Event;
--DROP FUNCTION IF EXISTS NotifyEvent();

-- transactional sequence number
CREATE TABLE IF NOT EXISTS PositionCounter
(
    Position bigint NOT NULL
);

INSERT INTO PositionCounter VALUES (0);

-- prevent removal / additional rows
CREATE RULE rule_positioncounter_noinsert AS 
ON INSERT TO PositionCounter DO INSTEAD NOTHING;
CREATE RULE rule_positioncounter_nodelete AS 
ON DELETE TO PositionCounter DO INSTEAD NOTHING;

-- function to get next sequence number
CREATE FUNCTION NextPosition() RETURNS bigint AS $$
    DECLARE
        nextPos bigint;
    BEGIN
        -- increment the counter and read the new value in one statement
        UPDATE PositionCounter
           SET Position = Position + 1
        RETURNING Position INTO nextPos
        ;
        RETURN nextPos;
    END;
$$ LANGUAGE plpgsql;

-- event table
CREATE TABLE IF NOT EXISTS Event
(
    Position bigint NOT NULL,
    TenantId uuid NOT NULL,
    StreamId uuid NOT NULL,
    Version int NOT NULL,
    Type text NOT NULL,
    Meta jsonb NOT NULL,
    Data jsonb,
    LogDate timestamptz NOT NULL DEFAULT now(),
    CONSTRAINT pk_event_position PRIMARY KEY (TenantId, Position),
    CONSTRAINT uk_event_streamid_version UNIQUE (TenantId, StreamId, Version)
) PARTITION BY LIST (TenantId);

-- Append only
CREATE RULE rule_event_nodelete AS 
ON DELETE TO Event DO INSTEAD NOTHING;
CREATE RULE rule_event_noupdate AS 
ON UPDATE TO Event DO INSTEAD NOTHING;

-- notification
CREATE FUNCTION NotifyEvent() RETURNS trigger AS $$

    DECLARE
        payload text;

    BEGIN
        -- { position }/{ tenantId }/{ streamId }/{ version }/{ event type }
        SELECT CONCAT_WS( '/'
                        , NEW.Position
                        , NEW.TenantId
                        , REPLACE(CAST(NEW.StreamId AS text), '-', '')
                        , NEW.Version
                        , NEW.Type
                        )
          INTO payload
        ;

        -- using lower case channel name or else LISTEN would require quoted identifier.
        PERFORM pg_notify('eventrecorded', payload);

        RETURN NULL;
    END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_EventRecorded
    AFTER INSERT ON Event
    FOR EACH ROW
    EXECUTE PROCEDURE NotifyEvent()
;

And here's how to append an event with the transactional sequence number.

-- inside a transaction
INSERT
  INTO Event
     ( Position
     , TenantId
     , StreamId
     , Version
     , Type
     , Meta
     , Data
     )
VALUES
     ( NextPosition()
     , '3791B53677C840FC81CA65BFDAF34F1F'
     , 'A88F94DB6E7A439E9861485F63CC8A13'
     , 1
     , 'EmptyEvent'
     , '{}'
     , NULL
     )
;
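
To illustrate why this counter is gapless, here is a Python sketch with an in-memory SQLite database standing in for Postgres. The tables are trimmed down and the `append` helper is made up for the example. Because the counter update and the insert share a transaction, a failed insert rolls the counter back too.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE PositionCounter (Position integer NOT NULL);
    INSERT INTO PositionCounter VALUES (0);
    CREATE TABLE Event
    ( Position integer NOT NULL
    , StreamId text    NOT NULL
    , Version  integer NOT NULL
    , UNIQUE (StreamId, Version)
    );
    """
)


def append(stream_id, version):
    """Bump the counter and insert the event in the same transaction."""
    try:
        with conn:  # commit on success, roll back everything on error
            conn.execute("UPDATE PositionCounter SET Position = Position + 1")
            (position,) = conn.execute(
                "SELECT Position FROM PositionCounter"
            ).fetchone()
            conn.execute(
                "INSERT INTO Event VALUES (?, ?, ?)", (position, stream_id, version)
            )
        return position
    except sqlite3.IntegrityError:
        return None  # the counter bump was rolled back along with the insert


results = [append("s1", 1), append("s1", 1), append("s1", 2)]
print(results)  # [1, None, 2]
```

The single connection here does not exercise concurrency. In Postgres, the UPDATE also takes a row lock on the counter that is held until the transaction ends, so concurrent writers wait their turn and positions become visible in commit order.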
 
Ron Gonzalez Lobo

Thanks for putting this together, great read!

Working on porting this to YugaByteDB.

Ski

Would it not be simpler to use an explicit table lock when inserting events instead of PositionCounter? It limits performance, but I believe the result is the same as with the PositionCounter table, because that too needs a row-level lock.

begin;
lock table event;
insert into event ...
commit;
 
Kasey Speakman

Simpler? It would be less to set up vs a Position table. In usage it's adding a lock statement vs calling NextPosition() in the insert.

I suppose you will just have to test for the performance difference between the two.