loading...

CQRS In Microservices - A Practical Example

jeastham1993 profile image James Eastham Originally published at jameseastham.co.uk ・8 min read

In my last post, I went through a thought experiment regarding CQRS across separate microservices.

This breaks a lot of my conceptions around microservices. Mainly, one database should only ever be used by one service.

In case you haven’t read my previous post (or simply just don’t want to read two posts of mine) here is a rundown.

CQRS Microservices – The Theory

CQRS, stands for Command Query Request Segregation. It’s a pattern for manipulating and querying data by completely separating the two interactions.

  • Data is manipulated (Create/Update/Delete) by commands. It’s then retrieved by queries. There are a number of benefits to this pattern:
  • Speed – Users will normally expect a response when GET-ing data to be quicker than manipulating. Having different methods for each type of interaction can then be optimized in different ways
  • Data Models – Having different data models for reads and writes is an immensely powerful tool. A lot of end-users don’t care about integer foreign keys. But do care about the relationships them FK’s give.
  • Caching – Separating out concerns, allows for a completely different method of storing/retrieving data. Including a super-fast cache for GET requests. Implementing two separate microservices is the focus of this article. One for commands and one for queries.

The system will cover:

  1. Data is sent via HTTP POST and created in an MS SQL Database
  2. Data is retrieved via HTTP GET, first looking at a Redis cache before going direct to the database
  3. The retrieved data will have a different and more ‘end-user friendly’ data model than the sent.

The Write Service

The write service is a simple .NET Core Web API that uses Entity Framework to create records in the database.

There is a single API endpoint that allows the creation of an order.

[ApiController]
[Route("[controller]")]
public class OrderController : ControllerBase
{
    private readonly IOrderRepository _orderRepo;
    private readonly IPublishEndpoint _publishEndpoint;

    public OrderController(ILogger<OrderController> logger
        ,IOrderRepository orderRepo
        ,IPublishEndpoint publishEndpoint)
    {
        this._orderRepo = orderRepo;
        this._publishEndpoint = publishEndpoint;
    }

    [HttpPost]
    public async Task<Order> Create([FromBody] Order orderToCreate)
    {
        if (orderToCreate is null)
        {
            throw new ArgumentNullException(nameof(orderToCreate));
        }

        orderToCreate = await this._orderRepo.CreateAsync(orderToCreate);

        await this._publishEndpoint.Publish(new NewOrderAddedEvent{
            OrderId = orderToCreate.Id
        });

        return orderToCreate;
    }
}

The order object itself is also extremely simple.

public class Order
{
    [JsonProperty(PropertyName = "orderLines")]
    private List<OrderLine> _orderLines;

    [JsonProperty]
    public int Id { get; private set; }

    [JsonProperty]
    public string OrderNumber { get; private set; }

    [JsonProperty]
    public DateTime OrderDate { get; private set; }

    [JsonIgnore]
    public IReadOnlyCollection<OrderLine> OrderLines => _orderLines;
}

public class OrderLine
{
    [JsonProperty]
    public int Id { get; private set; }

    [JsonProperty]
    public string Product { get; private set; }

    [JsonProperty]
    public decimal Value { get; set; }

    [JsonProperty]
    public int? CategoryId { get; set; }

    [JsonIgnore]
    public virtual Category? Category { get; set; }

    [JsonProperty]
    public virtual int OrderId { get; private set; }

    [JsonIgnore]
    public virtual Order Order { get; set; }
}

The controller offloads a lot of the grunt work, to two services injected using DI. These are an order repository, which is a simple wrapper around EF and also IPublishEndpoint.

IPublishEndpoint

IPublishEndpoint comes from the rather fantastic MassTransit library. Mass Transit provides abstractions around common Event Bus technologies.

In this instance, I am using RabbitMQ as the Event Bus.

When a service is only concerned with publishing events, the configuration for Mass Transit is extremely simple, just add the following nuget packages and the below code to your Startup.cs file.

