loading...

Event Storage in Postgres

kspeakman profile image Kasey Speakman Updated on ・8 min read

One of the critical pieces of any infrastructure is storage. Compared to traditional relational models, storing events in a log is quite simple. However, when you experience the good fortune of a successful product, even log-style storage has to evolve to keep up.

Naive Implementation

When I started using Event Sourcing, I wanted to go as simple as possible. To be quite honest, I could not wrap my head around many of the trappings of common event sourcing database implementations. So I started with a simple table.

CREATE TABLE Event
(
    SequenceNum bigserial NOT NULL,
    StreamId uuid NOT NULL,
    Data jsonb NOT NULL,
    Type text NOT NULL,
    Meta jsonb NOT NULL,
    LogDate timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (SequenceNum)
);

CREATE INDEX idx_event_streamid ON Event (StreamId);

I used a serialized writer. (Translation: all requests were queued and processed one at a time, no concurrency.) I used full consistency. (Translation: saving events and updating relational/document models were performed in the same transaction in the same database.)

This configuration worked quite well for us, has good performance (even on t2.micro instances), and is relatively simple to understand. If this were an internal system, I would likely stop there. But for a multi-tenant app, at some point this is going to hit a scaling brick wall.

💡 Aside - event sourcing rocks
When we change our relational models, our "data migration" consists of dropping the affected tables, recreating them with the updated schema, and replaying events back onto them. The process happens automatically during deployment. We never have to write migration scripts. <3

Growing the implementation

The next revision of event storage includes some optimizations to support further scaling capabilities. Most of this was described 7 years ago in Building an Event Storage. However, at the time I did not fully understand some of the nuances of the description. Rather than shoot myself in the foot with an unfamiliar weapon, I chose to implement the naive version first. Now I'm getting around to the optimizations.

The Event table

Let's start with the event table itself. Even though this updated table is almost identical, I'll go ahead and explain the purpose of each field and index.

CREATE TABLE IF NOT EXISTS Event
(
    SequenceNum bigserial NOT NULL,
    StreamId uuid NOT NULL,
    Version int NOT NULL,
    Data jsonb NOT NULL,
    Type text NOT NULL,
    Meta jsonb NOT NULL,
    LogDate timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (SequenceNum),
    UNIQUE (StreamId, Version),
    FOREIGN KEY (StreamId)
        REFERENCES Stream (StreamId)
);

Fields

The only two absolutely required fields are StreamId and Data. Everything else supports additional features.

SequenceNum
This is used by event listeners to keep track of their current position in the stream. The only thing that really matters here is to preserve the order in which events were saved, which an auto-increment big integer does nicely.
StreamId
An identifier for a stream. I chose uuid but there are many viable choices for identity.
Version
This field was not present in the previous incarnation, but was added as an optimization. If you think of the stream as an array, this is the index number of the event. Saving it with the event helps avoid manual counting.
Data
This is the serialized data for the event. I chose jsonb format, but there are other options. If I had more resources to develop my own introspection tools, I would probably use a binary serialization like Protocol Buffers for speed.
Type
This is the type of the event. Storing this supports filtering... to avoid fetch / deserialization if the listener does not use the event. I also use it for deserialization.
Meta
This supports auditing and tracing. I put things in here like user, execute permission, correlation id, etc.
LogDate
In addition to auditing and tracing, this could also be used for cross-shard sorting. Although it is tempting to use LogDate in business logic, it should only be used for infrastructural purposes. Events should contain their own timestamps if temporal context is needed. But I admit I have used LogDate for reports. :S

Indexes / Keys

SequenceNum Primary Key
Allows efficient queries based on position. i.e. WHERE SequenceNum > @LastSeenSeqNum
StreamId, Version Unique Key
Allows efficient loading of a specific stream in Version order. Postgres can use this index to process ORDER BY Version without an extra sort step.
StreamId Foreign Key
This is primarily "training wheels" to ensure data integrity in the face of bugs. Once the code is battle-proven, this could be removed to increase performance.

The Stream table

