André Slupik

Comparing IAsyncEnumerable and IObservable for event streams

I spent a day experimenting with both approaches for representing and manipulating streams of events and here are some preliminary thoughts.

Similarities

Both can represent a stream of events or messages.

Both define LINQ-style operators like Select, Where and Aggregate, making it easy to manipulate the streams.

Both allow merging two streams into one. Note that at the time of writing, the appropriate method for this with IAsyncEnumerable is AsyncEnumerableEx.Merge from System.Interactive.Async, not the extension method of the same name. The extension method behaves differently: it concatenates the input streams rather than interleaving their results as they arrive.
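To make the difference between interleaving and concatenating concrete, here is a minimal hand-rolled sketch of an interleaving merge built on a Channel. It is not how System.Interactive.Async implements Merge; the names MergeSketch, Interleave, DelayedRange and CollectAsync are made up for illustration, and the sketch ignores cancellation and early disposal by the consumer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class MergeSketch
{
    // Hypothetical helper: yields items from all sources in arrival order.
    public static async IAsyncEnumerable<T> Interleave<T>(params IAsyncEnumerable<T>[] sources)
    {
        var channel = Channel.CreateUnbounded<T>();

        // One pump per source copies its items into the shared channel.
        async Task PumpAsync(IAsyncEnumerable<T> source)
        {
            await foreach (var item in source)
            {
                await channel.Writer.WriteAsync(item);
            }
        }

        // Complete the channel once every pump has finished (or failed).
        var pumps = Task.WhenAll(sources.Select(s => PumpAsync(s)));
        _ = pumps.ContinueWith(t => channel.Writer.TryComplete(t.Exception));

        await foreach (var item in channel.Reader.ReadAllAsync())
        {
            yield return item;
        }
    }

    // Hypothetical source for trying it out: emits start, start+1, ... with a delay before each item.
    public static async IAsyncEnumerable<int> DelayedRange(int start, int count, int delayMs)
    {
        for (var i = 0; i < count; i++)
        {
            await Task.Delay(delayMs);
            yield return start + i;
        }
    }

    // Drains a stream into a list so the result can be inspected.
    public static async Task<List<T>> CollectAsync<T>(IAsyncEnumerable<T> source)
    {
        var items = new List<T>();
        await foreach (var item in source)
        {
            items.Add(item);
        }
        return items;
    }
}
```

Calling MergeSketch.Interleave(MergeSketch.DelayedRange(0, 3, 5), MergeSketch.DelayedRange(100, 3, 5)) yields all six items, with the two ranges mixed according to timing. A concatenating merge would always yield 0, 1, 2 before 100, 101, 102.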

Differences

IAsyncEnumerable

It's a pull model: items are created on request as the consumer iterates the stream, just like with IEnumerable. This implies that there needs to be some buffering between the point where your events are produced (e.g. from the real world, from a message broker, etc.) and the point where they are consumed. System.Threading.Channels.Channel is designed to fulfill this need.
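As a rough illustration of that buffering, the sketch below puts a bounded Channel between a producer and a consumer. The class and method names (ChannelBufferSketch, ProduceAsync, ConsumeAsync, RunAsync), the "event N" payloads and the capacity of 2 are assumptions chosen for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelBufferSketch
{
    // Producer side: writes events into the channel as they happen.
    public static async Task ProduceAsync(ChannelWriter<string> writer, int count)
    {
        for (var i = 0; i < count; i++)
        {
            await writer.WriteAsync($"event {i}");
        }
        writer.Complete(); // signals that no more events will arrive
    }

    // Consumer side: pulls events at its own pace through IAsyncEnumerable.
    public static async Task<List<string>> ConsumeAsync(ChannelReader<string> reader)
    {
        var seen = new List<string>();
        await foreach (var evt in reader.ReadAllAsync())
        {
            seen.Add(evt);
        }
        return seen;
    }

    public static async Task<List<string>> RunAsync()
    {
        // Bounded capacity: WriteAsync waits while the buffer is full.
        var channel = Channel.CreateBounded<string>(capacity: 2);
        var producer = ProduceAsync(channel.Writer, 5);
        var seen = await ConsumeAsync(channel.Reader);
        await producer;
        return seen;
    }
}
```

With a bounded channel the producer is slowed down when the consumer falls behind; Channel.CreateUnbounded trades that backpressure for unlimited buffering.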

Has strong traction in the dotnet ecosystem, with language integration in C# 8 both for consuming (await foreach) and producing (the compiler turns async methods that return IAsyncEnumerable into async iterators). System.Threading.Channels is currently a recommended approach (source) for producer/consumer scenarios.
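For readers who haven't used the language integration yet, here is a small sketch of both halves. TicksAsync and SumAsync are hypothetical names, and Task.Delay stands in for waiting on a real event source.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncIteratorSketch
{
    // Producing: an async iterator can await between yields.
    public static async IAsyncEnumerable<int> TicksAsync(int count)
    {
        for (var i = 0; i < count; i++)
        {
            await Task.Delay(10); // stand-in for waiting on a real event
            yield return i;
        }
    }

    // Consuming: await foreach pulls one item at a time.
    public static async Task<int> SumAsync(IAsyncEnumerable<int> source)
    {
        var sum = 0;
        await foreach (var tick in source)
        {
            sum += tick;
        }
        return sum;
    }
}
```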

Integrates well with async/await.

Encourages an imperative style: the language integration consists of a foreach loop, which means mutable state. That said, it's easy to define additional extension methods to allow writing in a more functional style. Here's an equivalent of F#'s choose for IAsyncEnumerable:

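// Extension methods must be declared inside a static class.
// Keeps only the items for which selector returns a non-null result.
public static async IAsyncEnumerable<TResult> Choose<TSource, TResult>(
    this IAsyncEnumerable<TSource> source,
    Func<TSource, TResult?> selector)
    where TResult : class // without this, TResult? on a value type is just TResult and nothing is filtered
{
    await foreach (var item in source)
    {
        var result = selector(item);
        if (result is not null)
        {
            yield return result;
        }
    }
}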
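Here is a sketch of how such an operator might be used. To keep the block self-contained it repeats the extension method, with a `where TResult : class` constraint added as an assumption so that the null check is meaningful. RawLinesAsync, ParseTemperature and the "temp=" line format are invented for the example.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ChooseUsage
{
    // Same operator as above, constrained to reference types (an assumption of this sketch).
    public static async IAsyncEnumerable<TResult> Choose<TSource, TResult>(
        this IAsyncEnumerable<TSource> source,
        Func<TSource, TResult?> selector)
        where TResult : class
    {
        await foreach (var item in source)
        {
            var result = selector(item);
            if (result is not null)
            {
                yield return result;
            }
        }
    }

    // Hypothetical input: a stream of raw lines, some of them malformed.
    public static async IAsyncEnumerable<string> RawLinesAsync()
    {
        await Task.Yield();
        yield return "temp=21";
        yield return "garbage";
        yield return "temp=23";
    }

    // Returns the value after "temp=", or null when the line doesn't match.
    public static string? ParseTemperature(string line) =>
        line.StartsWith("temp=") ? line.Substring(5) : null;

    // Filters and maps in one step, without mutable state at the call site.
    public static async Task<List<string>> ReadTemperaturesAsync()
    {
        var temperatures = new List<string>();
        await foreach (var t in RawLinesAsync().Choose(ParseTemperature))
        {
            temperatures.Add(t);
        }
        return temperatures;
    }
}
```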

IEnumerable and Task are two concepts most C# developers interact with on a daily basis, so the combination of the two should be fairly intuitive.

Support for F# is planned, in the form of taskSeq.

IObservable

It's a push model, similar to C# events: you subscribe a synchronous handler and it gets called whenever an event happens. If you have asynchronous work to do in response to the event, you could convert its output to another IObservable; this approach is described here. The IObservable<->Task interop story seems solid, but the back-and-forth seems like an unavoidable point of friction. Task is a pull model; IObservable is a push model. Task permeates more and more of the dotnet ecosystem every year, it seems, for better or worse.
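To show what "push" means in practice, the sketch below implements the two BCL interfaces by hand, without Rx.NET. EventSource and CollectingObserver are hypothetical types written for this example; in real code Rx.NET supplies ready-made equivalents, and this sketch is neither thread-safe nor a faithful implementation of the full observer contract.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical hand-rolled event source built only on IObservable<T>.
public sealed class EventSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(_observers, observer);
    }

    // Push: the producer decides when the handlers run.
    public void Publish(T value)
    {
        foreach (var observer in _observers.ToArray())
        {
            observer.OnNext(value);
        }
    }

    // Tells every observer that the stream has ended.
    public void Complete()
    {
        foreach (var observer in _observers.ToArray())
        {
            observer.OnCompleted();
        }
        _observers.Clear();
    }

    // Disposing the subscription removes the observer from the list.
    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> _observers;
        private readonly IObserver<T> _observer;

        public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
        {
            _observers = observers;
            _observer = observer;
        }

        public void Dispose() => _observers.Remove(_observer);
    }
}

// Synchronous handler that records whatever it is given.
public sealed class CollectingObserver<T> : IObserver<T>
{
    public List<T> Items { get; } = new();
    public bool Completed { get; private set; }

    public void OnNext(T value) => Items.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() => Completed = true;
}
```

Note that OnNext returns void: there is nothing for the producer to await, which is where the friction with Task-based code comes from.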

Documentation on how to create Observables and Observers is surprisingly hard to find. ReactiveX, the open-source project, has high-level, language-agnostic documentation, which is not enough to fully understand how to use Rx.Net specifically.

The MSDN documentation exists, but is more than 10 years old and not currently maintained.

Googling for examples yields mostly results for RxJS or RxJava. Blog posts and SO questions tend to be 5-10 years old, which doesn't necessarily make them wrong today, but means they don't take into account the current state of the dotnet ecosystem.

Admittedly, the story isn't that great for IAsyncEnumerable documentation either: googling AsyncEnumerable.ToArrayAsync sends me to Entity Framework documentation and if the real doc for System.Linq.AsyncEnumerable exists anywhere, let me know.

The thing is, I don't need to form a new conceptual model to understand what ToArrayAsync is going to do, or what await foreach will do. To use IObservable, I need to understand new terminology and concepts: hot vs cold observables; what Publish does; what Subscribe does; what a Subject is and when I should use it; how to avoid deadlocks, which are reportedly easy to run into; and how and when to properly call OnNext/OnError/OnCompleted. Should I use try-catch-finally around all my logic to make sure I call these reliably? What really happens when exceptions are thrown: are they just propagated, or do they stop the stream? I need guidance and best practices if I'm to introduce a new library to a team project, especially one with a steep learning curve and deadly gotchas. The fact that neither I nor my colleagues can refer to official, up-to-date documentation means this is probably a no-go.

In conclusion

Well, I meant to write an objective pros/cons list, but this turned into a long rant against IObservable, didn't it? The fact is, the push model appeals to me and seems very powerful. Rx makes it easy to define and manipulate streams of events, and if I could find the time to learn it properly, I feel like it could help me come up with even more elegant solutions than what Channels and async streams allow. The fundamental issue is that the Task-based, pull model seems to have slowly taken over everything in dotnet in the past decade. With IAsyncEnumerable having language integration in C# and little in the way of new concepts, it's very hard to argue for a competing model, especially without proper guidance and documentation for that model.

That said, this was the result of a single day of investigation, so keep in mind this is a pretty superficial overview. I look forward to deepening my understanding of this topic in the future.

Further reading:
A good discussion of async pull vs IObservable: https://fsprojects.github.io/FSharp.Control.AsyncSeq/AsyncSeq.html