dotnet add package MassTransit
dotnet add package MassTransit.AspNetCore
dotnet add package MassTransit.Extensions.DependencyInjection
dotnet add package MassTransit.RabbitMQ
services.AddMassTransit(provider =>
    {
        var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(new Uri("rabbitmq://localhost/"), h => { });
        });

        services.AddSingleton<IPublishEndpoint>(bus);
        services.AddSingleton<ISendEndpointProvider>(bus);
        services.AddSingleton<IBus>(bus);

        bus.Start();
    });

Events and Contracts

To digress slightly for a moment, I want to quickly mention event contracts. As you can see in the code for the OrderController, the IPublishEndpoint sends a NewOrderAddedEvent. Which you can see the code for here

public class NewOrderAddedEvent : NewOrderAdded
{
    public int OrderId { get; set; }
}

For Mass Transit to correctly route the events, both the publisher and the consumer must use the same event. To keep this nice and simple, I created a shared class library named Contracts, which holds a set of interfaces that detail the contents of the events.

Both publisher and consumer can then both access the same class library and any event changes are easily implemented in multiple services.

I tend to stay away from sharing code between multiple services in the vast majority of cases. But the tradeoff that having interfaces to map my events brings is worth the potential extra deployment effort.

Write Service In Summary

To quickly summarise, the write service:

  • Exposes a POST API endpoint, allowing people to create a new order
  • Commits that order to a MS SQL Databsae using Entity Framework
  • Publishes a NewOrderAdded event into a RabbitMQ event Bus

The Read Service

For the read service, I have again stuck with using a .NET Core Web API. There is no relationship between the read and write API aside from the shared event contracts.

The read API has no concept at all of an Order object or Entity Framework, as far as the data model in the write service goes.

It exposes a single GET endpoint, which will simply return all the orders that the service is currently aware of.

[ApiController]
[Route("[controller]")]
public class OrderController : ControllerBase
{
    private readonly OrderRepository _orderRepo;

    public OrderController(OrderRepository orderRepo)
    {
        this._orderRepo = orderRepo;
    }

    [HttpGet]
    public async Task<IEnumerable<Order>> Get()
    {
        var orders = await this._orderRepo.GetAndStoreInCacheIfEmpty();

        return orders;
    }
}

As you can see, the read API does still have a concept of an ‘Order’ but looking at the data model you can see there are some, albeit trivial, differences.

public class Order
{
    public int Id { get; set; }

    public string OrderNumber { get; set; }

    public DateTime OrderDate { get; set; }

    public int LineCount { get; set; }

    public decimal TotalValue { get; set; }
}

For reading data using the current API functionality, I don’t actually care about the OrderLine information. Instead, I just want to show a summary. (imagine a high-level list view of all orders in a web interface).

This is one of the huge benefits of using CQRS, easily allowing a different data model.

Obviously, this is completely possible using Entity Framework and LINQ. However, I’ve never been a fan of complicated grouped LINQ statements from either a legibility or performance point of view.

Order Repository

So how does the order repository actually retrieve data? It combines Redis and a direct connection to the SQL database. Here is the code for the GetAndStoreInCacheIfEmpty method.

public class OrderRepository
{
    private readonly IDistributedCache _distributedCache;

    public OrderRepository(IDistributedCache distributedCache)
    {
        if (string.IsNullOrEmpty(DbSettings.ConnectionString))
        {
            throw new ArgumentException("Database connection string cannot be null or empty", nameof(DbSettings.ConnectionString));
        }

        this._distributedCache = distributedCache;
    }

    public async Task<IEnumerable<Order>> GetAndStoreInCacheIfEmpty()
    {
        IEnumerable<Order> orderResponse = null;

        var cachedOrderData = await this._distributedCache.GetStringAsync(InstanceSettings.InstanceIdentifier);

        if (cachedOrderData == null)
        {
            using (var sqlConnection = new SqlConnection(DbSettings.ConnectionString))
            {
                await sqlConnection.OpenAsync();

                orderResponse = await sqlConnection.QueryAsync<Order>(GET_QUERY);
            }

            await this._distributedCache.SetStringAsync(InstanceSettings.InstanceIdentifier, JsonConvert.SerializeObject(orderResponse));
        }
        else
        {
            orderResponse = JsonConvert.DeserializeObject<IEnumerable<Order>>(cachedOrderData);
        }

        if (orderResponse == null)
        {
            orderResponse = new List<Order>(0);
        }

        return orderResponse;
    }