One of the features we wanted to add to the event storage is support for concurrent writers. Concurrent writers means: I can deploy multiple copies of my business services (those things which generate events, aka command handlers) without requiring locks or coordination between them. We could even run these on "serverless" architectures like Lambda to auto-scale compute resources on demand. This is accomplished through the use of Optimistic Concurrency. You can think of this like a merge conflict in git -- two different branches independently made changes to the same line of code. In this case, two independent users tried to make changes to the same entity at the same time. This is the table we use to detect that.

CREATE TABLE IF NOT EXISTS Stream
(
    StreamId uuid NOT NULL,
    Version int NOT NULL,
    Type text NOT NULL,
    PRIMARY KEY (StreamId)
);

CREATE INDEX IF NOT EXISTS idx_stream_type ON Stream (Type);

At its core, the table simply tracks the current version of each stream. Here is the specific process to follow using this table.

  • Before executing business logic, get the current Version of the stream.
  • Execute the business logic, generating new events.
  • Save the events (in a transaction) ONLY IF
    • the Version from the Stream table is the same as Version when you started
    • then also update the Stream table to the last saved Version
    • indicate whether the events were saved. e.g. return true/false

This ensures that writers know when they tried to save conflicting events, and they can either return an error, retry, or some other conflict resolution process as defined by the business rules. We will probably just error by default until there is reason to do otherwise.

Concurrent services vs concurrent users

At a minimum, we need to know that stream hasn't changed from the time we started processing business logic until the time we tried to save the events. Most of the time the stream is loaded before processing anyway -- to rebuild the current state of the domain. So we can use the Version from the last loaded event as the expected version. Then verify it is still the same when we save new events. This will detect conflicts between concurrent services writing to the same stream.

However, we can take this a step further. The client can send us the version of the data they were working with when they submitted the request. Using this as the expected Version when saving events not only detects write conflicts between services, but also detects when a user's changes will accidentally overwrite another user's.

What's this Type then?

The Type is the type of Stream. For example, the Stream Type may be "Ordering" while one of the stream's Events may have a Type "OrderPlaced". It is entirely optional, but it can be used to help generate snapshots (discussed below). It can also support filtering events down to streams of a certain type. The index on Type should have no write-performance impact, since the only updates to the table are on Version.

The Snapshot table

When a stream gets very large (maybe >1000 events?), loading and replaying the stream to get current state might become too slow. The common mitigation to that is using snapshots. Rather than rebuilding the domain model from scratch every time, once in a while we rebuild up to the latest version of the domain model's state and save that to the database. Afterward, to load the stream we first get the snapshot and then only the events since the snapshot version. Here is the table to support that.

CREATE TABLE IF NOT EXISTS Snapshot
(
    StreamId uuid NOT NULL,
    Version int NOT NULL,
    Data jsonb NOT NULL,
    Revision int NOT NULL,
    PRIMARY KEY (StreamId)
);

Application code should create a snapshot when a stream grows "too large" (something that should be determined by metrics like request time). The snapshot should also be recreated from time to time (after 1000 more events) and when structural changes are made to the snapshot -- see Revision below.

What is Revision?

Domain models change over time. Sometimes the changes are such that the last snapshot won't deserialize back into the new domain model. A relatively simple way to avoid that situation is to keep a Revision constant somewhere on your domain code, and increment it when you make structural changes. When loading the snapshot, the query can make sure to ignore it if the Revision is incompatible.

⚠️ I hate adding easily-forgettable steps to the development process -- such as bumping a Revision number on structural changes. If you have an idea for a better way to handle this, I would love to hear it!

How do snapshots get saved?

Probably the best way is to have a separate Snapshot service which runs on a schedule. It can use the Stream table to identify streams with a large number of events. It can also check the existing snapshot versions to find ones that need updating. The query to accomplish both of these checks at once is described in Building an Event Storage. Additionally, Snapshot revisions can be checked to recreate a snapshot for structural changes.

Now the streams in need of snapshots are identified. The next step is to call the appropriate domain model code to replay the stream up to the current state. Then that state gets saved to the Snapshot table. The Stream.Type can be used to determine which stream replay code to call. If you didn't opt to include the Type column in the Stream table, then you could also read the stream's first event. Usually those are indicative of the type of stream it is.

You might not need snapshots.

