DEV Community

Event Storage in Postgres

Kasey Speakman on March 25, 2018

One of the critical pieces of any infrastructure is storage. Compared to traditional relational models, storing events in a log is quite simple. Ho...
 
Victor Gladkikh

I have a question.
We have a very similar setup on Postgres, but we encountered an issue that does not seem solvable in Postgres without logical replication (which we cannot use, as Npgsql does not support it).

How do you ensure that external readers of the events table do not miss events?

Example: the app opens a transaction and writes

  • stream A with global sequence 100-200
  • stream B with global sequence 200-300

An external app reads events in pages and requests events from 0 to 300.

So far so good, but stream B commits earlier than stream A.
The external app gets stream B with sequence 200-300 and remembers that it saw index 300.
Stream A commits. The external reader will never know that there are missing events.
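
To make the race concrete, here is a small in-memory sketch in Python. It is an illustration only: the `visible` dict, `commit` and `read_page` are made-up stand-ins for the events table and a paging reader, and the ranges are written as 100-199 and 200-299 so they do not overlap.

```python
# Positions are handed out up front (as a sequence would do), but rows only
# become visible to readers once their transaction commits.
visible = {}  # position -> stream name: what a reader can currently see


def commit(stream, positions):
    """Make every row of one transaction visible at once."""
    for position in positions:
        visible[position] = stream


def read_page(last_seen, upto):
    """Naive reader: fetch visible rows with last_seen < position <= upto."""
    rows = sorted(p for p in visible if last_seen < p <= upto)
    checkpoint = rows[-1] if rows else last_seen
    return rows, checkpoint


stream_a = range(100, 200)  # positions assigned first, committed last
stream_b = range(200, 300)  # positions assigned second, committed first

commit("B", stream_b)
first_page, checkpoint = read_page(last_seen=0, upto=300)

commit("A", stream_a)  # the slow transaction finally commits
second_page, _ = read_page(last_seen=checkpoint, upto=300)

# Rows that are now visible but sit at or below the reader's checkpoint.
missed = sorted(p for p, s in visible.items() if s == "A" and p <= checkpoint)
print(checkpoint, len(second_page), len(missed))
```

The reader's second page is empty because it only asks for positions above its checkpoint, so every row of stream A is silently skipped.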

We have patched this issue with a table lock during reads, but this really hurts write performance.

Is there a well-known, widely used solution that we are missing?

Kasey Speakman • Edited

Hi Victor,

This exact point was brought up on a subsequent post about multi-tenant event store in the comments.

The problem is the auto-increment position, since it is not transactional. And to solve this problem, you have to manage the position separately so that it is transactional. This should not affect throughput to the same degree as locking the table.

Here are the structures that we use in our event store to solve the listener-missed-event problem.

        -- position counter
        CREATE TABLE IF NOT EXISTS PositionCounter
        (
            Position bigint NOT NULL
        );

        -- initialize the value
        INSERT INTO PositionCounter VALUES (0);

        -- prevent duplication on reinitialization
        CREATE OR REPLACE RULE rule_positioncounter_noinsert AS
        ON INSERT TO PositionCounter DO INSTEAD NOTHING;
        -- prevent accidental deletion
        CREATE OR REPLACE RULE rule_positioncounter_nodelete AS
        ON DELETE TO PositionCounter DO INSTEAD NOTHING;

        -- create function to increment/return position
        DROP FUNCTION IF EXISTS NextPosition();
        CREATE FUNCTION NextPosition() RETURNS bigint AS $$
            DECLARE
                nextPos bigint;
            BEGIN
                UPDATE PositionCounter
                   SET Position = Position + 1
                ;
                SELECT INTO nextPos Position FROM PositionCounter;
                RETURN nextPos;
            END;
        $$ LANGUAGE plpgsql;

        -- events
        CREATE TABLE IF NOT EXISTS Event
        (
            Position bigint NOT NULL,
            StreamId uuid NOT NULL,
            Version int NOT NULL,
            Type text NOT NULL,
            Data jsonb,
            Meta jsonb NOT NULL,
            LogDate timestamptz NOT NULL DEFAULT now(),
            CONSTRAINT pk_event_position PRIMARY KEY (Position),
            CONSTRAINT uk_event_streamid_version UNIQUE (StreamId, Version)
        );

        -- Append only
        CREATE OR REPLACE RULE rule_event_nodelete AS
        ON DELETE TO Event DO INSTEAD NOTHING;
        CREATE OR REPLACE RULE rule_event_noupdate AS
        ON UPDATE TO Event DO INSTEAD NOTHING;

        -- event notification
        DROP TRIGGER IF EXISTS trg_NotifyEvent ON Event;
        DROP FUNCTION IF EXISTS NotifyEvent();

        CREATE FUNCTION NotifyEvent() RETURNS trigger AS $$

            DECLARE
                payload text;

            BEGIN
                -- { position }/{ stream id }/{ version }/{ event type }
                SELECT CONCAT_WS( '/'
                                , NEW.Position
                                , REPLACE(CAST(NEW.StreamId AS text), '-', '')
                                , NEW.Version
                                , NEW.Type
                                )
                    INTO payload
                ;

                -- using lower case channel name or else LISTEN would require quoted identifier.
                PERFORM pg_notify('eventrecorded', payload);

                RETURN NULL;

            END;
        $$ LANGUAGE plpgsql;

        CREATE TRIGGER trg_NotifyEvent
            AFTER INSERT ON Event
            FOR EACH ROW
            EXECUTE PROCEDURE NotifyEvent()
        ;

Then when you write events to the table, you set the Position to NextPosition().

            INSERT
              INTO Event
                 ( Position
                 , StreamId
                 , Version
                 , Type
                 , Data
                 , Meta
                 )
            VALUES
                 ( NextPosition()
                 , @StreamId
                 , @Version
                 , @Type
                 , @Data
                 , @Meta
                 )
            ;

Side note: You are probably aware of this and consciously made the trade-off, but just in case: writing to multiple streams in a transaction brings some significant restrictions on how the system can evolve in the future. It's fine if you don't plan to scale past a single db node, or can work if you are partitioning db nodes by tenant or some other key. But it does narrow your options in general.

Victor Gladkikh

Thanks Kasey,

Your articles are very helpful.

I will try this solution, but it's not obvious to me why it will work.

As for writing to multiple streams: most of the time it's totally OK for us. Our app is a survey-collecting tool. All our tenants start with their own empty database, conduct surveys, and extract data from the server; then the server and its db can be disposed of. Most servers are alive for less than a year. There are some tenants that collect millions of interviews, but they are still using a shared Amazon RDS instance with no issue on DB load so far.

Our main issue is a somewhat wrong implementation of CQRS. We run everything synchronously, i.e. [Command -> WriteEvents -> Denormalizers] all happen in a single transaction. During this transaction very costly CPU operations can happen (our interview aggregate can be very large and consist of 10k-200k events). That's why we stumbled on sequence gaps so badly when we extracted one of the subsystems, which exports data from the system, into a separate service. Reading the event stream turns out to be not that easy when there are gaps in the sequence.

Kasey Speakman

Glad it helped!

The difference between the two: using the Position table instead of serial, Position will get assigned in the order that transactions commit. Whereas when you use an auto increment position, its value is calculated up front and it is not adjusted if an earlier transaction stalls and commits after a later one.
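
One way to see why this holds: the UPDATE inside NextPosition() takes a row-level lock on the single PositionCounter row, and Postgres keeps that lock until the transaction ends. A second writer cannot obtain its position until the first has committed or rolled back. The Python sketch below only mimics that behavior: a `threading.Lock` stands in for the row lock, and `write_event` and `commit_order` are illustrative names, not part of the event store.

```python
import random
import threading
import time

counter_lock = threading.Lock()   # stands in for the row lock on PositionCounter
counter = {"position": 0}         # stands in for the single counter row
commit_order = []                 # positions, in the order transactions commit


def write_event():
    # Taking the lock models the UPDATE; leaving the block models the commit,
    # because the row lock is only released when the transaction ends.
    with counter_lock:
        counter["position"] += 1
        position = counter["position"]
        time.sleep(random.uniform(0, 0.002))  # simulated work before commit
        commit_order.append(position)


threads = [threading.Thread(target=write_event) for _ in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(commit_order == list(range(1, 21)))
```

However the threads are scheduled, positions and commits line up, because nobody can take position N + 1 while the holder of N is still uncommitted. The same lock is also why concurrent writers queue up behind each other.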

We did fully consistent write models in one of our apps. It is a valid strategy, but causes write amplification. So yeah, I can see why that didn't work for you!

Victor Gladkikh • Edited

Ough.

Unfortunately this solution is no better than locking the events table on write.

UPDATE PositionCounter SET Position = Position + 1 will actually take a lock on the row and block all other threads that try to update it (postgresql.org/docs/12/explicit-lo... ROW LEVEL LOCKS)

This drops performance roughly tenfold according to our load test suite (from 1600 rps to 120 requests per second)...

So question remains open.

Still thanks. Will look for other options.

Kasey Speakman • Edited

That is unfortunate.

It could be that the generalist nature of postgres does not allow it to do an append-only log as well as a more focused solution like EventStore.

There is one other thing I can think of to try: set Position from an AFTER INSERT trigger. To do that, you will have to make Position nullable or default -1, not a primary key. (StreamId + Version would work as primary.) Do not create it as BIGSERIAL, but instead just as BIGINT with a regular non-unique index. Manually create a sequence. Create an AFTER INSERT trigger to set NEW.Position = nextval('my_sequence').

I'm not confident that it will completely solve the problem of out-of-order positions, but it could be worth a try. My understanding is that default values get filled in up front. This includes BIGSERIAL, since it is just a shortcut for BIGINT ... DEFAULT nextval .... Whereas this approach would fill them after the row was written.

Kasey Speakman • Edited

This might work when combined with an advisory lock. Although I think the AFTER INSERT trigger has performance overhead of its own.

Re: Sequences, txids, and serial order of transactions

janisk

Thanks for your great article and also to the community for discussing so much about best practices!
I also want to build an event store with Postgres - but I am now a bit confused what the best practice is for storing the number of the event. I'm not so deep in Postgres - maybe that's why I don't understand your last two comments in a way that I would be able to start on it right away ;)
Could you maybe elaborate a bit on what you think is the best practice for this? AFTER INSERT? The position counter you proposed above? What would such a solution look like?
Thank you so much!

Kasey Speakman

I have not found a best practice. The most straightforward method is the position counter table I posted in a previous comment, here. Your app code can simply do the insert and the position is guaranteed monotonic and gapless.

The advisory lock method may or may not improve performance. And you can implement it in such a way that notify could still get out-of-order notifications. So it would require some attention to detail. It's not something that we need so I haven't tried it or invested time in designing it.

I also found these tips for improving insert performance. I may incorporate a couple of these in the future. Some of them I would not do for an event store (e.g. unlogged). Following the suggestion to reduce indexing, you could also remove Position altogether and just use the LogDate as the ordering. Except some events will have an exactly identical LogDate (example: saving 2 events in the same transaction). Only ordering by LogDate, these events may swap order between different reads. So you'd probably really sort by LogDate + StreamId + Version to get a consistent ordering.
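
A small Python sketch of the tie-breaking point. The two events share one timestamp, as they would if LogDate were filled by now() inside a single transaction (now() returns the transaction start time). The dict layout and field names are illustrative only.

```python
from datetime import datetime, timezone
from uuid import UUID

log_date = datetime(2018, 3, 25, 12, 0, tzinfo=timezone.utc)  # shared by both rows
stream_id = UUID("00000000-0000-0000-0000-000000000001")

events = [
    {"log_date": log_date, "stream_id": stream_id, "version": 1, "type": "Created"},
    {"log_date": log_date, "stream_id": stream_id, "version": 2, "type": "Renamed"},
]


def by_log_date(event):
    return event["log_date"]


def by_full_key(event):
    return (event["log_date"], event["stream_id"], event["version"])


# Two reads that happen to receive the rows in a different physical order.
read_1, read_2 = list(events), list(reversed(events))

unstable = [[e["version"] for e in sorted(r, key=by_log_date)] for r in (read_1, read_2)]
stable = [[e["version"] for e in sorted(r, key=by_full_key)] for r in (read_1, read_2)]
print(unstable, stable)
```

Sorting on the timestamp alone leaves ties in whatever order the rows arrived, while the three-part key gives both reads the same order.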

If you need better performance beyond what these provide, then you have to write some application-level code.

First, you can go back to using sequences but make out-of-order events the listener's problem. Code the event listeners to look out for sequence gaps. For example, event Position should be last seen Position + 1 but it is higher. Then the listener waits until it sees the missing Position or until a timeout passes. The timeout is needed because Postgres sequence gaps can naturally occur. For example, two users try to change the same stream simultaneously. Both will consume a sequence number but only one will actually be committed -- the other will experience a unique index violation. Without a timeout to give up on the missing position, the listener will wait forever.
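
A minimal Python sketch of such a listener, assuming positions normally increase by 1. `GapAwareListener`, its `handle` callback and the injected `clock` are hypothetical names for illustration. An event that shows up after its gap has timed out is dropped in this sketch, so the timeout would need to be longer than the slowest expected transaction.

```python
import time


class GapAwareListener:
    """Delivers events in position order, waiting a bounded time on gaps."""

    def __init__(self, handle, gap_timeout=5.0, clock=time.monotonic):
        self.handle = handle            # called as handle(position, event)
        self.gap_timeout = gap_timeout  # seconds to wait for a missing position
        self.clock = clock
        self.last_seen = 0
        self.pending = {}               # position -> event held back behind a gap
        self.gap_since = None           # when we began waiting for last_seen + 1

    def receive(self, position, event):
        if position > self.last_seen:   # anything at or below last_seen is ignored
            self.pending[position] = event
        self._drain()

    def tick(self):
        """Call periodically so a gap can time out even when nothing arrives."""
        self._drain()

    def _drain(self):
        while self.pending:
            expected = self.last_seen + 1
            if expected in self.pending:
                self._deliver(expected)
            elif self.gap_since is None:
                self.gap_since = self.clock()   # start waiting for the gap
                return
            elif self.clock() - self.gap_since >= self.gap_timeout:
                self._deliver(min(self.pending))  # give up on the missing position
            else:
                return

    def _deliver(self, position):
        self.handle(position, self.pending.pop(position))
        self.last_seen = position
        self.gap_since = None


# Usage with a fake clock so the timeout can be stepped by hand.
now = [0.0]
seen = []
listener = GapAwareListener(lambda p, e: seen.append(p), clock=lambda: now[0])
listener.receive(1, "a")
listener.receive(3, "c")   # held back: position 2 has not been seen yet
listener.receive(2, "b")   # gap filled, 2 and 3 are delivered in order
listener.receive(5, "e")   # position 4 was burned by a failed insert
now[0] = 6.0
listener.tick()            # timeout passed, listener skips ahead to 5
print(seen)
```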

If you need to completely alleviate the performance drain of monotonic, gapless sequences, you can keep the Position counter in memory and use a single writer (no simultaneous writes) at the application level. The writer code will hand out Positions itself before executing the inserts in order. You can batch writes to further improve perf. I've done something like this (and some of the linked perf tips) in other contexts and achieved millions of writes per second... briefly. As indexes grow, performance drops off.
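
A rough Python sketch of the single-writer idea, using a queue and one worker thread. `SingleWriter` and the `insert_batch` callable are hypothetical; a real version would have to load the starting position from the database on startup and decide what to do with positions already handed out when an insert fails, neither of which is handled here.

```python
import queue
import threading


class SingleWriter:
    """One worker thread owns the position counter and performs all inserts."""

    def __init__(self, insert_batch, start_position=0, max_batch=50):
        self._insert_batch = insert_batch   # hypothetical: writes rows to the db
        self._position = start_position     # last position handed out
        self._max_batch = max_batch
        self._queue = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def append(self, event):
        self._queue.put(event)

    def close(self):
        self._queue.put(None)               # sentinel: stop after draining
        self._worker.join()

    def _run(self):
        stopping = False
        while not stopping:
            item = self._queue.get()
            if item is None:
                return
            batch = [item]
            # Opportunistic batching: also take whatever is already queued.
            while len(batch) < self._max_batch:
                try:
                    more = self._queue.get_nowait()
                except queue.Empty:
                    break
                if more is None:
                    stopping = True
                    break
                batch.append(more)
            rows = []
            for event in batch:
                self._position += 1
                rows.append((self._position, event))
            self._insert_batch(rows)        # inserts happen in position order


stored = []                                 # stands in for the Event table
writer = SingleWriter(stored.extend)


def produce(name):
    for i in range(100):
        writer.append(f"{name}-{i}")


producers = [threading.Thread(target=produce, args=(n,)) for n in "abcd"]
for p in producers:
    p.start()
for p in producers:
    p.join()
writer.close()
print(len(stored), stored[0][0], stored[-1][0])
```

Because only the worker touches the counter, no lock is needed and positions come out gapless and in insert order, at the cost of funnelling every write through one process.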

Victor got 120 requests/sec with the transactional PositionCounter table. Keep in mind these are writes only (and include writes to other tables in the same transaction). A typical transactional system will read an order of magnitude more times than it writes. If our systems sustained 100 writes per second, we would have brought in a lot of revenue. And we would have had the resources to switch to the higher-performing EventStore.org before then. But you have to evaluate what fits your needs.

janisk

Thank you so much for this really long and informative reply!
I kind of like the approach that you suggested to keep the logic in the application and let the client handle it.

As I thought about how such a solution could work, I came to another issue though: assuming the case that has been described in the comments before. One transaction starts (trying to manipulate aggregate a), then another transaction starts (trying to manipulate the same aggregate a) and terminates before the first one. In this case it would have the higher sequential number/position, but in fact a lower version number of that aggregate a!
So in fact, for this aggregate, we would actually HAVE TO process the event with higher position before the event with lower position - because it has the lower version number.
Am I now completely lost here, or isn't that actually an issue? This would essentially mean that the position number is not a reliable source for our projections anyway and that we instead have to look at the aggregate / aggregate version level. This would complicate things by a lot though. I hope that I am wrong about this :D

Kasey Speakman • Edited

Thanks for the question.

It does not work quite the way you described, assuming a few things at the application level. (There is no need to hold a transaction open for the whole process. That will reduce performance even more.) First, aggregate state is replayed from events. Since the separate operations are simultaneous, they loaded the same Version of the stream, say, Version 4. They will each independently calculate their emitted events to start at Version = 5. When each process tries to simultaneously append its events to the db, the first transaction to complete will take Version = 5 in the database. The other will fail, having violated the unique index on Stream ID and Version for Version = 5. This is optimistic concurrency.
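
The collision on (Stream ID, Version) can be tried out with Python's built-in sqlite3 module. This is only a stand-in: SQLite replaces Postgres, the table is trimmed to three columns, one connection plays both writers, and `load_version` and `append` are illustrative helpers rather than the actual append implementation.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE event (
        stream_id TEXT    NOT NULL,
        version   INTEGER NOT NULL,
        type      TEXT    NOT NULL,
        UNIQUE (stream_id, version)
    )
    """
)


def load_version(stream_id):
    """Latest committed Version of a stream (0 if the stream is empty)."""
    row = conn.execute(
        "SELECT COALESCE(MAX(version), 0) FROM event WHERE stream_id = ?",
        (stream_id,),
    ).fetchone()
    return row[0]


def append(stream_id, expected_version, types):
    """Try to save events at expected_version + 1, + 2, ... in one transaction."""
    rows = [(stream_id, expected_version + 1 + i, t) for i, t in enumerate(types)]
    try:
        with conn:  # commits on success, rolls back if any insert fails
            conn.executemany("INSERT INTO event VALUES (?, ?, ?)", rows)
        return True
    except sqlite3.IntegrityError:  # unique (stream_id, version) violated
        return False


append("cart-1", 0, ["Created", "ItemAdded", "ItemAdded", "ItemAdded"])

# Two simultaneous operations both load the stream at Version 4 ...
seen_by_first = load_version("cart-1")
seen_by_second = load_version("cart-1")

# ... and both try to write Version 5. Only the first to commit wins.
first_ok = append("cart-1", seen_by_first, ["ItemRemoved"])
second_ok = append("cart-1", seen_by_second, ["CheckedOut", "Paid"])
print(first_ok, second_ok, load_version("cart-1"))
```

The losing writer saves nothing, so the stream ends at Version 5 and the caller is free to reload and decide again.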

In the case where one change emits multiple events, I refer to this in code as a “commit”. Events in a commit should be saved in the same transaction, which will order them correctly. As well as save them with all-or-nothing semantics. This is the only need for a transaction. (To reserve more architectural options for yourself in the future, you should also avoid cross-stream transactions. This is already expected of DDD aggregates. This constraint can present some design challenges, but has always resulted in better design for us.)

I suppose there is a lot of unstated cooperation required at the application level to make everything work properly. I’ve had in mind to write an article with more about this. And how these details were discovered as we evolved from a simple to more robust implementation of an event store. Our journey has illuminated why EventStore.org has made many of its choices.

I’d also like to publish our Postgres event store F# libraries. It requires me to publish and package other internal libraries first. And I’m just not keen on doing it. I’m quite happy to give the code away, but as a creative/builder personality, I will absolutely neglect repo maintenance.

janisk

Thanks Kasey for your response!
I am not sure though if we talk about the same thing! I am aware of how optimistic concurrency works for event sourcing and was thinking about exactly this use case.
My example would be the same you describe, except that the transaction which started first finishes last, which means that it has the lower position number and the higher aggregate version number. Our read model is in general interested in processing the events in the correct version order. However, if the position of the event does not reflect this version order, then we cannot rely on this as a correct way of processing our events with our projections.
Does this make sense?

Kasey Speakman

The only way you could make this happen is to calculate the event Version in SQL as part of the database transaction. But then you would also lose optimistic concurrency. Optimistic concurrency requires that the Version be calculated in application code to guarantee the facts it based decisions on are the latest. It also means your scenario would trigger a failure instead of saving Position out of order within the same stream.

Walking through it, the application code will not see events from uncommitted transactions when they calculate their event Versions. So assuming the first transaction was held open long enough for a second operation to run and commit its transaction first, both transactions would have calculated the same event Version on that stream. The fast transaction would commit with a higher assigned Position, and the slow transaction would later fail on optimistic concurrency and its lower assigned position would never be used. This is all assuming that positions are assigned up front, either by pg sequence or a serialized-access counter in application code.

Positions can still be saved out-of-order across different streams. This can be a problem if views depend on total ordering. For example, foreign key relationships.

I appreciate this line of questions as I came to some other realizations in explaining it.

janisk

You're right. I went one step too far, thinking already about how to still make the first transaction which took too long work. But in reality, I'll make it work (if possible) in application code, not in the same database transaction. Therefore, this situation would indeed not happen.

An example of what I planned on doing: If the first transaction takes too long so that the second transaction takes over the respective aggregate version, the first transaction will fail due to the uniqueness requirement. Then I can load the new state of the aggregate from the database and can retry the command which triggered the first transaction. If it makes sense, then I can save the new events in the database. However, this would, of course, be a new transaction. I don't really know what I was thinking there...
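
That reload-and-retry loop could look roughly like this in Python. Everything here is hypothetical scaffolding: `InMemoryStore` imitates the unique (stream, version) check, and `decide` is whatever function turns history into new events for the command.

```python
class ConcurrencyError(Exception):
    pass


class InMemoryStore:
    """Toy store: the version of a stream is simply its number of events."""

    def __init__(self):
        self.streams = {}

    def load(self, stream_id):
        return list(self.streams.get(stream_id, []))

    def append(self, stream_id, expected_version, events):
        stream = self.streams.setdefault(stream_id, [])
        if len(stream) != expected_version:   # someone else wrote first
            raise ConcurrencyError(stream_id)
        stream.extend(events)


def handle_with_retry(store, stream_id, decide, max_attempts=3):
    """Load, decide, append; on a version conflict reload and decide again."""
    for _ in range(max_attempts):
        history = store.load(stream_id)
        new_events = decide(history)
        if not new_events:                    # command no longer makes sense
            return []
        try:
            store.append(stream_id, len(history), new_events)
            return new_events
        except ConcurrencyError:
            continue                          # each attempt is a fresh append
    raise ConcurrencyError(f"gave up on {stream_id} after {max_attempts} attempts")


store = InMemoryStore()
store.append("survey-1", 0, ["Started"])
versions_seen = []


def decide(history):
    versions_seen.append(len(history))
    if len(versions_seen) == 1:
        # Simulate a competing writer committing while we were deciding.
        store.append("survey-1", len(history), ["AnswerRecorded"])
    return ["Completed"]


result = handle_with_retry(store, "survey-1", decide)
print(result, versions_seen, store.load("survey-1"))
```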

I'm glad that you appreciate my questions and thoughts, because you helped me a lot with your article and your answers! Really, thank you so much!

Kasey Speakman • Edited

I found a possible solution: ctid.

ctid is a system column which all postgres tables have. It contains the physical location of the row in the table file. It will be in insert order initially, but will change as records are updated and deleted and auto-vacuum is run. Since this implementation has rules to prevent updates and deletes, it should always be in insert order.

However, there are some issues. Ordering by ctid can be less performant than a primary key. Main issue I found was queries ordered by ctid cannot be index only scans, just index scans. (Because of technical reasons around HOT chains.) I haven't measured to see if that makes a difference here. Also if you do ever update or delete+autovacuum events, the ctids will change and no longer be in insert order. So if ctids are used for ordering, the only option to patch data is to copy events to a new table, modifying or omitting during the copy as necessary. (This was always our plan of last resort rather than modifying events, but maybe not everyone's.) Even then, event changes could shift the ctids. An updated event could be larger or smaller by enough that a different number of events fit in the 8k block, shifting up or down the ctids of every event that comes after it. Then you need to determine how the listeners will deal with that. View writers are not a problem as long as you don't mind rebuilding all views. Event sourced process managers should be okay if they didn't listen for the deleted/updated event -- just shift the last seen position accordingly. Otherwise they will require further analysis.

This makes me think that if I ever do need to delete an event (e.g. legal reasons), it might be better to update it to be a tombstone. And also maybe I should keep the last event id (stream id + version) instead of last position on event listeners. Keep the ordering column as an implementation detail rather than requiring clients to use it directly as I do now. More to think on.

ilija

A little bit late, but since it is almost impossible to avoid locking when using global counters, it is perhaps a good idea to think about how to reduce the IO operations (calls between api and db). One solution is to use batching and make just one sql statement. Here is my version:

        with pos as (
            UPDATE benchmark.positioncounter
            SET position = position + ${items.length}
            returning position
        ),
        new_events as (
            SELECT
                stream,
                event_name,
                version,
                data
            FROM json_to_recordset(${items}) as x(
                stream text,
                event_name text,
                version integer,
                data json
            )
        )
        INSERT INTO benchmark.events (
            position,
            stream,
            event_name,
            version,
            data
        ) SELECT 
            (row_number() OVER () + pos.position -  ${items.length}) as position, 
            stream, 
            event_name, 
            version, 
            data 
        FROM new_events, pos

Same idea as suggested by Kasey, but using a CTE and with the ability to insert multiple events. To increase performance you can do some batching (inserting like 50 events at a time). Using a basic benchmark with batches of 20 events and a concurrency of 50, I can insert 200000 events in about 20 seconds.
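
The position arithmetic in that statement, worked through in Python: the counter is bumped once by the batch size, and each row gets row_number + new counter - batch size. `reserve_positions` is just an illustrative name for that calculation.

```python
def reserve_positions(counter_before, batch_size):
    """Mirror the CTE: bump the counter once, then number rows 1..batch_size."""
    counter_after = counter_before + batch_size  # UPDATE ... RETURNING position
    positions = [
        row_number + counter_after - batch_size  # same expression as the SELECT
        for row_number in range(1, batch_size + 1)
    ]
    return counter_after, positions


counter, first = reserve_positions(40, 3)
counter, second = reserve_positions(counter, 2)
print(first, second, counter)
```

Each batch takes a contiguous block that starts right after the previous counter value, so consecutive batches stay gapless.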

Kasey Speakman • Edited

Thanks for posting this. If my math is right, this is about 10k events per second. This could be a decent increase depending on your hardware. We've gotten about 1k events per second on an AWS t2.micro instance. That's in testing of course. We'll have company growth problems before we get close to that number.

Our use cases are somewhat pathological as far as seeing performance gains from this alone. Our use cases only save a handful of events at a time, often just 1. When there are multiple, they need to be saved in the same transaction (all-or-nothing). Sometimes saving an event can and should fail due to optimistic concurrency, but it should not prevent subsequent events from being saved. We would need a batching layer in the API that follows these constraints. Complicated, but seems doable.

But that's our usage, not everyone's. Once we're looking to raise this limit, we'll probably evaluate system options like sharding, switching to EventStore, etc. By then we should be in a different ballpark revenue-wise too, so would be a great problem to have.

ilija • Edited

I wonder, did you try to use advisory locks (should be faster in general) instead of row locking (on the position) and if so, did it improve the performance? Using batches has major drawbacks because of the stream + version constraint.

And what about write performance if the table grows very large: is it consistent, or does it degrade with the number of rows?

Kasey Speakman

I haven't tried an advisory lock. My basic understanding is that advisory locks are simpler than row/table locks. So they should reduce lock/unlock time. But how much, I haven't measured.

My observation in testing is that write perf hits a degradation breakpoint when indexes can't fit in memory anymore. And the bigger they get, the more cache misses, the more disk IO has to be done. First fix is scaling up db mem/cpu. When that limit is hit or impractically expensive, it's time to use multiple nodes like sharding.

The ultimate solution for perf is to have no position at all. The only order that matters is within a stream. Cross-stream ordering is best effort. So you could use something unstable (sometimes wrong) like timestamp. Readers should tolerate some out-of-order events across streams. So for example, no strict foreign key enforcement in tabular views (aka projections). Because the causal event from one stream might mistakenly be ordered after the effect event in another stream. But the events within each stream are totally ordered correctly. Sometimes it can be very convenient to have a stable ordering across streams so I probably wouldn't take this step until perf was a critical constraint.

ilija

At how many rows did you see the performance degradation? Is it in the range of 1, 10, or 100 million?

Kasey Speakman • Edited

I can't remember the exact breakpoints and it would vary anyway based on your hardware and index structures (3 fields vs 1 field, etc). This is anecdotal memory rather than a scientific experiment. I wrote a chess simulation program that generated unique boards and looked for checkmates. It would start off saving millions of boards per second. But very quickly stabilized in the tens of thousands of writes per second. I was using COPY to insert rows with batch sizes of 1024. I ran it for 2-3 months. When I stopped, db had over a trillion records and a write rate in the single digits. DB size was getting close to 3TB. The indexes were bigger than the data table. I learned a lot from that. I was very impressed with postgres.

Also, the performance drop-off was accelerated due to duplicate checking. Which also got more expensive (more IO) as time went on. I was tracking the dupe rate as well, but don't remember it off the top.

ilija

Interesting. Did you also test your implementation of the eventstore with large amounts of data?

Kasey Speakman • Edited

Frankly no. We are using it for business ops rather than something like sensor data. If our write ops cross even 1 event per second monthly avg we would have already grown the organization dramatically. The goal is not to ride out this solution to a million ops per second where it becomes very complicated to use. But to evaluate all the options when we come to the next inflection point. Though I constructed this and iterate on it, I realize that it has a context where it is useful and instructive. I am open to other options to meet different needs. I am still looking for perf gains with this, as that also means better efficiency with the same resources. And that means less cost for cloud services, which is especially helpful early in a product life cycle. So I very much appreciate the code you posted and this discussion. :) And any testing anyone wants to do. I still want to post my updates and the F# libraries I use on top of this at some point. Still not quite where I want them.

ilija • Edited

Yes, you are correct. I am currently using much the same eventstore implementation in postgres for a SaaS app and the event rate is really low. However, migrating the data to another system could be painful, and you want to make the best decisions as early as possible while keeping the infra manageable without using many different systems. My worries for postgres are mostly about the table size. A rate of 1 event per second already produces about 32 million events per year. And after a year or two, you still want decent performance for fetching an event stream or inserting an event.

I also found another eventstore implementation for postgres, used with elixir: github.com/commanded/eventstore . They use a different table structure, as can be found here: github.com/commanded/eventstore/bl...

Kasey Speakman • Edited

The nice thing about event sourcing is that events are the source of truth and they are pretty portable. We have copies of the system for multiple environments as I'm sure you do. So it would not be too much of a stretch to work out the integration details with a different solution, spin up a new system with it, save the existing events onto that copy, and validate it.

32 million rows is easy for postgres as far as just storage and insertion perf. One of our products is getting close to a million events after 6 years and it still runs lightly on micro instances. So I have lots of scale-up room before I need to reevaluate. That one used full consistency, so the read models are in the same db and there is write amplification. I used different schemas per tenant so it's not one set of indexes. If that does make a difference, the same could be accomplished with table partitioning.

The length of a stream (and therefore replay time) is more in the design camp than the perf camp. I've been meaning to make a post summarizing things I've learned and rules of thumb I use for event stream design. For example I will use unbounded streams for repeated processes. Like a yearly audit. But I don't replay the whole thing. I replay from the last completed audit. So replay sees a bounded number of events every time. And perf doesn't significantly change with years of history.
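
A sketch of that bounded replay in Python, under stated assumptions: the event types and the `apply` function are invented, and `last_completed_version` is assumed to be remembered somewhere when the completing event is saved. Against the database, the equivalent would be reading only rows of that stream with a Version above that number.

```python
def replay(events, apply, initial, after_version=0):
    """Fold only the events with version > after_version into a state."""
    state = initial
    # Versions start at 1 and are contiguous, so version v sits at index v - 1.
    for event in events[after_version:]:
        state = apply(state, event)
    return state


def apply(state, event):
    if event["type"] == "AuditStarted":
        return {"year": event["year"], "findings": []}
    if event["type"] == "FindingRecorded":
        return {**state, "findings": state["findings"] + [event["note"]]}
    return state


stream = [
    {"version": 1, "type": "AuditStarted", "year": 2016},
    {"version": 2, "type": "FindingRecorded", "note": "missing receipt"},
    {"version": 3, "type": "AuditCompleted", "year": 2016},
    {"version": 4, "type": "AuditStarted", "year": 2017},
    {"version": 5, "type": "FindingRecorded", "note": "late filing"},
]
last_completed_version = 3  # hypothetical: stored when AuditCompleted was saved

current = replay(stream, apply, initial=None, after_version=last_completed_version)
print(current)
```

However many past audits the stream holds, the replay only touches the events of the audit in progress.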

I will check that implementation out and see what I can learn. Thanks for linking it! :)

Kasey Speakman

I only looked at the table structures. It looks like it's meant to be eventstore.com on postgres. Interesting idea.

One of the things I didn't mention with scaling limits to postgres is connection limits. Each connection eats non-trivial amounts of server resources. Our AWS t2.micro instances can handle about 85 before it can't make new ones. (Ask me how I know.) But resources allocated to sessions are resources not allocated to running SQL. So we want to stay well below the limit. This is why things like Pgbouncer exist.

I want to explore creating my own event store service that will accept ES commands and use SignalR or WebSockets for listeners on top of my postgres ES. It can maintain its own limited number of connections to give the db as much resources as possible. And assign Positions from mem to alleviate that bottleneck. It can use opportunistic batching for yet more perf. Practically though, this creates many new downsides. An extra network hop (+latency). New failure modes and recovery models. Pub/sub handling. This is all potentially fun stuff I might like to do for my own learning. But when approaching limits of the original solution, for work I'd be evaluating eventstore.com instead since they solved most of these problems already. And it's high availability and free to use. I'm sure it has its own issues / workarounds, but probably so would my service. :)

damiensawyer

Thanks for this. Very useful.

Your stream table has got me thinking. I worked on an event sourced system once where they managed consistency between concurrent writers by reading all the events in a stream in a snapshot isolated transaction then writing the new events in the same transaction - catching the concurrency exception if someone else had written records inside the range covered by the initial select query (at least, I think that's how it worked).

I'm building a system now and was planning to do something similar. The advantage over yours is that no extra table is required. The disadvantage is that you need to keep the transaction open between the reads and the writes. I'm wondering if yours would allow for greater concurrency on the db?

This is my first large Postgres app in anger (after years with SQL Server). Perhaps some more investigation is required on this.

Kasey Speakman • Edited

I'm wondering if yours would allow for greater concurrency on the db?

Yes, for sure. What you described is pessimistic concurrency, and it will allow concurrent writers (if all writers follow the same process). But it will slow down drastically as you add more concurrent writers. Generally, I prefer to do a single writer over pessimistic locking. For example, I used a built-in F# MailboxProcessor (essentially queue + worker) to process concurrent requests in a serialized fashion -- so there is no concurrency to worry about. And there is no need for a Stream table. For an internal system, you will probably never need anything else. If you need some availability guarantees, this works fine in an active-passive failover scenario.
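The queue + worker idea can be sketched in a few lines. This is a minimal Python illustration, not the F# MailboxProcessor itself; the `SingleWriter` class and its in-memory `events` list are hypothetical stand-ins for the real writer and the Event table.

```python
import queue
import threading

class SingleWriter:
    """Serializes appends through one worker thread (queue + worker)."""

    def __init__(self):
        self._inbox = queue.Queue()
        self.events = []  # hypothetical stand-in for the Event table
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def _run(self):
        while True:
            item = self._inbox.get()
            if item is None:  # shutdown signal
                break
            stream_id, payload, reply = item
            # Only this thread touches self.events, so there is no
            # concurrency to guard against and no version check needed.
            version = 1 + sum(1 for e in self.events if e[0] == stream_id)
            self.events.append((stream_id, version, payload))
            reply.put(version)

    def append(self, stream_id, payload):
        """Callable from any thread; blocks until the worker has written."""
        reply = queue.Queue(maxsize=1)
        self._inbox.put((stream_id, payload, reply))
        return reply.get()

    def close(self):
        self._inbox.put(None)
        self._worker.join()
```

Callers on any number of threads just call `append`; the versions come out gapless because the worker processes one request at a time.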

If you need concurrent writers for scalability, then optimistic concurrency is better. It is essentially a check-and-set transaction on write only. No need to hold the transaction open while you run decision code. I have described the tables needed for optimistic concurrency, but I have conveniently (for me) left the append implementation as an exercise for the reader. I have a stored procedure for this that I may post once I have some confidence in it.

And if you run into so much concurrency against the same streams that even optimistic concurrency is too slow, then you probably need to move to stateful processing.

I have a decent amount of experience with both MSSQL and Postgres (and MySQL a long time ago). MSSQL tooling and user experience are generally better. But Postgres text search is very compelling (and easy) compared to the MSSQL FTS service. Postgres JSON support is great. Postgres still annoys me in the way it handles casing, but it is my default database choice over MSSQL.

damiensawyer

Thanks for this.

On pondering it, are you sure that you need your Stream table?

The Event table has a unique constraint on (StreamId, Version). If writers are required to always increment the version to be 1 more than the highest event they read (which you may or may not be doing), then if two writers try to compete, that constraint will block the second.

Kasey Speakman

Thanks for the comment! That is a good point. I probably would not let the writer throw an exception. But I could check the Event table (instead of the Stream table) for the highest version. Like this:

SELECT Version
  FROM Event
 WHERE StreamId = @StreamId
 ORDER BY Version DESC
 LIMIT 1
;

Unless I’m missing something, this should be a very cheap lookup (a single index descent, so O(log n) rather than a scan) thanks to the unique index on (StreamId, Version).

The Stream table with a Stream Type can still be nice for admin purposes. But it does seem even less important now.

damiensawyer • Edited

All sweet.

I can't see any reason that query wouldn't be fast.

I'm not sure that I'd have an issue with catching an exception. You'd only catch that 'specific' one (key violation or whatever) and then report that back gracefully to the caller. Alternatively you need to do a potentially unneeded read before every write. Catching the error and retrying is kind of like asking for forgiveness instead of permission.

I was reading about partial indexes (postgresql.org/docs/8.0/static/ind...).

Another way you might do your original method, but without an extra table, is to add a boolean to the event table named latest and require that the StreamId is unique in a partial index over records where that is true.

CREATE UNIQUE INDEX flag_latest_version ON Event(StreamId) WHERE latest;

I've kind of dismissed this though in favour of just enforcing the streamid/version unique constraint. It can fail if a writer tries to write an event with a version greater than 1 more than the current max but, as we control the writers, we can prevent that I guess.
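For comparison, here is roughly how the `latest` flag variant would behave. It is only a sketch: it uses SQLite (via Python's standard library) in place of Postgres so it runs standalone, the column types are assumptions, and the `append` helper is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE Event (
        StreamId TEXT    NOT NULL,
        Version  INTEGER NOT NULL,
        latest   INTEGER NOT NULL DEFAULT 0
    )""")
# At most one row per stream may carry the latest flag.
conn.execute(
    "CREATE UNIQUE INDEX flag_latest_version ON Event(StreamId) WHERE latest = 1")

def append(stream_id, version):
    """Hypothetical helper: move the flag and insert in one transaction."""
    with conn:  # commits on success, rolls back on any exception
        # Unset the flag only if the row we read (version - 1) is still latest.
        conn.execute(
            "UPDATE Event SET latest = 0 "
            "WHERE StreamId = ? AND Version = ? AND latest = 1",
            (stream_id, version - 1))
        conn.execute(
            "INSERT INTO Event (StreamId, Version, latest) VALUES (?, ?, 1)",
            (stream_id, version))

append("a", 1)
append("a", 2)
try:
    append("a", 2)  # stale writer: version 1 is no longer the latest row
except sqlite3.IntegrityError as err:
    print("conflict:", err)
```

Note the `UPDATE` of the previous row: that is the step that makes this variant no longer append-only.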

Anyway - it's all good fun. I'm enjoying Postgres! :-)

As a bit of a non sequitur, check out github.com/pgManage/pgManage. I've looked at a stack of PG clients but nothing's come close to SQL Server Management Studio. PGAdmin3 is dated, PGAdmin4 is slooooooow and none of the commercial ones have been that impressive either. PGManage is great!

Kasey Speakman • Edited

Yeah, you are probably right about the exception handling. I had since added a Commit table to the design, mainly to denormalize the metadata across the different events in the same commit. And between 3 tables, I had it in my head to detect conflicts early. But there is great value in the simplicity of one table and just handling the unique key violation exception to detect a concurrency conflict. Using a multi-row insert, it should still be an all-or-nothing append operation. This has been a great conversation to clarify my thinking. Thank you.
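A rough sketch of that single-table append, again using SQLite through Python's standard library as a stand-in for Postgres. The `Data` column and the `append` helper are assumptions made for illustration; only the unique constraint on (StreamId, Version) comes from the discussion.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE Event (
        StreamId TEXT    NOT NULL,
        Version  INTEGER NOT NULL,
        Data     TEXT    NOT NULL,
        UNIQUE (StreamId, Version)
    )""")

def append(stream_id, expected_version, payloads):
    """Append all payloads after expected_version, or nothing at all."""
    rows = [(stream_id, expected_version + i, data)
            for i, data in enumerate(payloads, start=1)]
    placeholders = ", ".join(["(?, ?, ?)"] * len(rows))
    params = [value for row in rows for value in row]
    with conn:  # one transaction around one multi-row INSERT
        conn.execute(
            f"INSERT INTO Event (StreamId, Version, Data) VALUES {placeholders}",
            params)

append("s", 0, ["a", "b"])          # writes versions 1 and 2
try:
    append("s", 1, ["c", "d"])      # stale writer: version 2 already exists
except sqlite3.IntegrityError as err:
    print("concurrency conflict:", err)
```

Because the second call collides on version 2, neither of its rows is stored, so the stream still holds exactly versions 1 and 2.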

Aside: I use FP, so I typically try to avoid knowingly triggering exceptions unless things are actually exceptional (like database down). In this case, a concurrency conflict is not exceptional -- it is something I expect to happen sometimes under normal circumstances. However, triggering an exception at the database level is probably the best play and I will just convert it to a Result before handling it.

For the latest version idea, I probably would not opt for updating past records (to unset the latest flag). It would no longer be append only, and it may limit options in the future. For example, I have been considering the idea of backing up events to S3 (in addition to SQL backups), and updates would really complicate that scenario.

I appreciate your non sequitur. We have been using PGAdmin4 and have sort of found workflows that work for us. But it is not well polished in certain aspects. I will give PGManage a try. Thanks!

damiensawyer

All sweet. I hadn't considered the "append only" thing, but that's a great point. I also didn't know about FP being less tolerant of throwing exceptions.

In the C# world I know that exceptions are slow and I definitely try not to throw them during normal execution - however I would have considered a concurrency failure a good case for throwing one (and specifically catching it higher up the call stack to make retry decisions). Perhaps this comes down to personal style and a glass half full vs empty argument.

I've got the idea of using S3 or similar for events as well (how cool is immutability!!). We haven't started designing our integrators and builders yet - but when we do, even if we don't do it now, I want to at least allow for the idea of running them all as serverless functions at massive scale. At the other end of the spectrum I'm wondering if we can build the whole system to run from a wrist watch. I'm thinking Postgres et al on Docker + k8s + Android Wear. No idea if there's a watch that can do it yet, but there might be soon :-)

Kasey Speakman • Edited

re: Exceptions. FP is not necessarily less tolerant. I use F#, and it has pretty rich exception handling. But it is common in FP to represent anticipated error conditions as normal return values rather than through exceptions. Mainly because you cannot see what exception will be thrown unless you look in the code or it is documented. But you can see the return value type. In this case, I expect optimistic concurrency violations sometimes and I expect the caller to handle it. So even though the database adapter will throw a unique constraint violation exception, I will catch it and convert it to a normal return value. However, if the database is just down, I will let that exception propagate up to the caller. Hope that makes sense.
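To make that concrete, here is a minimal sketch of the conversion in Python rather than F#. The `Ok` and `Conflict` types and the `try_append` function are hypothetical names, and SQLite stands in for Postgres so the example runs on its own.

```python
import sqlite3
from dataclasses import dataclass

@dataclass(frozen=True)
class Ok:
    version: int

@dataclass(frozen=True)
class Conflict:
    stream_id: str

def try_append(conn, stream_id, version, data):
    """Return Ok or Conflict; anything unexpected still raises."""
    try:
        with conn:
            conn.execute(
                "INSERT INTO Event (StreamId, Version, Data) VALUES (?, ?, ?)",
                (stream_id, version, data))
        return Ok(version)
    except sqlite3.IntegrityError:
        # Anticipated outcome: the unique constraint rejected the write.
        # (Assumes the unique constraint is the only constraint that can fail.)
        return Conflict(stream_id)
    # sqlite3.OperationalError and friends are not caught: they propagate.

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE Event (StreamId TEXT, Version INTEGER, Data TEXT, "
    "UNIQUE (StreamId, Version))")
print(try_append(conn, "s", 1, "a"))  # Ok(version=1)
print(try_append(conn, "s", 1, "b"))  # Conflict(stream_id='s')
```

The caller can see both outcomes in the return type, while a genuinely exceptional failure (for example a missing table or an unreachable database) still surfaces as an exception.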

Using messaging (e.g. events) just opens up a whole world of things. It is awesome. :) If you really want your mind blown, look into event sourcing on the front end.

damiensawyer

Cheers for that. :-)

Simon Oulevay

There may be an edge case I have missed and I don't know about performance, but I think it may be possible to enforce that the version increment is always exactly 1 at the cost of some redundancy, by adding the following column and constraints to the table:

PreviousVersion int,
FOREIGN KEY (StreamId, PreviousVersion)
  REFERENCES Event (StreamId, Version),
CHECK (
  Version = 1 AND PreviousVersion IS NULL
  OR
  PreviousVersion IS NOT NULL
    AND Version - PreviousVersion = 1
)

We make each event reference the previous version of the stream with a multi-column foreign key and enforce the increment by 1 with a check constraint. The first event (version 1) is allowed to have a null previous version.
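The constraints above can be exercised with a small script. This is a sketch only: it runs against SQLite (which needs foreign keys switched on explicitly) instead of Postgres, the column types are assumptions, and `insert` / `rejected` are hypothetical helpers.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite leaves FK enforcement off by default
conn.execute("""
    CREATE TABLE Event (
        StreamId        TEXT    NOT NULL,
        Version         INTEGER NOT NULL,
        PreviousVersion INTEGER,
        UNIQUE (StreamId, Version),
        FOREIGN KEY (StreamId, PreviousVersion)
            REFERENCES Event (StreamId, Version),
        CHECK (
            (Version = 1 AND PreviousVersion IS NULL)
            OR (PreviousVersion IS NOT NULL AND Version - PreviousVersion = 1)
        )
    )""")

def insert(stream_id, version, previous):
    with conn:
        conn.execute("INSERT INTO Event VALUES (?, ?, ?)",
                     (stream_id, version, previous))

def rejected(stream_id, version, previous):
    """True if the database refused the row."""
    try:
        insert(stream_id, version, previous)
        return False
    except sqlite3.IntegrityError:
        return True

insert("s", 1, None)   # first event: no previous version
insert("s", 2, 1)      # normal increment by 1
skipped_by_check = rejected("s", 4, 2)   # increment of 2 fails the CHECK
skipped_by_fk = rejected("s", 4, 3)      # passes the CHECK, but version 3 does not exist
duplicate = rejected("s", 2, 1)          # concurrent writer loses on UNIQUE
```

All three bad writes are refused, each by a different constraint, which is the layered protection described above.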

damiensawyer

I've only just skimmed this quickly (trying to get something done with kids screaming!)
Some quick thoughts:

  • If you're always enforcing that PreviousVersion is Version - 1, why store PreviousVersion?
  • In the most recent event sourced system I worked on, we allowed for gaps in the events. This was so we could delete events post production (arrgh!! The nerve!!). This was done after years of managing production event sourced systems and really wishing that we could do that.
  • If you did have gaps, you could have a PreviousVersion column... however, if all of the versions are incremental, do you really need it? The previous version is ascertainable through sorting the events.
  • I guess that if you allow forks in your event stream it would make sense to refer to a parent.... kind of like a directed acyclic graph... but that's not really the use case here.

If any of that sounds confusing or off, it probably is as it was a very quickly considered response :-)

Simon Oulevay

This idea of the previous version was in response to your (2-year-old) discussion with the author about concurrent writers, namely:

The Event table has a unique constraint on (StreamId, Version). If writers are required to always increment the version to be 1 more than the highest event they read (which you may or may not be doing), then if two writers try to compete, that constraint will block the second.

And:

It can fail if a writer tries to write an event with a version greater than 1 more than the current max but, as we control the writers, we can prevent that I guess.

Assuming there's a bug in one of the concurrent writers and it increments by 2 or more, multiple events may be written in the same stream at the same time.

As an intellectual exercise, I was wondering if there was a way to enforce that writers always increment the version by 1 at the database level. That's the purpose of the previous version column. The multi-column foreign key forces an event to reference another event, and the check constraint forces that event to be the previous version of the stream exactly.

If a writer is bugged and tries to increment by 2, the database will refuse to store the event (since it would fail the check constraint). It complements the protection against concurrent writers provided by the unique constraint on stream ID and version, by also protecting against misbehaved writers.

Of course that would only work if you want to implement an extremely strict event store. It would not support gaps between events or deleting events like in the systems you mention. You probably don't want to be that strict in a real-life system.

Kasey Speakman • Edited

So this brings up a classic trade-off of what should be managed through the DB and what should be managed by code. The particular trade-off for managing this constraint in the database is a performance hit. To ensure writers do the right thing, we sided with code and made a library so that they all behave consistently. And in fact I'm still searching for ways to reduce our existing indexing/locking (of Position table, see other comments) which negatively impacts write perf. Not enough to matter for our use cases, but I would like to optimize further.

I've started to see why Greg Young's Event Store made the trade-offs it did -- many of which involve moving responsibilities out of disk into service code. It doesn't have the auto-increment or position-lock problem because it uses a serialized writer, so the current position can be tracked in memory and recorded to disk when events are written. (Your client code can make concurrent writes, but inside the ES service, writes are serialized and fast.) Instead of a Stream table to record StreamType, you make stream type part of the stream id. Then you can categorize streams after the fact based just on stream ID. Instead of a unique index on StreamId + Version, GYES uses an Event ID, which at the application level you are supposed to create deterministically from StreamId + Version. So the effect is the same -- duplicate events are prevented from being saved. The big perf gains (especially around Position) are not realistic when using Postgres because it is not optimized for this type of use case. But we still want to use Postgres since we'd really need more people before we looked at supporting multiple DBs. And I like that it is a gentle starting point for smaller teams/projects.
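One way to derive such a deterministic ID at the application level is a name-based UUID. The sketch below is an assumption about how you might do it, not a description of what GYES does internally; the namespace URL, the `event_id` and `category` helpers, and the "type-id" stream naming are all made up for the example.

```python
import uuid

# Hypothetical namespace; any fixed UUID works as long as every writer shares it.
EVENT_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "https://example.com/event-store")

def event_id(stream_id: str, version: int) -> uuid.UUID:
    """Same (stream_id, version) always yields the same ID."""
    return uuid.uuid5(EVENT_NAMESPACE, f"{stream_id}/{version}")

def category(stream_id: str) -> str:
    """Assumes the stream type is the part of the ID before the first '-'."""
    return stream_id.split("-", 1)[0]

first = event_id("Training-42", 3)
retry = event_id("Training-42", 3)   # a duplicate write produces the same ID
other = event_id("Training-42", 4)
```

Since a retried or competing write for the same stream position computes the same ID, a uniqueness check on the ID alone is enough to reject the duplicate.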

damiensawyer

Kasey, that's really interesting about GYES. It's a product I'd really love to have a play with one day. We looked at it a few years ago when starting our system but I chickened out because "No one ever got fired buying IBM (... in this case, Postgres)".

Simon, I get what you're going for and it's interesting. It would be interesting to see how that affected write performance.

gregyoung

If you want to look at the code involved look in StorageWriter

github.com/EventStore/EventStore/b... is a good place to start.

damiensawyer

Thanks Greg. That's great. I'll have a look.

Rafal Pienkowski

I've read your article, and I have one question. You said that you want to go as simple as possible. So why didn't you use one of the ready-made solutions like RabbitMQ, which is an open-source message broker? It could be connected to a persistent store to save all events/messages.
Maybe that would solve all the database-related problems you've faced?

Kasey Speakman • Edited

In the naive implementation I did try to go as simple as possible for the features I wanted. In growing the implementation, it necessarily becomes less simple to support more capability.

I think adding RabbitMQ in front would in fact greatly increase the complexity. The surface reasons:

  • Going thru a queue adds latency versus directly saving
  • Maintaining a new service which listens to the queue to save events
  • Must train the team on RabbitMQ
  • Must consider how to scale RabbitMQ with load
    • Also scaling the listener service with load

  • RabbitMQ still does not absolve me of scaling the database, because queues are not infinite. Nor is the user's patience in waiting for their request to finally get thru the queue to an overloaded database.

But there are deeper problems with this setup. Let's say two concurrent writers save conflicting events into RabbitMQ (they created divergent changes to the same stream, like a git merge conflict). They have no way of knowing this at the time they submit the events. It is only once RabbitMQ delivers the events to be stored that a conflict can be noticed. By that time, the clients have already received success responses, and the context where the conflict happened (and therefore what might be the best way to handle it) has been lost.

Rafal Pienkowski

I understand your point of view.

I agree that introducing a new component requires additional time to get familiar with it. Based on my experience, in most cases the effort I had to devote to learning it paid off in the end.

As you said, a new component introduces new problems. Maintaining is a great example. Another benefit of custom implementation of any problem is that you get familiar with the mechanism of saving events more than me for instance. So you became an expert :)

You mentioned concurrent writers who save events to the event store at the same time. I thought that each event is an independent object with a unique identifier. Even if both events are saved at the same time, they will produce (based on your example) separate entries in the database. I thought that saving an event is one thing and processing it is something slightly different. The responsibility for handling an event is on the event consumer.
Maybe I misunderstood the example.

Kasey Speakman • Edited

I suppose I didn't provide enough context in how this is used. (The post was already so large.) Probably the best background is Greg Young's excellent series of articles taking you from a traditional approach to event sourcing.

Events belong to a stream. It parallels very well to git. In git terms, an event is a commit and a stream is a branch. A concrete example: I have Training streams to represent user training. The stream starts with RegistrationCreated { StreamId, TraineeId, CourseId, DueDate, etc.}. It could be that after a Training stream is created, these two things happen simultaneously:

  • An admin cancels the registration.
    • Using the UI, they issue a command CancelRegistration.
    • The Training is currently active, so this is a valid request.
    • The RegistrationCanceled event is generated.

  • A trainee begins training in the same registration by watching a video.
    • Using the UI, they issue a StartVideo command.
    • The Training is currently active and has a video, so this is valid.
    • The VideoStarted event is generated.

Both of these are trying to make divergent changes to the same stream at the same instant in time. The first one to commit events into the stream succeeds. (Using the Stream table described in the post) the second one sees that the stream has changed versions while it was processing the StartVideo command, so it will detect the conflict and probably provide an error back to the trainee.
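The race described above can be replayed in a few lines. This is an in-memory sketch with hypothetical names (`Stream`, `ConcurrencyConflict`), where the stream version is simply the number of events stored; it illustrates the check, not the actual Stream table implementation.

```python
class ConcurrencyConflict(Exception):
    pass

class Stream:
    """In-memory stand-in for one event stream."""

    def __init__(self):
        self.events = []

    @property
    def version(self):
        return len(self.events)

    def append(self, expected_version, event):
        # Check-and-set: only write if nobody else has written since we read.
        if self.version != expected_version:
            raise ConcurrencyConflict(
                f"expected {expected_version}, found {self.version}")
        self.events.append(event)

training = Stream()
training.append(0, "RegistrationCreated")

# Both command handlers load the stream at version 1 and see it as active.
admin_saw = training.version
trainee_saw = training.version

training.append(admin_saw, "RegistrationCanceled")   # first commit wins
try:
    training.append(trainee_saw, "VideoStarted")      # stream moved on to version 2
except ConcurrencyConflict as err:
    print("conflict:", err)  # conflict: expected 1, found 2
```

The trainee's handler learns about the conflict while it still has the request in hand, so it can report an error or reload the stream and decide again.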


That was the command side. There is another side to this coin, the query side. In this case there is no business logic to run for user queries (aside from validating queries are well-formed, and the user is permitted to run it).

The models on the query side are built from events as they come in. A concrete example would be when a trainee logs in, they are presented with a list of training they can take. How do we build that list? A service listens for events that affect this list and makes changes to a relational model.

  • RegistrationCreated inserts a new row using data on the event
  • RegistrationCanceled deletes the appropriate row
  • TrainingCompleted deletes the appropriate row

There could be another view for Training History that only inserts rows when it sees TrainingCompleted events. Basically we can create any view of the data we want by replaying events onto it.
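As a rough illustration of those two views, here is a sketch that keeps the read models in plain Python structures instead of relational tables. The function names and the event shape (a type name plus a data dict) are assumptions for the example.

```python
def project_available_training(rows, event):
    """Keeps the list of training a trainee can still take."""
    kind, data = event
    if kind == "RegistrationCreated":
        rows[data["StreamId"]] = {"CourseId": data["CourseId"],
                                  "DueDate": data["DueDate"]}
    elif kind in ("RegistrationCanceled", "TrainingCompleted"):
        rows.pop(data["StreamId"], None)
    return rows

def project_training_history(rows, event):
    """Only grows when training is completed."""
    kind, data = event
    if kind == "TrainingCompleted":
        rows.append(data["StreamId"])
    return rows

events = [
    ("RegistrationCreated", {"StreamId": "t1", "CourseId": "c1", "DueDate": "2018-04-01"}),
    ("RegistrationCreated", {"StreamId": "t2", "CourseId": "c2", "DueDate": "2018-04-15"}),
    ("RegistrationCreated", {"StreamId": "t3", "CourseId": "c1", "DueDate": "2018-05-01"}),
    ("RegistrationCanceled", {"StreamId": "t1"}),
    ("TrainingCompleted", {"StreamId": "t2"}),
]

available, history = {}, []
for event in events:   # replaying from the first event rebuilds both views
    project_available_training(available, event)
    project_training_history(history, event)
```

After the replay only `t3` is still available and the history contains `t2`; adding a third view later is just another function fed by the same events.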

This is perhaps the place where RabbitMQ could be useful, to deliver notifications to listeners as new events occur. However, RabbitMQ would only be part of the solution. You see, listeners usually need to replay streams from a given starting position. For example, I deploy a new listener and let it rebuild from the first event. Another example: a listener experiences an outage, and when it starts back up it wants to resume from the last event it saw. RabbitMQ doesn't have the old events anymore, since they were already delivered to all subscribers at the time they were generated. So listeners still need the capability to read from the database from where they left off.
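A catch-up read of that kind might look like the sketch below. It assumes an Event table with `Position` and `Type` columns and uses SQLite through Python's standard library so it runs standalone; `read_page` and `catch_up` are hypothetical helper names.

```python
import sqlite3

def read_page(conn, last_seen, page_size=100):
    """Fetch the next page of events after the listener's last seen position."""
    return conn.execute(
        "SELECT Position, Type FROM Event "
        "WHERE Position > ? ORDER BY Position LIMIT ?",
        (last_seen, page_size)).fetchall()

def catch_up(conn, last_seen, handle, page_size=100):
    """Replay everything after last_seen; returns the new last seen position."""
    while True:
        page = read_page(conn, last_seen, page_size)
        if not page:
            return last_seen
        for position, event_type in page:
            handle(position, event_type)
            last_seen = position  # a real listener would persist this

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Event (Position INTEGER PRIMARY KEY, Type TEXT NOT NULL)")
with conn:
    conn.executemany(
        "INSERT INTO Event (Position, Type) VALUES (?, ?)",
        [(1, "RegistrationCreated"), (2, "VideoStarted"), (3, "TrainingCompleted")])

seen = []
# A brand new listener starts from 0; a restarted one passes its saved position.
last_seen = catch_up(conn, 0, lambda pos, kind: seen.append((pos, kind)), page_size=2)
```

A notification (from RabbitMQ, SignalR, or anything else) then only needs to say "there is something new"; the listener calls `catch_up` with its saved position to get the actual events.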

Rafal Pienkowski

It makes sense. Thanks for deeper explanations. Cheers.

Valient Gough

"Allows efficient queries based on position. i.e. WHERE SequenceNum > @LastSeenSeqNum"

A good test is to have many parallel threads inserting events and have another thread periodically issuing this query and checking that what it retrieves doesn't contain gaps.

Postgres assigns the serial number when the insert executes, not when the transaction commits. If the transactions commit out of order (which Postgres is happy to do!) then the query can return gaps which are later filled in. The net effect is that checking against the LastSeen value means you can miss events, because they became visible to your query out of order. Search for "how to create a gapless sequence in postgres".
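The failure mode is easy to reproduce in a toy model. The sketch below does not talk to Postgres at all; `SimulatedLog` is a hypothetical in-memory stand-in that hands out numbers at insert time and only makes rows visible at commit time.

```python
class SimulatedLog:
    """Toy model: numbers are assigned on insert, rows are visible on commit."""

    def __init__(self):
        self._next = 1
        self._committed = {}

    def begin_insert(self, payload):
        position = self._next      # handed out immediately, like a sequence
        self._next += 1
        return position, payload   # not visible to readers yet

    def commit(self, pending):
        position, payload = pending
        self._committed[position] = payload

    def read_after(self, last_seen):
        return sorted(p for p in self._committed if p > last_seen)

log = SimulatedLog()
a = log.begin_insert("A")   # gets position 1
b = log.begin_insert("B")   # gets position 2

log.commit(b)                        # B commits first
seen = log.read_after(0)             # reader sees only position 2
last_seen = max(seen)

log.commit(a)                        # A becomes visible afterwards
missed = log.read_after(last_seen)   # empty: position 1 is never delivered
```

The reader has remembered position 2, so the late-arriving position 1 falls below its cursor and is skipped for good.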

Kasey Speakman

Thanks for pointing this out. I missed this the first time around, but it was pointed out on my subsequent article.

I discovered the same gapless sequence article and posted an updated solution in the comments, which can properly handle concurrent writers.

Dian Fay

That's interesting! You're not concerned with modeling relationships between "snapshots", I take it?

I've been working on an application based on this pattern using triggers to project events into state tables. It's a bit funny just how much room there is in the definition of "event sourcing"...

Kasey Speakman • Edited

There could be a misunderstanding here on what snapshots are for. What I described above is only the event storage. It does not include read models (aka projections).

I see why that is confusing. The missing piece here is the assumption of separating commands and queries. The "write" model used by commands exists nowhere on disk as a relational model. If it did, it would add deployment complications like having to stop the world to update changed schemas. When I handle commands, I always rebuild the current state of the stream from events. However, replaying particularly large streams can slow things down. Snapshots are only an aid to speed up replay for commands, not something to be used directly.
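Here is a small sketch of what "rebuild from events, optionally starting at a snapshot" means on the command side. The state shape, the `apply` and `rebuild` functions, and the snapshot format (a version plus the state at that version) are assumptions for illustration.

```python
from functools import reduce

def apply(state, event):
    """Fold one event into the in-memory state used to decide commands."""
    if event == "RegistrationCreated":
        return {"status": "active", "videos_started": 0}
    if event == "VideoStarted":
        return {**state, "videos_started": state["videos_started"] + 1}
    if event == "RegistrationCanceled":
        return {**state, "status": "canceled"}
    return state  # events this model does not care about

def rebuild(events, snapshot=None):
    """snapshot is (version, state); only events after that version are replayed."""
    version, state = snapshot if snapshot is not None else (0, None)
    return reduce(apply, events[version:], state)

events = ["RegistrationCreated", "VideoStarted", "VideoStarted", "RegistrationCanceled"]

full = rebuild(events)                       # replay all four events
snapshot = (3, rebuild(events[:3]))          # state as of version 3
fast = rebuild(events, snapshot)             # replay only the fourth event
```

Both paths end in the same state; the snapshot only shortens the replay, which is why it can be thrown away and regenerated at any time.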

In contrast, read models (not represented in the article) are kept updated as events happen in the system. They are read as-is (preferably) without any rebuild step. They could be fully relational, key-value, document, graph, or whatever. (Also, it is fine for the command side to query read models for related information, like settings.)

I hadn't seen that project before. Thanks for linking that.

Raphael Dumas

Just wanted to nudge you to add the #postgresql tag :)

Kasey Speakman

Good catch. I added it.