DEV Community

Anthony Dodd
Anthony Dodd

Posted on • Updated on

Distributed State Management - Without the "E"

Come, let us journey into the depths of software, into the heart of application data and state management. We will explore an event-less pattern, relying only on our trusty old friends Postgres and gRPC.

On this journey, we will explore how a software team can manage their data in a distributed microservices architecture, where state "ownership" is divided between various microservices.

Setting the Stage

To set the stage, let's say that the software team we are journeying with is in the business of hosting a key/value store for customers — let's call it KVX — and runs everything in the cloud on a Kubernetes platform.

This team of ours loves Postgres (who doesn't), is pretty good with gRPC, but is not at all into the world of events, event processing, EDA, none of that.

This is presented as our challenge on this journey. Anything to do with events seems to be a serious blocker. We need to find a way to build a robust platform, but if our solution has anything to do with eventing patterns, then we've failed.

Break it Down

If we can't dig into anything related to the "E" word, then we are left with a somewhat narrow scope. Various technologies like Kafka, Nats, RabbitMQ, and any of the myriad other tools in the ecosystem which may help on this front, all of that is off the table.

So let's get down to the basics. First principals. What are we even trying to do? What is the problem which needs to be solved? Here's what we know:

  • We need to propagate state changes from one transactional service to others, maybe even call a few external cloud provider APIs and the like.
  • We're pretty good at handling requests using gRPC.
  • We can persist data in Postgres using transactions pretty well.
  • Responding to customer requests, we're pretty good at that too.

So what's the problem then? Well, this whole microservices thing. Often times when a microservice finishes updating its database tables, we need to do some asynchronous work. Ok, we've recorded what the user has requested, but now we need to actually make some things happen. We need to provision some KVX instances, we need to make some calls to our cloud provider to provision some infrastructure, we need to make some real things happen.

To state the problem succinctly: we need to drive the state of our system to match the state which the user has requested. The aspired state needs to become the current state.

Take a Shot

Given our constraints about the "E" word and all, our options are somewhat limited. The only tools we really have to work with are our trusty old Postgres database, gRPC ... and well, that's about it. Let's see what we can do.

When a user tells us that they want to provision a new KVX instance on our cloud platform, we need to be sure that we actually record their request in Postgres. This algorithm is simple enough:

  • A request comes in on one of our gRPC services, so we handle the request,
  • Next we open a Postgres transaction (we're good at this) and write some data,
  • Once we've committed our transaction, we respond to the user telling them that they've received their request and we'll get started on provisioning their new KVX instance right away.

We've responded to the user, we know what they want. Now what? How do we do something with that data in Postgres? It's just sitting there.

Work Templates

Because we are avoiding anything to do with the "E" word, we need to keep our solution simple. Along with our standard data models, we will be creating records in our database called "work templates", and we will store these records in a table called work. To make things even more simple, let's say we store the specification of the work which needs to be done as JSON in a JSONB column in our work table. Let's say the table looks like:

                                     Table "work"
 Column |           Type           | Nullable |             Default
-------------+--------------------------+----------+----------------------------------
 id     | bigint                   | not null | nextval('work_id_seq'::regclass)
 time   | timestamp with time zone | not null | now()
 spec   | jsonb                    | not null |
Indexes:
    "work_pkey" PRIMARY KEY, btree (id)
Enter fullscreen mode Exit fullscreen mode

With this approach, we will need to have some tasks in our code which poll Postgres periodically to check for work to be done. If we're feeling advanced, we could use Postgres LISTEN/NOTIFY to make things a bit more real-time. Our worker tasks will need to lock a row in the work table in order to process it, maybe just using a simple DELETE FROM work WHERE id=ANY(SELECT id FROM work ORDER BY id LIMIT 1) RETURNING *; query within a transaction.

While that record is held in limbo by our transaction, we can now safely begin our work without worrying about another worker attempting to process the same record. If some error takes place and we need to rollback, that work will be available for later processing. If we break down our units of work small enough we can apply the following pattern:

  • Grab a record from our work table, and hold onto that record in our transaction.
  • Do the work described therein. Maybe we call a few other microservices, call Kubernetes, update a few other database rows.
  • If there is additional work which needs to be performed, but we want it to be its own isolated unit of work, we can write a new record to our work table describing the work to be done, which will be picked up later by another worker.
  • Finally we will commit our transaction, and that unit of work will now be done.

Nice! A successful eve... I mean, work processing implementation. Let's look at the edge cases.

Errors

We've got a few different types of errors that we need to worry about here. On the highest of levels, we can categorize our errors as being either transient or intransient errors.

Transient errors are things like network blips, VM restarts, pods being suddenly rescheduled in Kubernetes and work being interrupted. The idea is that these errors are not permanent. The next time your code is able to execute, things will quite likely work as they should.

Intransient errors are errors which will not resolve themselves. Someone misspelled the name of our werk table, someone forgot to commit a transaction, maybe some boolean logic was wrong, so on and so forth. An engineer will typically need to fix the bug and deploy the updated code. Once the code has been deployed, the expectation is that the system will begin to work as needed once again.

Great, so error handling is covered. Worst case scenario, we need to deploy some bug fixes, but then things will be back to normal.

Idempotence

This is a slightly more of a difficult issue. In order to make our workers idempotent, we will need to ensure that our algorithms expect to be retried. We will need to account for the fact that the various other microservices or external APIs (cloud providers, Kubernetes &c) may have already been successfully called as part of this unit of work, but that a failure may have taken place after the fact.

It is important to note that retires are critical for state consistency. If we are actually attempting to drive our system's state to match the user's requested state, then we can't just give up at the first sign of trouble. Think about how terrible of an experience that would be for the user. What's worse? Think about all of the inconsistent half-state we would have sitting around. Consider this:

  • If we run into an error after successfully provisioning a new EC2 instance for a user in AWS, but then we just decided to abandon our work and not drive it to completion because we ran into some transient error after the fact, we would end up with lots of orphaned EC2 instances in AWS ... and we would also end up with a LARGE bill at the end of the month.
  • Similarly, if we are making gRPC requests to another microservice from our worker, and that peer microservice has already committed a transaction as part of our gRPC request, but then we decide to rollback our work because of some transient error after the fact, we now have a partially propagated state change. We can't just leave it there. What if that service was responsible for billing our customers. They get charged even though their KVX service was never provisioned? Not a good idea.

The point is hopefully now clear. Retries are an integral part of ensuring that we reliably propagate state changes throughout the system. In order to make this work properly, we need to ensure that our algorithms are crafted to expect such conditions.

As a final note on this subject, if one of our units of work is not safe to retry — E.G., sending emails — it is often the case that email providers offer their own idempotency mechanism to guard against duplicate requests. If such is not available, then we are by definition dealing with an "at most once" constraint, and the algorithm must be crafted to ensure that the critical section will never be retried. The solution can be simple: commit your transaction first, then try to send the email. If it fails, well ... the user will have to request that a new email be sent. Really, just use an email provider which provides an idempotent interface.

Synchronization

Awesome, we've covered some serious ground, we even have an idempotent eve... I mean, worker system. What next? Well, we need to be sure that we avoid race conditions.

Race conditions are endemic to distributed systems. Engineers need to stay vigilant to ensure that their algorithms are race condition free. This is especially a problem with distributed systems because the scope is no longer isolated to an individual process. We are now dealing with lots of different microservices, all of them communicating with each other at various points in time, updating state and making various sorts of changes to the system.

Many different conditions can trigger race conditions. Nearly any user-facing feature is subject to the possibility of race conditions. These can be especially dangerous when we are dealing with money, infrastructure, and other things which are more than just "data".

We are using Postgres for everything, so this is actually an easy problem for us to solve. Any work initiated in our system related to a customer's KVX instance will simply cause the has_active_work column to be set to true for the respective KVX instance. This update takes place as part of the transaction initiated by a user request, and the update takes place directly on the database row for that specific KVX instance.

If a user attempts to make another request to change some aspect of their KVX instance, we can kindly inform them that they still have some active work taking place for their KVX instance, and that they should try again when it is finished. Presenting a nice UI on top of this data is quite simple.

Once a worker task has finished all of its work, as part of its transaction it will set that has_active_work column to false for the target KVX instance.

Problem solved. We lock out race conditions before they have a chance to get into the system. However, we need to remain vigilant! Bugs can always creep in. A little documentation and peer review can go a long way to keep this pattern robust.

Stigma

Wow, that's got to be everything right? We've got a production grade eve... I mean, worker system now. But what will everyone else say about this?

There is some stigma associated with this approach. Typically, the criticism is that this approach espouses tight coupling between services. If a worker in service-abc (a microservice) needs to call service-xyz (another microservice), then we have created a dependency chain. In order for service-abc to function, service-xyz and any other services it depends upon must be available. This happens to also be true for any external 3rd party APIs such as AWS, Kubernetes, email providers, or any other such services which our workers in service-abc depend upon.

Practically speaking, this is inescapable for 3rd party resources which a team might depend upon. If a team needs to orchestrate the AWS API ... it doesn't matter how deep you burry that code under layers of abstraction ... sooner or later, the AWS API will need to be called. Is anyone really trying to remove that constraint? What would that even practically mean? A new dependency, that's what it would mean.

Maybe this isn't such an issue after all. We can generate alerts and observability data when we are having trouble accessing our dependencies. Maybe having explicit dependencies can be a good thing. Let's roll with that perspective for now.

Scale

Perhaps a criticism might be that the "work template" pattern just won't scale. All of those gRPC calls. All of that dependency management. However, none of these are concrete arguments. Data flowing over a network is going to be common to any distributed system (practically speaking), and dependencies will always exist no matter where you put them or how deep you burry them.

"Well ... Postgres won't be able to handle all of those worker queries at scale." I would counter by saying that Postgres can be quite surprising. Benchmarks would be needed; however, there is truly no doubt that even a single Postgres database instance can handle a surprising scale of throughput. Maybe if this were to become an issue, we could look into CockroachDB, TimescaleDB, or one of the other alternatives.

PID Controllers

A final note that I would like to make on this subject overall is that this worker system we've defined reminds me a bit of a simple proportional controller — a type of PID controller — which is far more common in the embedded systems space.

The templates of work can be seen as an error signal. The system is currently in state "x", but the user wants it to be in state "y", do some work to bring the system into state "y". With this model, an entire system can be seen as a series of controllers reconciling their observed state with the desired state, and then they take action to drive the system to the desired state.

Conclusion

We did it. All that's left to do is to give some feedback. What do you think? Would you build a system like this? What are we missing?

Any and all feedback is welcome! Cheers 🍻

Top comments (2)

Collapse
 
feikesteenbergen profile image
Feike Steenbergen

I would be very happy with such a system. I've ran some queuing systems on PostgreSQL before (without partitioning), that could easily handle thousands of requests per second.
Partitioning may be useful if you do run more than a few dozens eve... per second though, to counter bloat of your tables/indexes.

Collapse
 
feikesteenbergen profile image
Feike Steenbergen • Edited
- It is important to note that retires are critical for state consistency
+ It is important to note that retries are critical for state consistency
Enter fullscreen mode Exit fullscreen mode
- we respond to the user telling them that they've received their request
+ we respond to the user telling them that we've received their request
Enter fullscreen mode Exit fullscreen mode