For many types of applications, well-designed streams do not accumulate large numbers of events over their lifetime. It certainly does not hurt to use snapshots, but they take dev time to implement. And they can always be added later.

What did we accomplish?

These changes allow services which use the event storage to scale in a share-nothing fashion. They can even be stateless and used on serverless platforms. They also provide consistent loading performance as stream sizes grow.

What about scaling the database itself?

We haven't really addressed horizontally scaling the event database itself. In a multi-tenant app, it would be pretty easy here to add a TenantId to these tables and use a sharding solution such as Citus Data.

My current approach to multi-tenancy is schema-based isolation. It would not be difficult to convert into a sharded approach, and I may end up doing just that. However I really like schema isolation, so I'm working through some ideas on using a simple directory (e.g. a file on S3) which maps tenants to database instances.

Update 2019-01-23

The comments below are worth the read, especially the discussions with @damiensawyer . I came to the conclusion that the only necessary table was the Event table. Then the Snapshot table could be added later if needed. The Stream table wasn't really necessary after all. And in fact it complicates the implementation quite a bit -- I had to create a not-so-small stored procedure to keep Stream updated as part of appending events. So I do not consider it worthwhile.

Posted on by:

kspeakman profile

Kasey Speakman

@kspeakman

collector of ideas. no one of consequence.

Discussion

pic
Editor guide
 

Thanks for this. Very useful.

Your stream table has got me thinking. I worked on an event sourced system once where they managed consistency between concurrent writers by reading all the events in a stream in a snapshot isolated transaction then writing the new events in the same transaction - catching the concurrency exception if someone else had written records inside the range covered by the initial select query (at least, I think that's how it worked).

I'm building a system now and was planning to do similar. Advantages over yours is that there's not an extra table required. Disadvantages are that you need to keep the transaction open between the reads and the writes. I'm wondering if yours would allow for greater concurrency on on the db?

This is my first large Postgrep app in anger (after years with SQL Server). Perhaps some more investigation is required on this.

 

I'm wondering if yours would allow for greater concurrency on on the db?

Yes, for sure. What you described is pessimistic concurrency, and it will allow concurrent writers (if all writers follow the same process). But it will slow down drastically as you add more concurrent writers. Generally, I prefer to do a single writer over pessimistic locking. For example, I used a built-in F# MailboxProcessor (essentially queue + worker) to process concurrent requests in a serialized fashion -- so there is no concurrency to worry about. And there is no need for a Stream table. For an internal system, you will probably never need anything else. If you need some availability guarantees, this works fine in an active-passive failover scenario.

If you need concurrent writers for scalability, then optimistic concurrency is better. It is essentially a check-and-set transaction on write only. No need to hold the transaction open while you run decision code. I have described the tables needed for optimistic concurrency, but I have conveniently (for me) left the append implementation as an exercise for the reader. I have a stored procedure for this that I may post once I have some confidence in it.

And if you run into so much concurrency against the same streams that even optimistic concurrency is too slow, then you probably need to move to stateful processing.

I have decent amount of experience in both MSSQL and Postgres (and MySQL a long time ago). MSSQL tooling and user experience is generally better. But Postgres text search is very compelling (and easy) over MSSQL FTS service. Postgres JSON support is great. Postgres still annoys me in the way it handles casing, but it is my default database choice over MSSQL.

 

Thanks for this.

On pondering it, are you sure that you need your Stream table?

The Event table has a unique constraint on (StreamId, Version). If writers are required to always increment the version to be 1 more than they highest event they read (which you may or may not be doing), then if two writers try to compete, that constraint will block the second.

Thanks for the comment! That is a good point. I probably would not let the writer throw an exception. But I could check the Event table (instead of the Stream table) for the highest version. Like this:

SELECT Version
  FROM Event
 WHERE StreamId = @StreamId
 ORDER BY Version DESC
 LIMIT 1
;

Unless I’m missing something, this should be O(1) lookup due to the index.

The Stream table with a Stream Type can still be nice for admin purposes. But it does seem even less important now.

All sweet.

I can't see any reason that query wouldn't be fast.

I'm not sure that I'd have an issue with catching an exception. You'd only catch that 'specific' one (key violation or whatever) and then report that back gracefully to the caller. Alternatively you need to do a potentially unneeded read before every write. Catching the error and retrying is kind of like asking for forgiveness instead of permission.

I was reading about partial indexes (postgresql.org/docs/8.0/static/ind...).

Another way you might do your original method, but without an extra table, is to add a boolean to the event table named latest and require that the StreamId is unique in a partial index over records where that is true.

CREATE UNIQUE INDEX flag_latest_version ON Event(StreamId) WHERE latest;

I've kind of dismissed this though in favour of just enforcing the streamid/version unique constraint. It can fail if a writer tries to write an event with a version greater than 1 more than the current max but, as we control the writers, we can prevent that I guess.

Anyway - it's all good fun. I'm enjoying Postgres! :-)

