DEV Community

Dennis
Dennis

Posted on

Improving application resilience with Channels

In this blog I'm going to briefly explore the Channel object in dotnet. I'll give a short introduction to what a Channel is, what you use it for and I'll show a small example how you might use the Channel object.

The what?

Microsoft describes channels as: "An implementation of the producer/consumer conceptual programming model". What it means is that this device stores work items as an ordered to-do list. A "worker" is then able to claim a work item and process it.

Why?

For dotnet applications, this device gives you an easy way to funnel work to a background worker while letting your request finish, even before the work is actually done. If, for example, you want to track every page view in your application, then you need to send a signal to your database on every request. According to microsoft, a well designed app could handle thousands of concurrent requests, but should your database also handle that much traffic? By handling work such as this in the background, you get two benefits:

  1. Your pages load faster
  2. The pressure on your external resources is reduced and more consistent

How?

I'm going to showcase the page view example from the previous chapter using some code snippets to illustrate the usage of the Channel object. We're going to need 3 things:

  1. A communication model
  2. A queue object
  3. A consumer

Communication model

In order to transfer the information from the request thread to the background worker, we define a model:

public record PageViewCommand(int PageId, DateTime ViewDateTime);
Enter fullscreen mode Exit fullscreen mode

Queue

The queue is where we will use the channel. This object is a wrapper around the channel that will help us control the channel configuration and makes it easier for us to obtain the queue through dependency injection:

public interface IPageViewQueue
{
    ValueTask<PageViewCommand> ReadAsync(CancellationToken cancellationToken = default);

    ValueTask WriteAsync(PageViewCommand item);
}

internal sealed class PageViewQueue : IPageViewQueue
{
    private readonly Channel<PageViewCommand> _queue;

    public ClientErrorProcessorQueue()
    {
        // A bounded channel creates a channel with a maximum amount of items.
        // I use this option so that the amount of memory usage by the channel stays within reasonable limits.
        // I set the maximum amount of items to 10.000.
        var options = new BoundedChannelOptions(10000)
        {
            // I configure the channel to discard any new items as long as the queue is full
            // I do this, because I find it acceptable to lose a few page views in case the application is extremely busy.
            // This ensures that I don't accidentally fill up my queue to the point that I run out of memory
            FullMode = BoundedChannelFullMode.DropWrite
        };

        _queue = Channel.CreateBounded<PageViewCommand>(options);
    }

    public ValueTask WriteAsync(PageViewCommand item)
    {
        ArgumentNullException.ThrowIfNull(item);
        return _queue.Writer.WriteAsync(item);
    }

    public async ValueTask<PageViewCommand> ReadAsync(CancellationToken cancellationToken = default)
    {
        var result = await _queue.Reader.ReadAsync(cancellationToken);
        return result;
    }
}
Enter fullscreen mode Exit fullscreen mode

Remember to register the queue in your dependency injection container:

services.AddSingleton<IPageViewQueue, PageViewQueue>();
Enter fullscreen mode Exit fullscreen mode

Consumer

The consumer is the background worker that will read commands from the queue and execute them:

// ๐Ÿ‘‡ BackgroundService is an implementation of `IHostedService` that starts a service on startup and cancels execution upon shutdown
internal sealed partial class PageViewWorker(IPageViewRepository pageViewRepository, IPageViewQueue queue, ILogger<PageViewWorker> logger)
    : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // You always want to wrap the whole operation inside a try/catch statement to ensure that the worker keeps running when something goes wrong.
            try
            {
                var item = await queue.ReadAsync(stoppingToken);
                await pageViewRepository.Insert(item.PageId, item.ViewDateTime);
            }
            catch (OperationCanceledException e)
            {
                // No need to do anything here. The process is stopping, the exception signals cancellation
            }
            catch (Exception e)
            {
                LogBackgroundProcessingFailure(logger, e);
            }
        }
    }

    [LoggerMessage(LogLevel.Error, "Failed to process an item in the queue")]
    private static partial void LogBackgroundProcessingFailure(ILogger logger, Exception e);
}
Enter fullscreen mode Exit fullscreen mode

Remember to register the worker in your dependency injection container:

services.AddHostedService<PageViewWorker>();
Enter fullscreen mode Exit fullscreen mode

Producing work

All that is left is to throw work on the queue for processing. You may for example create a middleware:

public class PageViewMiddleware(RequestDelegate next, IPageViewQueue queue)
{
    // ๐Ÿ‘‡ ICmsContext here is a made-up interface that resembles a device that returns a content page model corresponding to the current request from your preferred CMS.
    public async Task InvokeAsync(HttpContext context, ICmsContext cmsContext)
    {
        var currentContent = cmsContext.GetCurrentContent();
        await queue.WriteAsync(new PageViewCommand(currentContent.Id, DateTime.Now));

        await next(context);
    }
}
Enter fullscreen mode Exit fullscreen mode

Alternatively, if you use a CDN for example, you may choose to create an API controller:

[ApiController]
[Route("api/[controller]/[action]")]
public class PageViewController(IPageViewQueue queue)
    : ApiController
{
    [HttpGet]
    public async Task<IActionResult> Register(RegisterPageViewRequest request)
    {
        await queue.WriteAsync(new PageViewCommand(request.PageId, DateTime.Now));

        return Accepted();
    }
}
Enter fullscreen mode Exit fullscreen mode

That is all you need to get started with channels. This blog is merely an introduction and a means to make you aware that this exists. If you want to read more in-depth information about this, I recommend checking out microsoft's documentation about channels. There is still a lot more to explore around channels.

If you're interested to see a practical example, I used this device in my UrlTracker package for Umbraco CMS and it delegates client error tracking to a background worker to reduce database traffic during requests for better performance.

I hope you learned something new today and maybe I'll see you in my next blog! ๐Ÿ˜Š

Top comments (0)