 private const string GET_QUERY = @"SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;

        SELECT [Orders].Id
    ,[Orders].OrderNumber
    ,[Orders].OrderDate
    ,COUNT([OrderLines].Id) as LineCount
    ,SUM([OrderLines].Value) as TotalValue
FROM [Orders]
LEFT JOIN [OrderLines] on [OrderLines].OrderId = [Orders].Id
GROUP BY [Orders].Id
    ,[Orders].OrderNumber
    ,[Orders].OrderDate";
    }

On Startup, the read service assigns itself a random GUID identifier. When querying for data, this GUID is used by Redis to see if there is already any data in the cache. Remember, we could potentially be running 10’s of this service.

After the cached data is retrieved if it is NOT null the code is nice and simple. It deserializes the cached order data to a set of order objects and returns. Simple!

If the cache is NULL for a specific instance, then a query is run directly against the SQL database. Using Dapper, the query results are mapped directly to a set of Order objects.

If a direct query is used, the Redis cache is updated before the method returns.

Handling a new order

As you saw in the write service when a new order is created a NewOrderEvent is raised to the EventBus.

In this specific scenario, I need the read services to be as near to up to date as possible. Using Events makes this really straightforward.

Again, I am using MassTransit for the event bus interactions, but the configuration looks slightly different from a Startup.cs perspective.

services.AddMassTransit(provider =>
    {
        provider.AddConsumer<NewOrderConsumer>();

        var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(new Uri("rabbitmq://localhost/"), h => { });

            cfg.ReceiveEndpoint(host, InstanceSettings.InstanceIdentifier, e =>
            {
                e.Consumer<NewOrderConsumer>(services.BuildServiceProvider());
            });
        });

        services.AddSingleton<IPublishEndpoint>(bus);
        services.AddSingleton<ISendEndpointProvider>(bus);
        services.AddSingleton<IBus>(bus);

        bus.Start();
    });

As you can see, the setup is broadly the same aside from two additional lines:

provider.AddConsumer<NewOrderConsumer>();

e.Consumer<NewOrderConsumer>(services.BuildServiceProvider());

All these two lines of code are doing, are creating the links between the EventBus and the specific event consumer named NewOrderConsumer. Let’s have a quick look at the code for the NewOrderConsumer.

public class NewOrderConsumer : IConsumer<NewOrderAdded>
{
    private readonly OrderRepository _orderRepo;

    public NewOrderConsumer(OrderRepository orderRepo)
    {
        this._orderRepo = orderRepo;
    }

    public async Task Consume(ConsumeContext<NewOrderAdded> context)
    {
        await this._orderRepo.AddSpecificOrderToCache(context.Message.OrderId);    
    }
}

Not an awful lot to it…

The important part is the inheritance from IConsumer, passing in a Type argument of the NewOrderAdded interface. If you remember, this is the same interface that the NewOrderAdded event in the write service inherited from.

When a new order event is handled, the Id (SQL Primary Key) of the order is passed into the order repository.

public async Task AddSpecificOrderToCache(int newOrderId)
{
    List<Order> orderResponse = null;

    var existingCacheDate = await this._distributedCache.GetStringAsync(InstanceSettings.InstanceIdentifier);

    using (var sqlConnection = new SqlConnection(DbSettings.ConnectionString))
    {
        if (existingCacheDate == null)
        {
            await sqlConnection.OpenAsync();

            orderResponse = (await sqlConnection.QueryAsync<Order>(GET_QUERY)).ToList();

            await this._distributedCache.SetStringAsync(InstanceSettings.InstanceIdentifier, JsonConvert.SerializeObject(orderResponse));
        }
        else
        {
            orderResponse = JsonConvert.DeserializeObject<List<Order>>(existingCacheDate);

            var newOrder = await sqlConnection.QueryFirstOrDefaultAsync<Order>(GET_SPECIFIC_QUERY, new {orderId = newOrderId});

            if (newOrder != null)
            {
                orderResponse.Add(newOrder);

                await this._distributedCache.SetStringAsync(InstanceSettings.InstanceIdentifier, JsonConvert.SerializeObject(orderResponse));
            }
        }
    }

    if (orderResponse == null)
    {
        orderResponse = new List<Order>(0);
    }
}

    private const string GET_QUERY = @"SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;

    SELECT [Orders].Id