As a bit of a non sequitur, check out github.com/pgManage/pgManage. I've looked at a stack of PG clients but nothing's come close to SQL Management studio. PGAdmin3 is dated, PGAdmin4 is slooooooow and not even any of the commercial ones have been that impressive. PGManage is great!

Yeah, you are probably right about the exception handling. I had since added a Commit table to the design, mainly to denormalize the meta data across the different events in the same commit. And between 3 tables, I had it in my head to detect conflicts early. But there is great value in the simplicity of one table and just handling the unique key violation exception to detect a concurrency conflict. Using a multi row insert it should still be an all-or-nothing append operation. This has been a great conversation to clarify my thinking. Thank you.

Aside: I use FP, so I typically try to avoid knowingly triggering exceptions unless things are actually exceptional (like database down). In this case, a concurrency conflict is not exceptional -- it is something I expect to happen sometimes under normal circumstances. However, triggering an exception at the database level is probably the best play and I will just convert it to a Result before handling it.

For the latest version idea, I probably would not opt for updating past records (to unset the latest flag). It would no longer be append only, and it may limit options in the future. For example, I have been considering the idea of backing up events to S3 (in addition to SQL backups), and updates would really complicate that scenario.

I appreciate your non sequitur. We have been using PGAdmin4 and have sortof found workflows that work for us. But it is not well polished in certain aspects. I will give PGManage a try. Thanks!

All sweet. I hadn't considered the "append only" thing, but that's a great point. I also didn't know about FP being less tolerant of throwing exceptions.

In the C# world I know that exceptions are slow and I definitely try not to throw them during normal execution - however I would have considered a concurrency failure a good case for throwing one (and specifically catching it higher up the call stack to make retry decisions). Perhaps this comes down to personal style and a glass half full vs empty argument.

I've got the idea of using S3 or similar for events as well (how cool is immutability!!). We haven't started designing our integrators and builders yet - but when we do, even if we don't do it now, I want to at least allow for the idea of running them all as serverless functions at massive scale. At the other end of the spectrum I'm wondering if we can build the whole system to run from a wrist watch. I'm thinking Postgres et al on Docker + k8s + Android Wear. No idea if there's a watch that can do it yet, but there might be soon :-)

re: Exceptions. FP is not necessarily less tolerant. I use F#, and it has pretty rich exception handling. But it is common in FP to represent anticipated error conditions as normal return values rather than through exceptions. Mainly because you cannot see what exception will be thrown unless you look in the code or it is documented. But you can see the return value type. In this case, I expect optimistic concurrency violations sometimes and I expect the caller to handle it. So even though the database adapter will throw a unique constraint violation exception, I will catch it and convert it to a normal return value. However, if the database is just down, I will let that exception propagate up to the caller. Hope that makes sense.

Using messaging (e.g. events) just opens up a whole world of things. It is awesome. :) If you really want your mind blown, event sourcing on the front end.

Cheers for that. :-)

 

I have a question.
We very similiar setup on Postgres. But we encountered an issue that seems to be not solvable with Postgres without logical replication (that we cannot do, as Npgsql do not support it)

How to ensure foreign events table readers to not miss events?

Example,
App open transaction and write

  • stream A with global sequence 100-200
  • stream B with global sequence 200-300

External app read events in pages and request events from 0 to 300

So far so good, but stream B commits earlier then stream A.
External app got stream B with sequence 200-300 and remember that it saw index 300.
Stream A commmits. Extrernal reader will never know that there is a missing events.

