This article was originally published at: https://www.ahmetkucukoglu.com/en/event-sourcing-with-aspnet-core-02-messaging/
Since this article is the second part of a series, I recommend reading the previous article before starting. We will continue with the sample project from that article.
In the previous article, we built a Kanban Board example. We created a RESTful API with create, assign, move and complete endpoints, and recorded the requests coming to these endpoints as events in the Event Store. So we focused on the store part of the Event Store. In this article, we will focus on the messaging part.
We will add one more endpoint to our RESTful API: an endpoint for querying a task by its id.
Events in the Event Store cannot be queried by their content, so the state they add up to must also be stored as NoSQL documents or SQL rows. For example, we can't ask the Event Store for the tasks whose state is "Done". For this reason, we need to keep the state built from the events in a queryable database. In this example, we will keep the tasks in Couchbase.
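To make the difference concrete, here is a minimal C# sketch of folding events into a queryable read model. The TaskEvent shape, the event type names and the "Open" starting section are hypothetical and are not taken from the sample project.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event shape, for illustration only.
public class TaskEvent
{
    public Guid TaskId { get; set; }
    public string Type { get; set; }    // "Created" or "Moved" in this sketch
    public string Section { get; set; } // only meaningful for "Moved"
}

public static class ReadModelSketch
{
    // Folds an append-only list of events into one current-state entry per task.
    public static Dictionary<Guid, string> Project(IEnumerable<TaskEvent> events)
    {
        var sectionByTask = new Dictionary<Guid, string>();
        foreach (var e in events)
        {
            if (e.Type == "Created") sectionByTask[e.TaskId] = "Open"; // assumed initial section
            else if (e.Type == "Moved") sectionByTask[e.TaskId] = e.Section;
        }
        return sectionByTask;
    }

    // The kind of query the raw event list cannot answer directly.
    public static List<Guid> TasksIn(Dictionary<Guid, string> model, string section) =>
        model.Where(kv => kv.Value == section).Select(kv => kv.Key).ToList();
}
```

Calling ReadModelSketch.TasksIn(model, "Done") on the projected dictionary returns the ids of the tasks that are currently in "Done". In the article, Couchbase plays the role of this dictionary.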
We run Couchbase in Docker with the command line below.
docker run -d --name couchbase -p 8091-8094:8091-8094 -p 11210:11210 couchbase
Once Couchbase is running, open its web console at http://localhost:8091 (the port mapped in the command above) and create a cluster.
Let's create two buckets named "checkpoints" and "tasks" in Couchbase with the curl commands below. Remember to replace the value of the -u parameter with the credentials you provided when creating the cluster.
curl -X POST -u [admin]:[password] http://localhost:8091/pools/default/buckets -d name=checkpoints -d ramQuotaMB=100 -d authType=none -d bucketType=couchbase
curl -X POST -u [admin]:[password] http://localhost:8091/pools/default/buckets -d name=tasks -d ramQuotaMB=100 -d authType=none -d bucketType=couchbase
We add the "CouchbaseNetClient" and "Couchbase.Extensions.DependencyInjection" packages to the API project with the following commands.
dotnet add package CouchbaseNetClient -v 2.7.16
dotnet add package Couchbase.Extensions.DependencyInjection -v 2.0.2
We add Couchbase connection information to appsettings.json as follows.
In the Startup.cs file, we configure Couchbase.
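As a rough sketch of what this configuration can look like, the fragment below registers Couchbase with the AddCouchbase extension method from Couchbase.Extensions.DependencyInjection. The section name "Couchbase" is an assumption, so use whatever name you chose in appsettings.json. The fragment only shows the Couchbase-related line and needs the packages installed above to compile.

```csharp
using Couchbase.Extensions.DependencyInjection;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

public class Startup
{
    public Startup(IConfiguration configuration) => Configuration = configuration;

    public IConfiguration Configuration { get; }

    public void ConfigureServices(IServiceCollection services)
    {
        // Binds the connection settings from the "Couchbase" configuration section
        // (the section name is an assumption, not taken from the sample project).
        services.AddCouchbase(Configuration.GetSection("Couchbase"));

        // ...registrations from the previous article stay as they are.
    }
}
```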
Each event has a position value in the Event Store. After recording the events read from the Event Store to Couchbase, we must keep the position value of the last event read. We will store this position value in the CheckpointDocument type.
Let's add a class named "CheckpointDocument.cs" in the Infrastructure folder. Let's paste the code below into the class.
We write the repository class to read the last position value that we have written to Couchbase or to record the position value.
Let's add a class named "CheckpointRepository.cs" in the Infrastructure folder. Let's paste the code below into the class.
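To illustrate the contract of these two classes without depending on Couchbase, here is a minimal in-memory sketch. The names CheckpointSketch and InMemoryCheckpointRepository, and the Commit and Prepare properties, are hypothetical. They assume that a position in the Event Store is made up of a commit position and a prepare position. The dictionary stands in for the "checkpoints" bucket.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical document shape: one document per consumer, holding the last position read.
public class CheckpointSketch
{
    public string Key { get; set; }
    public long Commit { get; set; }
    public long Prepare { get; set; }
}

// In-memory stand-in for a repository backed by the "checkpoints" bucket.
public class InMemoryCheckpointRepository
{
    private readonly ConcurrentDictionary<string, CheckpointSketch> _bucket =
        new ConcurrentDictionary<string, CheckpointSketch>();

    // Returns null when no checkpoint has been saved yet,
    // which means "start reading from the beginning".
    public Task<CheckpointSketch> GetAsync(string key)
    {
        _bucket.TryGetValue(key, out var doc);
        return Task.FromResult(doc);
    }

    // Upsert: the newest position always overwrites the previous one.
    public Task SaveAsync(CheckpointSketch doc)
    {
        _bucket[doc.Key] = doc;
        return Task.CompletedTask;
    }
}
```

Only the latest position matters, so the repository overwrites a single document instead of appending a new one for every event.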
We will store task information in TaskDocument type. Let's add a class named "TaskDocument.cs" in the Infrastructure folder. Let's paste the code below into the class.
We write the repository class to write the events read from the Event Store to Couchbase and to query the tasks from Couchbase.
Let's add a class named "TaskRepository.cs" in the Infrastructure folder. Let's paste the code below into the class.
In the Save method, we add the relevant task or update its information, depending on the type of the event recorded in the Event Store.
In the Get method, we query the task from Couchbase by id.
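The idea behind Save and Get can be sketched in memory as follows. The event classes, their properties and the TaskDocumentSketch shape are hypothetical stand-ins for the types in the sample project, and the dictionary stands in for the "tasks" bucket.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical events, one per endpoint from the previous article.
public class CreatedTask { public Guid TaskId { get; set; } public string Title { get; set; } }
public class AssignedTask { public Guid TaskId { get; set; } public string AssignedTo { get; set; } }
public class MovedTask { public Guid TaskId { get; set; } public string Section { get; set; } }
public class CompletedTask { public Guid TaskId { get; set; } }

// Hypothetical read-model document.
public class TaskDocumentSketch
{
    public Guid Id { get; set; }
    public string Title { get; set; }
    public string AssignedTo { get; set; }
    public string Section { get; set; }
    public bool Completed { get; set; }
}

public class InMemoryTaskRepository
{
    private readonly Dictionary<Guid, TaskDocumentSketch> _tasks =
        new Dictionary<Guid, TaskDocumentSketch>();

    // Creates the document on the create event and updates it on every other event.
    // Assumes the create event arrives first, which holds when events are read in order.
    public void Save(object @event)
    {
        switch (@event)
        {
            case CreatedTask e:
                _tasks[e.TaskId] = new TaskDocumentSketch { Id = e.TaskId, Title = e.Title };
                break;
            case AssignedTask e:
                _tasks[e.TaskId].AssignedTo = e.AssignedTo;
                break;
            case MovedTask e:
                _tasks[e.TaskId].Section = e.Section;
                break;
            case CompletedTask e:
                _tasks[e.TaskId].Completed = true;
                break;
        }
    }

    // Returns null when no task with this id has been projected yet.
    public TaskDocumentSketch Get(Guid id) =>
        _tasks.TryGetValue(id, out var doc) ? doc : null;
}
```

Each event changes only the fields it is responsible for, so the document always reflects the combined effect of all events read so far.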
Let's add the CheckpointRepository and TaskRepository classes to the DI container in the Startup.cs file as below.
We need a consumer that listens to the events from the Event Store. For this, we will use a hosted service in ASP.NET Core.
Let's add a folder named "HostedServices" to the project and add a class named "TaskHostedService.cs" into it. Let's paste the code below into the class.
On line 33, we query the position value of the last event recorded to Couchbase.
On line 42, we subscribe to the Event Store and start listening to the events. If no lastCheckpoint value is given, all events are read again from the beginning every time we run the project. By passing lastCheckpoint, we state that we only want the events that come after the last position we have read.
On line 47, we skip the Event Store's own system events so that we take no action on them.
On line 52, we find the type of the event that was read.
On line 53, we deserialize the event according to its type.
On line 55, we skip any event that is not one of the task events.
On line 58, we record the task to Couchbase.
On line 60, we record the position value of the event read from the Event Store to Couchbase.
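The per-event steps above can be sketched without the Event Store client as follows. RecordedEventSketch, MovedTaskSketch and EventHandlerSketch are hypothetical names, the "$" prefix check assumes that system event types start with "$", and System.Text.Json is used here only as an assumption about the serializer.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical stand-in for an event delivered by the subscription.
public class RecordedEventSketch
{
    public string EventType { get; set; }
    public byte[] Data { get; set; }   // UTF-8 JSON payload
    public long Position { get; set; }
}

// Hypothetical task event.
public class MovedTaskSketch
{
    public Guid TaskId { get; set; }
    public string Section { get; set; }
}

public class EventHandlerSketch
{
    private readonly Dictionary<string, Type> _knownTypes;
    private readonly Action<object> _saveTask;
    private readonly Action<long> _saveCheckpoint;

    public EventHandlerSketch(
        Dictionary<string, Type> knownTypes,
        Action<object> saveTask,
        Action<long> saveCheckpoint)
    {
        _knownTypes = knownTypes;
        _saveTask = saveTask;
        _saveCheckpoint = saveCheckpoint;
    }

    // Returns true when the event was projected, false when it was skipped.
    public bool Handle(RecordedEventSketch recorded)
    {
        // Skip the store's own system events.
        if (recorded.EventType.StartsWith("$", StringComparison.Ordinal)) return false;

        // Find the CLR type of the event; skip events this consumer does not know.
        if (!_knownTypes.TryGetValue(recorded.EventType, out var type)) return false;

        // Deserialize according to the resolved type.
        var @event = JsonSerializer.Deserialize(new ReadOnlySpan<byte>(recorded.Data), type);

        // Update the read model first, then remember how far we have read.
        _saveTask(@event);
        _saveCheckpoint(recorded.Position);
        return true;
    }
}
```

Saving the checkpoint after the task means that a crash between the two steps causes the same event to be processed again on restart rather than lost, so the Save logic should tolerate seeing an event twice.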
We add the endpoint where we will query the task to TasksController as follows.
Everything is ready now. We can run the API project. We can query the task we have added at the end of the previous article with the curl command line below.
You can access the final version of the project on GitHub.