DEV Community

Cameron

BlockingCollection And You: A Producer-Consumer Tale

A full sample project is available to browse here.

Overview

I recently stumbled upon BlockingCollection and the Producer-Consumer pattern whilst researching an appropriate approach for a solution I was trying to build.

C#'s BlockingCollection<T> is a thread-safe collection type that is quite useful in the right situations, and it fits the Producer-Consumer pattern particularly well.

How does it work?

Typically, you would have services of some description (producers) that create data, usually in a task. These producers would then deposit their data payloads into the blocking collection. One blocking collection could service multiple producers.

To access the data in the blocking collection, you would typically have a 'consumer' service, again usually run in its own task so that it doesn't block anything else. This consumer service iterates over the blocking collection's consuming enumerator, processing newly added items as they arrive.

Iterating this enumerator is a blocking operation, so it should be run within its own task in the background. We'll go over how this works in a later section.

This process is illustrated in the below diagram, where there are three producer tasks adding data to the same BlockingCollection. One consumer task iterates around the BlockingCollection enumerator, constantly processing newly added items.

[Diagram: three producer tasks adding items to a single BlockingCollection, with one consumer task processing them]
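To make that picture concrete, here's a minimal sketch of three producers feeding one consumer. The payload strings and the variable names (collection, processed, producers) are made up for illustration and aren't taken from the sample project.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Unbounded collection shared by every producer and the single consumer.
var collection = new BlockingCollection<string>();
var processed = new List<string>();

// The consumer runs on its own task so the blocking enumeration
// does not hold up the rest of the program.
var consumer = Task.Run(() =>
{
    foreach (var item in collection.GetConsumingEnumerable())
    {
        processed.Add(item); // only this task touches the list while it runs
    }
});

// Three producers, each adding two payloads to the same collection.
var producers = Enumerable.Range(1, 3)
    .Select(id => Task.Run(() =>
    {
        for (var i = 0; i < 2; i++)
        {
            collection.Add($"producer {id}, item {i}");
        }
    }))
    .ToArray();

Task.WaitAll(producers);     // wait until every producer has finished
collection.CompleteAdding(); // tell the consumer no more items are coming
consumer.Wait();             // the foreach loop ends once the collection drains

Console.WriteLine($"Processed {processed.Count} items.");
```

The order in which the items arrive depends on how the producer tasks are scheduled, so only the total count is predictable here.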

Size Limits

When you're instantiating your collection, you have the option of specifying the maximum collection capacity in the constructor, or using the default constructor to have an infinite capacity.

For example, this snippet will instantiate a collection with a maximum capacity of 10:
var blockingCollection = new BlockingCollection<int>(10);

And this will instantiate an unbounded collection:
var blockingCollection = new BlockingCollection<int>();

The effects of this are covered in the below sections on adding and taking collection items.
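If you want to check which kind of collection you're holding, the BoundedCapacity property reports the limit. A quick sketch (the variable names are just for illustration):

```csharp
using System;
using System.Collections.Concurrent;

var bounded = new BlockingCollection<int>(10);
var unbounded = new BlockingCollection<int>();

// BoundedCapacity echoes the constructor argument,
// or -1 when no bound was supplied.
Console.WriteLine(bounded.BoundedCapacity);   // 10
Console.WriteLine(unbounded.BoundedCapacity); // -1
```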

Adding Collection Items

Add vs TryAdd

If you're anything like me, the second you see these two methods with this naming convention you instantly think of the likes of int.Parse() and int.TryParse(), where you're returned a boolean and receive your variable through the out keyword.

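However, these methods work slightly differently: there's no out parameter involved, and the real difference is in what happens when the collection is full.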

Add

Add() is fairly straightforward, however it does have a caveat. Consider the following code snippet:

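using System;
using System.Collections.Concurrent;

var blockingCollection = new BlockingCollection<int>(1);

blockingCollection.Add(1); // succeeds: the collection is now at capacity
blockingCollection.Add(2); // blocks: nothing ever removes the first item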

Console.WriteLine("All items added.");

Looks innocent enough, right? Not quite! Because we specified the bounded capacity parameter in the BlockingCollection constructor, our collection now has '1' as the maximum capacity. So what happens here, when we try to add two items?

The answer is fairly straightforward: The first Add() will succeed and '1' will be added to our collection. However, the second Add() will block the thread due to our collection being at capacity. Until items are removed from the collection to bring it below the capacity limit, our second Add() will continue to block. With this code, our console statement is never reached.
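For comparison, here's a rough sketch of what it looks like when something does free up space. The consumer task and its 200 ms delay are invented for the example; the point is only that the blocked Add() resumes as soon as an item is taken.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var collection = new BlockingCollection<int>(1);
collection.Add(1); // succeeds: the collection is now full

// Hypothetical consumer that frees a slot after a short delay.
var consumer = Task.Run(() =>
{
    Thread.Sleep(200);
    return collection.Take(); // removes the first item
});

collection.Add(2); // blocks until the consumer has taken the first item
Console.WriteLine("All items added.");

var taken = consumer.Result; // the item the consumer removed
```

This time the console statement is reached, because the consumer brings the collection back under its capacity.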

The solution? TryAdd()!

TryAdd

TryAdd() functions in a very similar fashion to Add(), with one main difference - it doesn't have to block the thread indefinitely, and it returns a boolean telling you whether the item was actually added. In the previous section we covered how Add() will block when a bounded collection is full. Consider the following modified code from that section:

using System;
using System.Collections.Concurrent;

int additionTimeout = 1000; // milliseconds
var blockingCollection = new BlockingCollection<int>(1);

var firstAdditionSuccess = blockingCollection.TryAdd(1, additionTimeout);  // true
var secondAdditionSuccess = blockingCollection.TryAdd(2, additionTimeout); // false after one second

Console.WriteLine($"First added: {firstAdditionSuccess}, second added: {secondAdditionSuccess}");

Almost the same, right? Here, we've declared an integer, additionTimeout, and passed it as a parameter into TryAdd. What's that about?

Basically, we're saying that we want to try and add our item into the collection, but if the collection is at capacity, we're only going to wait a specific period of time trying to add our new item. If that period of time is exceeded, we don't want to try and add the item anymore.

So essentially, additionTimeout is our time limit, and if that is exceeded, TryAdd returns false and unblocks the thread.

So in the above example, the first TryAdd is instantly successful as the collection count is below the limit. The second TryAdd will wait one second, and if the collection is still at capacity, it will return false and unblock itself.
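TryAdd() also has an overload with no timeout at all, which gives up straight away, and one that takes a TimeSpan. Here's a small sketch of acting on the return value; what you do with a rejected item (here, just a log line) is up to you and is only an assumption for the example.

```csharp
using System;
using System.Collections.Concurrent;

var collection = new BlockingCollection<int>(1);

var first = collection.TryAdd(1);                                  // true: there is room
var second = collection.TryAdd(2, TimeSpan.FromMilliseconds(100)); // false: still full after 100 ms
var third = collection.TryAdd(3);                                  // false: returns immediately

if (!second || !third)
{
    // Hypothetical handling: a real producer might retry, drop or log the item.
    Console.WriteLine("Collection was full, some items were not added.");
}
```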

Completing our collection

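BlockingCollection differs from most collections in that you need to actually tell it when to stop accepting new items, in effect completing the collection. Once a collection is marked as complete, no new items can be added (Add() and TryAdd() will throw an InvalidOperationException), and any consumers blocked waiting for items are released once the remaining items have been taken.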

It's important to note that you cannot reset a completed collection; you'll have to re-instantiate it if you want to re-use it.

So how do you do that? It's been made nice and simple for us: we just need to call CompleteAdding() on our collection, and voila! It's now complete.
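A short sketch of what completion looks like in practice. The addRejected flag is only there to make the outcome visible:

```csharp
using System;
using System.Collections.Concurrent;

var collection = new BlockingCollection<int>();
collection.Add(1);
collection.CompleteAdding(); // no more items will be accepted

var addRejected = false;
try
{
    collection.Add(2); // adding to a completed collection throws
}
catch (InvalidOperationException)
{
    addRejected = true;
}

// Items added before completion are still there to be consumed.
var remaining = collection.Take();

Console.WriteLine($"Add rejected: {addRejected}, remaining item: {remaining}");
```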

Consuming our items

Now that we've covered how we add items to our collection, what about actually retrieving them? There are three ways we can do this:

  • The GetConsumingEnumerable() method
  • The Take() method
  • The TryTake() method

You can probably guess that the two Take methods function pretty similarly to the two previously covered Add methods.

GetConsumingEnumerable

Calling this method will return you a blocking enumerable that you can use to iterate over the items of your collection.

There are some catches with using this, however. Consider the following snippet:

foreach (var item in _myBlockingCollection.GetConsumingEnumerable())
{
    Console.WriteLine($"Iterating over item {item}");
}

Console.WriteLine("Enumeration complete!");

Now, you might think that, like other collections, we'll just iterate over what's currently in the collection, exit our foreach loop, then hit our final write line statement. Nope! What actually happens is that the loop removes each item from the collection as it processes it, printing our write line statement for each one. When the collection is empty and no new items have been added, the enumerator blocks the current thread and waits for new additions (which is why it's important to run your consumer on a separate task).

So, how do we go about ending this blocking loop? Remember when we talked about marking a collection as complete? You'll need to call that CompleteAdding() method. This tells the enumerator that no more items are coming, so once it has finished iterating through the remaining items it stops waiting and the loop exits.
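Here's a rough sketch of that hand-off. To keep it short the consuming loop runs on the main thread and a made-up producer task calls CompleteAdding() when it's done; in a real app you'd more likely have the consumer on its own task too.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var collection = new BlockingCollection<int>();
var seen = new List<int>();

// Hypothetical producer: adds three items, then marks the collection complete.
var producer = Task.Run(() =>
{
    for (var i = 1; i <= 3; i++)
    {
        collection.Add(i);
        Thread.Sleep(50); // simulate work between items
    }
    collection.CompleteAdding();
});

// Blocks while waiting for items, and exits once the collection
// is both marked complete and empty.
foreach (var item in collection.GetConsumingEnumerable())
{
    Console.WriteLine($"Iterating over item {item}");
    seen.Add(item);
}

Console.WriteLine("Enumeration complete!");
```

Without the CompleteAdding() call the loop would sit waiting for a fourth item and the final line would never be printed.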

Take

There's not much need to go into too much detail here, as Take() mirrors the Add() method. It removes a single item from the collection and returns it, blocking if there are no items present. If the collection has been marked as complete and is empty, Take() throws an InvalidOperationException instead of blocking.

TryTake

Again, without going into too much unnecessary detail, TryTake() works in a similar fashion to TryAdd(): you can specify how long the method should wait for an item. It hands the item back through an out parameter and returns true if it got one, or returns false once the time period has elapsed, freeing up the thread.
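A quick sketch of both methods side by side. The value 42 and the 100 ms timeout are arbitrary choices for the example:

```csharp
using System;
using System.Collections.Concurrent;

var collection = new BlockingCollection<int>();
collection.Add(42);

var first = collection.Take(); // returns 42 straight away

// The collection is now empty, so TryTake waits up to 100 ms and gives up.
var gotItem = collection.TryTake(out var second, 100);

// Once the collection is complete and empty, Take throws instead of blocking.
collection.CompleteAdding();
var takeThrew = false;
try
{
    collection.Take();
}
catch (InvalidOperationException)
{
    takeThrew = true;
}

Console.WriteLine($"first: {first}, gotItem: {gotItem}, takeThrew: {takeThrew}");
```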

Key Takeaways 🔑

The main points to take away from using BlockingCollection are:

  • Run your consumer(s) on separate tasks to prevent blocking your main app logic
  • Remember to mark your collection as complete when you have finished producing data
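  • Make use of the IsAddingCompleted and IsCompleted boolean properties: the first is true once CompleteAdding() has been called, the second only once the collection is also empty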
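On that last point, here's a tiny sketch of how the two properties differ (variable names are just for illustration):

```csharp
using System;
using System.Collections.Concurrent;

var collection = new BlockingCollection<int>();
collection.Add(1);
collection.CompleteAdding();

var addingDone = collection.IsAddingCompleted;     // true: no more additions allowed
var completedBeforeDrain = collection.IsCompleted; // false: one item is still waiting

collection.Take();

var completedAfterDrain = collection.IsCompleted;  // true: complete and empty

Console.WriteLine($"{addingDone} {completedBeforeDrain} {completedAfterDrain}");
```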

I hope this post has been insightful into using BlockingCollection!

Discussion (1)

aivarasatk

Something like an ActionBlock seems better in most cases. Both constructs give a "queue"-like mechanism, but ActionBlock takes away the blocking part of the complexity.