We have patched this issue with table lock during read. But this really hurts write performance.

Is there some well known and used by everyone solution that we are missing?

 

Hi Victor,

This exact point was brought up on a subsequent post about multi-tenant event store in the comments.

The problem is the auto-increment position, since it is not transactional. And to solve this problem, you have to manage the position separately so that it is transactional. This should not affect throughput to the same degree as locking the table.

Here are the structures that we use in our event store to solve the listener-missed-event problem.

        -- position counter
        CREATE TABLE IF NOT EXISTS PositionCounter
        (
            Position bigint NOT NULL
        );

        -- initialize the value
        INSERT INTO PositionCounter VALUES (0);

        -- prevent duplication on reinitialization
        CREATE OR REPLACE RULE rule_positioncounter_noinsert AS
        ON INSERT TO PositionCounter DO INSTEAD NOTHING;
        -- prevent accidental deletion
        CREATE OR REPLACE RULE rule_positioncounter_nodelete AS
        ON DELETE TO PositionCounter DO INSTEAD NOTHING;

        -- create function to increment/return position
        DROP FUNCTION IF EXISTS NextPosition();
        CREATE FUNCTION NextPosition() RETURNS bigint AS $$
            DECLARE
                nextPos bigint;
            BEGIN
                UPDATE PositionCounter
                   SET Position = Position + 1
                ;
                SELECT INTO nextPos Position FROM PositionCounter;
                RETURN nextPos;
            END;
        $$ LANGUAGE plpgsql;

        -- events
        CREATE TABLE IF NOT EXISTS Event
        (
            Position bigint NOT NULL,
            StreamId uuid NOT NULL,
            Version int NOT NULL,
            Type text NOT NULL,
            Data jsonb,
            Meta jsonb NOT NULL,
            LogDate timestamptz NOT NULL DEFAULT now(),
            CONSTRAINT pk_event_position PRIMARY KEY (Position),
            CONSTRAINT uk_event_streamid_version UNIQUE (StreamId, Version)
        );

        -- Append only
        CREATE OR REPLACE RULE rule_event_nodelete AS
        ON DELETE TO Event DO INSTEAD NOTHING;
        CREATE OR REPLACE RULE rule_event_noupdate AS
        ON UPDATE TO Event DO INSTEAD NOTHING;

        -- event notification
        DROP TRIGGER IF EXISTS trg_NotifyEvent ON Event;
        DROP FUNCTION IF EXISTS NotifyEvent();

        CREATE FUNCTION NotifyEvent() RETURNS trigger AS $$

            DECLARE
                payload text;

            BEGIN
                -- { position }/{ stream id }/{ version }/{ event type }
                SELECT CONCAT_WS( '/'
                                , NEW.Position
                                , REPLACE(CAST(NEW.StreamId AS text), '-', '')
                                , NEW.Version
                                , NEW.Type
                                )
                    INTO payload
                ;

                -- using lower case channel name or else LISTEN would require quoted identifier.
                PERFORM pg_notify('eventrecorded', payload);

                RETURN NULL;

            END;
        $$ LANGUAGE plpgsql;

        CREATE TRIGGER trg_NotifyEvent
            AFTER INSERT ON Event
            FOR EACH ROW
            EXECUTE PROCEDURE NotifyEvent()
        ;

Then when you write events to the table, you set the Position to NextPosition().

            INSERT
              INTO Event
                 ( Position
                 , StreamId
                 , Version
                 , Type
                 , Data
                 , Meta
                 )
            VALUES
                 ( NextPosition()
                 , @StreamId
                 , @Version
                 , @Type
                 , @Data
                 , @Meta
                 )
            ;

Side note: You are probably aware of this and consciously made the trade-off, but just in case: writing to multiple streams in a transaction brings some significant restrictions on how the system can evolve in the future. It's fine if you don't plan to scale past a single db node, or can work if you are partitioning db nodes by tenant or some other key. But it does narrow your options in general.

 

Thanks Kasey,

Your articles very helpful.

I will try this solution. But it's not that obvious why is this will work.

