loading...
Cover image for C# events as asynchronous streams with ReactiveX or Channels

C# events as asynchronous streams with ReactiveX or Channels

noseratio profile image Andrew Nosenko Updated on ・7 min read

The source code repro for this article can be found here.

As I am getting myself up to date with the modern C# language features, I'd like to share what feels like an interesting and natural use case for IAsyncEnumerable: iterating through a sequence of arbitrary events.

A (not so) brief introduction (TLDR, skip)

Support for asynchronous iterators (or "async streams") has been added to C# 8.0, and includes the following language and runtime features:

Asynchronous streams may come handy when implementing various producer/consumer scenarios in C#. IAsyncEnumerable and await foreach are just async counterparts for IEnumerable and foreach. Same as with IEnumerable<T> EnumSomething() or async Task<T> DoSomethingAsync(), when the C# compiler encounters async IAsyncEnumerable<T> EnumSomethingAsync(), it generates a special state machine class. The compiler breaks the logical execution flow within EnumSomethingAsync into multiple asynchronous continuations, separated by await or yield return operators. Prior to C# 8.0, it wasn't possible to combine these two within the same method. Now it is, and the whole set of the familiar Linq extension is now available as part of System.Linq.Async, to operate on the asynchronous stream of data generated via yield return or by other means, like below.

There is an abundance of great material on the web to help familiarize yourself with the concept of asynchronous streams. I could highly recommend "Iterating with Async Enumerables in C# 8", by Stephen Toub.

What I'd like to show here is how to turn a sequence of events of any kind of origin into an iterable async stream. While there are many ways of doing this, I'd like to focus on the following two: with Reactive Extensions (System.Reactive) or by using an unbound Channel from the relatively new System.Threading.Channels.

Producing async streams with Reactive Extensions

To illustrate this, I wrote a simple WinForms app (source gist) that has two independent event sources: a timer and a button. It's a contrived example but it's easy to play with and step through, to show the concept.

We turn timer ticks and button clicks into Reactive's IObservable observables with Observable.FromEventPattern. Then we combine two observables into one using Observable.Merge:

// observe Click events
var clickObservable = Observable
    .FromEventPattern(
        handler => button.Click += handler,
        handler => button.Click -= handler)
    .Select(_ => (button as Component, $"Clicked on {DateTime.Now}"));

// observe Tick events
var tickObservable = Observable
    .FromEventPattern(
        handler => timer.Tick += handler,
        handler => timer.Tick -= handler)
    .Select(_ => (timer as Component, $"Ticked on {DateTime.Now}"));

// merge two observables
var mergedObservable = Observable.Merge(clickObservable, tickObservable);
Enter fullscreen mode Exit fullscreen mode

Now we simply turn the combined observable into an instance of IAsyncEnumerable with ToAsyncEnumerable(), and we can asynchronously iterate though all events with await foreach as they occur:

// process events as async stream via ToAsyncEnumerable(),
// that's when the actual subscriptions happen, i.e.,  
// the event handlers get connected to their corresponding events
await ReadEventStreamAsync(mergedObservable.ToAsyncEnumerable(), cts.Token);
Enter fullscreen mode Exit fullscreen mode
static async Task ReadEventStreamAsync(
    IAsyncEnumerable<(Component, string)> source, 
    CancellationToken token)
{
    await foreach (var (component, text) in source.WithCancellation(token))
    {
        // e.g., delay processing
        await Task.Delay(100, token);
        Console.WriteLine($"{component.GetType().Name}: {text}");
    }
}
Enter fullscreen mode Exit fullscreen mode

Various LINQ operators can now be applied to the source stream above, like projection, filtering, etc.

Running it and clicking the button:

A simple Winforms app

Producing async streams with System.Threading.Channels

What if we don't want to involve Reactive Extensions here? They do seem a bit like an overkill for a simple producer/consumer workflow like above.

No worries, we can use an unbound Channel to act as buffer for our event stream. A Channel is like a pipe, we can push event data objects into one side of the pipe, and fetch them asynchronously from the other. In case with an unbound channel, its internal buffer size is limited by the available memory. In real-life scenarios, we'd almost always want to limit that. Channels are just one way of implementing the Asynchronous Queue data structure in .NET, there are some others, notably Dataflow BufferBlock<T>. For more details on Channels, visit "An Introduction to System.Threading.Channels" by Stephen Toub.

So, we introduce a helper class EventChannel (source) to expose Channel.Writer.TryWrite, Channel.Reader.ReadAllAsync and manage the scope of event handlers as IDisposable:

public class EventChannel<T> : IDisposable
{
    private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();

    public Task Completion => _channel.Reader.Completion;

    /// <summary>Queue an event item to the write side of the channel</summary>
    public bool Post(T item)
    {
        return _channel.Writer.TryWrite(item);
    }

    /// <summary>Read queued events as an async stream</summary>
    public IAsyncEnumerable<T> ToAsyncEnumerable(CancellationToken token)
    {
        return _channel.Reader.ReadAllAsync(token);
    }

    /// <summary>A simple helper to wrap even handler scope as IDisposable</summary>
    internal struct EventSubscription<TEventHandler> : IDisposable
        where TEventHandler : Delegate
    {
        private readonly Action _unsubscribe;

        public EventSubscription(
            TEventHandler handler,
            Action<TEventHandler> subscribe,
            Action<TEventHandler> unsubscribe)
        {
            subscribe(handler);
            _unsubscribe = () => unsubscribe(handler);
        }

        public void Dispose()
        {
            _unsubscribe();
        }
    }