,[Orders].OrderNumber
,[Orders].OrderDate
,COUNT([OrderLines].Id) as LineCount
,SUM([OrderLines].Value) as TotalValue
FROM [Orders]
LEFT JOIN [OrderLines] on [OrderLines].OrderId = [Orders].Id
GROUP BY [Orders].Id
,[Orders].OrderNumber
,[Orders].OrderDate";

    private const string GET_SPECIFIC_QUERY = @"SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;

    SELECT [Orders].Id
,[Orders].OrderNumber
,[Orders].OrderDate
,COUNT([OrderLines].Id) as LineCount
,SUM([OrderLines].Value) as TotalValue
FROM [Orders]
LEFT JOIN [OrderLines] on [OrderLines].OrderId = [Orders].Id
WHERE [Orders].Id = @orderId
GROUP BY [Orders].Id
,[Orders].OrderNumber
,[Orders].OrderDate";
}

Again, it first checks to see if there is any data at all in the cache. If there isn’t, the full query of all orders from the database runs.

However, if there is already data in the cache a slightly different query runs. This time, a query is run directly against the SQL database using the PK passed into the method.

If a record is found, it is added to the Deserialized List of existing cache data and then written back to the cache.

Read Service In Summary

To quickly summarise, the read service:

  • Stores a copy of all order data in a Redis cache
  • Returns a full list of orders from the cache if not empty, if the cache is empty a query runs directly against the database
  • Handles a new order being created, and instantly stores the new order data in the cache

The System In Summary

There we have it! CQRS split across two separate microservices that provide near real-time updates when new data is created.

There is also, a really simple API gateway built using Ocelot to allow for a single URL to both read and write. However, that’s a conversation for another day.

As I hope you’ve realized whilst reading the code, it is nowhere near production-ready.

End users would want to search for orders, view the lines and all the other crazy things that end-users want to do with their own data.

As a conceptual model for moving forward though, the theory stands up. It’s certainly been a great learning exercise for me.

I’d love to hear your thoughts, good or bad, in the comments.

You can also have a look at the source code and even built it yourself if you so wish at https://github.com/jeastham1993/cqrs-microservices.

First published at https://www.jameseastham.co.uk/microservices/cqrs-in-microservices-a-practical-example/

Discussion

pic
Editor guide
Collapse
thatham profile image
Venkatesh Thatham

I'm unable to publish an event using IPublishEndpoint from ConsumerContext instance. The source code can be found here github.com/thatham/test

Appreciate your valuable suggestion

Collapse
jeastham1993 profile image
James Eastham Author

Sorry if I've missed the code, but it looks like you have a hosted service for your BusControl (github.com/thatham/test/blob/maste...) but you never actually add the hosted service to your Startup.cs...

Collapse
thatham profile image
Venkatesh Thatham

It is mentioned here in a method called "services.AddRabbitMQDependencies()" in startup.cs

Thread Thread
jeastham1993 profile image
James Eastham Author

I see that now, thanks for pointing that out.

What exactly isn't working?

Thread Thread
thatham profile image
Venkatesh Thatham

If you try to execute the tenant->post of service project, the message will be sent using "endpoint.Send" method. I also have a consumer class (Handler in CQRS) for the command, upon receiving the message the event will be registered in presisted event store database(here it's cosmos DB).

Then, I try to publish an event using IPublishEndpoint which is not working.

Thread Thread
thatham profile image
Venkatesh Thatham

Looking forward to be hearing from you...