As for writing to multiple streams. In most of the time it's totally ok for us. Our app is survey collecting tool. All our tenants start with own empty database, conduct surveys, extract data from server and server with db can be disposed. Most of servers are alive for less then a year. There are some tenants that collect millions of interviews, but they still using shared Amazon RDS instance with no issue on DB load so far.

Our main issue is a bit wrong implementation of CQRS. We run all things synchronously, i.e. [Command-> WriteEvents-> Denormalizers] all happen in single transaction. During this transaction very costly CPU opeartions can happen (our interview aggregate can be very large and consist of 10k-200k events). That's why we stubled on sequence gaps very badly when we extracted one of subsystem that export data from system in separate service. Reading events stream turns out to be not that easy when there is a gaps in sequences.

Glad it helped!

The difference between the two: using the Position table instead of serial, Position will get assigned in the order that transactions commit. Whereas when you use an auto increment position, its value is calculated up front and it is not adjusted if an earlier transaction stalls and commits after a later one.

We did fully consistent write models in one of our apps. It is a valid strategy, but causes write amplification. So yeah, I can see why that didn't work for you!

Ough.

Unfortunatly this solution no better then lock events table on write.

UPDATE PositionCounter SET Position = Position + 1 will actually apply lock on row and block all access from other threads (postgresql.org/docs/12/explicit-lo... ROW LEVEL LOCKS)

This literally drop performance in 10 times according to our load test suite (from 1600 rps to 120 requests per second)...

So question remains open.

Still thanks. Will look for other options.

That is unfortunate.

It could be that the generalist nature of postgres does not allow it to do append-only log as well as a more focused solution like EventStore.

There is one other thing I can think of to try: set Position from an AFTER INSERT trigger. To do that, you will have to make Position nullable or default -1, not a primary key. (StreamId + Version would work as primary.) Do not create it as BIGSERIAL, but instead just as BIGINT with a regular non-unique index. Manually create a sequence. Create an AFTER INSERT trigger to set NEW.Position = nextval('my_sequence').

I'm not confident that it will completely solve the problem of out-of-order positions, but could be worth a try. My understanding is that default values get filled in up front. This includes BIGSERIAL since is just a shortcut for BIGINT ... DEFAULT nextval .... Whereas this approach would fill them after the row was written.

 

I've read your article, and I've one question. You said that you want to go as simple as possible. So why you didn't use one of the ready solutions like RabbitMQ which is open-source message broker? It could be connected to a persistent store to save all events/messages.
Maybe that will solve all problems you've faced connected to the database?

 

In the naive implementation I did try to go as simple as possible for the features I wanted. In growing the implementation, it necessarily becomes less simple to support more capability.

I think adding RabbitMQ in front would in fact greatly increase the complexity. The surface reasons:

  • Going thru a queue adds latency versus directly saving
  • Maintaining a new service which listens to the queue to save events
  • Must train the team on RabbitMQ
  • Must consider how to scale RabbitMQ with load
    • Also scaling the listener service with load

  • RabbitMQ still does not absolve me of scaling the database, because queues are not infinite. Nor is the user's patience in waiting for their request to finally get thru the queue to an overloaded database.

But there are deeper problems with this setup. Let's say two concurrent writers save conflicting events into RabbitMQ (they created divergent changes to the same stream, like a git merge conflict). They have no way of knowing this at the time they submit the events. It is only once RabbitMQ delivers the events to be stored that a conflict can be noticed. By the time, the clients have already received success responses, and the context where the conflict happened (and therefore what might be the best way to handle it) has been lost.

 

I understand your point of view.

I agree that an introduction new component requires additional time to get familiar with it. Based on my experience, in most cases, at the end of the day, an effort which I had to devote to learn brought me gain.

As you said, a new component introduces new problems. Maintaining is a great example. Another benefit of custom implementation of any problem is that you get familiar with the mechanism of saving events more than me for instance. So you became an expert :)

You've mentioned about concurrent writers who are saving events at the same time to the event store. I thought that each event is an independent object with a unique identifier. Even if both events are saving at the same time, they will produce (based on your example) separate entries in the database. I thought that saving an event is one thing and processing it is something slightly different. The responsibility for handling an event is on the event consumer.
Maybe I didn't misunderstand the example.