    /// <summary>
    /// Subscribe to an event
    /// </summary>
    public IDisposable Subscribe<TEventHandler>(
        TEventHandler handler,
        Action<TEventHandler> subscribe,
        Action<TEventHandler> unsubscribe) where TEventHandler : Delegate
    {
        return new EventSubscription<TEventHandler>(handler, subscribe, unsubscribe);
    }

    public void Dispose()
    {
        _channel.Writer.Complete();
    }
}
Enter fullscreen mode Exit fullscreen mode

The event subscription code now looks like this:

using var eventChannel = new EventChannel<(Component, string)>();

// push Click events to the channel
using var clickHandler = eventChannel.Subscribe<EventHandler>(
    (s, e) => eventChannel.Post((button as Component, $"Cicked on {DateTime.Now}")),
    handler => button!.Click += handler,
    handler => button!.Click -= handler);

// push Tick events to the channel
using var tickHandler = eventChannel.Subscribe<EventHandler>(
    (s, e) => eventChannel.Post((timer as Component, $"Ticked on {DateTime.Now}")),
    handler => timer!.Tick += handler,
    handler => timer!.Tick -= handler);

// process events as async stream via ToAsyncEnumerable(),
await ReadEventStreamAsync(eventChannel.ToAsyncEnumerable(cts.Token), cts.Token);
Enter fullscreen mode Exit fullscreen mode

The consumer part almost hasn't changed, we only removed source.WithCancellation(token) which is now redundant:

static async Task ReadEventStreamAsync(
    IAsyncEnumerable<(Component, string)> source, 
    CancellationToken token)
{
    await foreach (var (component, text) in source)
    {
        // e.g., delay processing
        await Task.Delay(100, token);
        Console.WriteLine($"{component.GetType().Name}: {text}");
    }
}
Enter fullscreen mode Exit fullscreen mode

It produces exactly the same result as with ReactiveX (provided we can manage to click the button with the same intervals 🙂). The full source code (a .NET Core 3.1 project) can be found in this repo.

Conclusion

The domain of the problems that C# asynchronous streams can help solving certainly overlaps with that of the Reactive Extensions (aka ReactiveX/Rx.NET/Rx). E.g., in the first example above I could have just subscribed to mergedObservable notifications and used the powerful toolbox of System.Reactive.Linq extensions to process them.

That said, I personally find it is easier to understand the pseudo-liner code flow of async/await, than the fluent syntax of ReactiveX. In my opinion, it may also be easier to structure the exception handling (particularly, cancellations) while using the familiar language constructs like foreach, yield return, try/catch/finally. I myself only recently came across IAsyncEnumerable, and I decided to try it out by implementing coroutines in C#. I certainly didn't need ReactiveX for that.

However, it should be quite possible to combine ReactiveX and C# asynchronous streams to work together for complex asynchronous workflows.

I hope this has been useful. I'll be blogging more on this topic as I make progress with my side project.

Updated, here's a follow-up blog post:
Asynchronous coroutines with C# 8.0 and IAsyncEnumerable.

Follow me on twitter for further updates, if interested.

Discussion

pic
Editor guide
Collapse
nn profile image
NN

There is some work in this area to combine Rx.NET with Asynchronous streams.
Hopefully it will be inside Rx.NET one day github.com/dotnet/reactive/pull/435.

Collapse
noseratio profile image
Andrew Nosenko Author

I think you mean AsyncRx.NET, in which case it's already there in the master. However, it hasn't been officially published as a Nuget package, and sadly, it doesn't look like it will be any time soon:

When will AsyncRx.NET release?

We don't know as there hasn't been much development around it lately, or some documentation for devs. Besides, I find it too convoluted and I feel uncomfortable supporting it at this point in time. I recommend AsyncEnumerables instead.

Note that AsyncRx.NET is built around IAsyncObservable<T>, which actually is dual to IAsyncEnumerable<T> (similar to how Rx's IObservable<T> is dual to IEnumerable<T>). So, even though Rx maintainers recommend using AsyncEnumerables above, it isn't a direct substitute for what can be done with IAsyncEnumerable (e.g., here's how they play with it).

OTOH, it appears that Project Orleans uses IAsyncEnumerable for their async streams implementations, so maybe it will have the future there.

Collapse
victorioberra profile image
Victorio Berra

Do you have any practical examples where someone would want to do this?

Collapse
noseratio profile image
Andrew Nosenko Author

It's a good question. In theory, I imagine it could be any code that needs to process a sequence of events, e.g., collecting data from an IoT device or processing stock market updates. For use cases like these, we would probably want to use a bounded channel or something like BufferBlock<T> as event buffer on the producer side, to prevent the queue from growing indefinitely.

In practice though, I myself so far only have used that for automated coded UI testing of my side project DevComrade with asynchronous coroutines approach. In that case, the consumer of IAsyncEnumerable stream is essentially a pseudo-linear script (thanks to async/await) that just expects UI events in a particular order. I.e., it doesn't even need an await foreach loop. I'll go into more details about that with another blog post.