I suppose I didn't provide enough context in how this is used. (The post was already so large.) Probably the best background is Greg Young's excellent series of articles taking you from a traditional approach to event sourcing.

Events belong to a stream. It parallels very well to git. In git terms, an event is a commit and a stream is a branch. A concrete example: I have Training streams to represent user training. The stream starts with RegistrationCreated { StreamId, TraineeId, CourseId, DueDate, etc.}. It could be that after a Training stream is created, these two things happen simultaneously:

  • An admin cancels the registration.
    • Using the UI, they issue a command CancelRegistration.
    • The Training is currently active, so this is a valid request.
    • The RegistrationCanceled event is generated.

  • A trainee begins training in the same registration by watching a video.
    • Using the UI, they issue a StartVideo command.
    • The Training is currently active and has a video, so this is valid.
    • The VideoStarted event is generated.

Both of these are trying to make divergent changes to the same stream at the same instant in time. The first one to commit events into the stream succeeds. (Using the Stream table described in the post) the second one sees that the stream has changed versions while it was processing the StartVideo command, so it will detect the conflict and probably provide an error back to the trainee.


That was the command side. There is another side to this coin, the query side. In this case there is no business logic to run for user queries (aside from validating queries are well-formed, and the user is permitted to run it).

The models on the query side are built from events as they come in. A concrete example would be when a trainee logs in, they are presented with a list of training they can take. How do we build that list? A service listens for events that affect this list and makes changes to a relational model.

  • RegistrationCreated inserts a new row using data on the event
  • RegistrationCanceled deletes the appropriate row
  • TrainingCompleted deletes the appropriate row

There could be another view for Training History that only inserts rows when it sees TrainingCompleted events. Basically we can create any view of the data we want by replaying events onto it.

This is perhaps the place where RabbitMQ could be useful, to deliver notifications to listeners as new events occurred. However, RabbitMQ would only be part of the solution. You see, listeners usually need to replay streams from a given starting position. For example, I deploy a new listener and let it rebuild from the first event. Another example, a listener experiences an outage and when it starts back it wants to start from the last event it saw. RabbitMQ doesn't have the old events anymore since it was already delivered to all subscribers at the time it was generated. So listeners still need the capability to read from the database from where they left off.

It makes sense. Thanks for deeper explanations. Cheers.

 

"Allows efficient queries based on position. i.e. WHERE SequenceNum > @LastSeenSeqNum"

A good test is to have many parallel threads inserting events and have another thread periodically issuing this query and checking that what it retrieves doesn't contain gaps.

Postgres assigns the serial number at the start of each transaction that inserts the event. If the transactions commit out of order (which postgres is happy to do!) then the query can return gaps which are later filled in. The net effect is that checking against the LastSeen value means you can miss updates as they became visible to your query out of order. Search for "how to create a gapless sequence in postgres".

 

Thanks for pointing this out. I missed this the first time around, but this was pointed in my subsequent article.

I discovered the same gapless sequence article and posted an updated solution in the comments, which can properly handle concurrent writers.

 

That's interesting! You're not concerned with modeling relationships between "snapshots", I take it?

I've been working on an application based on this pattern using triggers to project events into state tables. It's a bit funny just how much room there is in the definition of "event sourcing"...

 

There could be a misunderstanding here on what snapshots are for. What I described above is only the event storage. It does not include read models (aka projections).

I see why that is confusing. The missing piece here is the assumption of separating commands and queries. The "write" model used by commands exists nowhere on disk as a relational model. If it did, it would add deployment complications like having to stop the world to update changed schemas. When I handle commands, I always rebuild the current state of the stream from events. However, replaying particularly large streams can slow things down. Snapshots are only an aid to speed up replay for commands, not something to be used directly.

In contrast, read models (not represented in the article) are kept updated as events happen in the system. They are read as-is (preferably) without any rebuild step. They could be fully relational, key-value, document, graph, or whatever. (Also, it is fine for the command side to query read models for related information, like settings.)

I hadn't seen that project before. Thanks for linking that.

 

Just wanted to nudge you to add the #postgresql tag :)

 

Good catch. I added it.