Oleksii Nikiforov

Originally published at nikiforovall.github.io

Building pipelines with IAsyncEnumerable in .NET

TL;DR

This article demonstrates how to use IAsyncEnumerable and System.Linq.Async to build pipelines in C#.

Source code : https://github.com/NikiforovAll/async-enumerable-pipelines

You can see all demos by running:

dotnet example --list
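💡 The demo runner above assumes the dotnet-example global tool is installed. If it is not, it can be added with:

dotnet tool install -g dotnet-example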

Here’s a sneak peek 👀:

// TextSummarizationAndAggregationPipeline
var pipeline = Directory
    .EnumerateFiles(path)
    .ToAsyncEnumerable()
    .ReportProgress()
    .SelectAwait(ReadFile)
    .Where(IsValidFileForProcessing)
    .SelectAwait(Summarize)
    .WriteResultToFile(path: Path.Combine(Path.GetTempPath(), "summaries.txt"))
    .ForEachAsync(x => AnsiConsole.MarkupLine($"Processed [green]{x.Name}[/]"));


Introduction

Pipelines are a powerful way to process data in a streaming fashion. They are a series of stages that transform data from one form to another. In this article, we will explore how to build pipelines using IAsyncEnumerable and System.Linq.Async.

Pipelines are a common pattern in modern software development. They are used to process data in a streaming fashion, which can be more efficient than processing it all at once. Pipelines are also composable, meaning that you can combine multiple stages together to create more complex processing logic.

💡 I already described an approach to building pipelines in my previous blog post; you might want to take a look at Building pipelines with System.Threading.Channels. Both System.Threading.Channels and IAsyncEnumerable provide powerful tools for managing asynchronous data streams in .NET. However, while System.Threading.Channels offers a more explicit approach to handling producer-consumer scenarios, IAsyncEnumerable brings a more integrated and LINQ-friendly way to work with asynchronous sequences. Understanding the strengths and nuances of each can help you choose the right tool for your specific use case.

Examples

There are many interesting concepts that I’m going to cover in this article. Let’s start with the basics.

Using System.Linq.Async operators to build a pipeline

Let’s say we want to build a pipeline that reads files from a directory, parses them, and counts the number of words in each file.

This can be illustrated as follows:

graph LR;
    A[Read files] --> B[Parse files];
    B --> C[Count words];
    C --> D[Output results];

🎯 Our goal is to represent each stage of the pipeline in the code using IAsyncEnumerable and System.Linq.Async.

System.Linq.Async is a library that provides asynchronous versions of LINQ operators. It allows you to work with IAsyncEnumerable in a similar way to how you would work with IEnumerable. It makes it easy to build pipelines.
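If you want to follow along, the operators used below come from the System.Linq.Async NuGet package, which can be added to the project with:

dotnet add package System.Linq.Async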

Basically, the control flow is described as a chain of method calls, and you can implement each stage of the pipeline as a separate method. In my opinion, this makes the code more readable and maintainable. The benefit of this approach is that once you determine the stages of the pipeline, you can implement them independently and focus on the logic of each stage.

The process of parsing files can be implemented as follows:

var path = Path.Combine(Directory.GetCurrentDirectory(), "..", "..", "Data");

var pipeline = Directory
    .EnumerateFiles(path)
    .ToAsyncEnumerable()
    .SelectAwait(ReadFile)
    .Where(IsValidFileForProcessing)
    .Select(CalculateWordCount)
    .OrderByDescending(x => x.WordCount)
    .ForEachAsync(Console.WriteLine);

await pipeline;

Everything starts with the conversion of IEnumerable<string> file paths to IAsyncEnumerable<string>.

💡 Alternatively, we could write our own method that returns IAsyncEnumerable, for example, we could easily swap the local file system with Azure Blob Storage. It means we can reuse the same pipeline with different data sources.
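For example, here is a minimal sketch of such a source, assuming the Azure.Storage.Blobs package; the method name is illustrative:

static async IAsyncEnumerable<string> EnumerateBlobNames(BlobContainerClient container)
{
    // GetBlobsAsync returns an AsyncPageable<BlobItem>, which is itself an IAsyncEnumerable
    await foreach (var blob in container.GetBlobsAsync())
    {
        yield return blob.Name;
    }
}

The rest of the pipeline stays exactly the same; only the first stage changes.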

💡 Later, we will see that not only IEnumerable can be converted to IAsyncEnumerable, but also IObservable.

As you can see, System.Linq.Async provides a set of extension methods that let you work with IAsyncEnumerable much like you would with IEnumerable. The SelectAwait method asynchronously projects each element of the sequence, Where filters elements based on a predicate, OrderByDescending sorts the elements in descending order, and ForEachAsync asynchronously iterates over the sequence.

It is worth pointing out that ForEachAsync is a terminal operation that triggers the execution of the pipeline. It is important to remember that IAsyncEnumerable is a cold sequence, meaning that it does not start processing until you start iterating over it.
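As a quick illustration of this laziness (a minimal sketch reusing path and ReadFile from above):

// Nothing has executed yet: this is only a description of the work.
var lazyFiles = Directory
    .EnumerateFiles(path)
    .ToAsyncEnumerable()
    .SelectAwait(ReadFile);

// Files are read only once we start iterating (or call a terminal operator such as ForEachAsync).
await foreach (var file in lazyFiles)
{
    Console.WriteLine(file.Name);
}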

Here are the building blocks of the pipeline:

public static class Steps
{
    public static async ValueTask<FilePayload> ReadFile(string file)
    {
        var content = await File.ReadAllTextAsync(file);
        var name = Path.GetFileName(file);

        return new FilePayload(name, content);
    }

    // keep only non-empty files whose name ends in "txt" (checked with a list pattern)
    public static bool IsValidFileForProcessing(FilePayload file) =>
        file is { Content.Length: > 0, Name: [.., 't', 'x', 't'] };

    public static WordCountPayload CalculateWordCount(FilePayload payload)
    {
        var words = payload.Content.Split(' ');

        return new(payload.Name, words.Length);
    }
}

public record FilePayload(string Name, string Content);
public record WordCountPayload(string Name, int WordCount);

Let’s see the pipeline in action:

Combining IAsyncEnumerable with IObservable

Let’s say we want to use simple file watcher to monitor changes in the directory and trigger the pipeline when a new file is created or an existing file is modified.

var fileWatcher = CreateFileObservable(path);

var pipeline = fileWatcher
    .TakeUntil(DateTimeOffset.Now.AddSeconds(15))
    .ToAsyncEnumerable()
    .SelectAwait(ReadFile)
    .Where(IsValidFileForProcessing)
    .Select(CalculateWordCount)
    .ForEachAsync(Console.WriteLine);

In this example, we use IObservable to monitor changes in the directory. We create an observable sequence of file paths using the CreateFileObservable method. We then use the TakeUntil operator to limit the duration of the sequence to 15 seconds. We convert the observable sequence to an asynchronous enumerable sequence using the ToAsyncEnumerable method. We then apply the same pipeline as before to process the files.

The CreateFileObservable method is implemented as follows:

static IObservable<string> CreateFileObservable(string path) =>
    Observable.Create<string>(observer =>
    {
        var watcher = new FileSystemWatcher(path)
        {
            NotifyFilter = NotifyFilters.FileName | NotifyFilters.LastWrite,
            Filter = "*.*",
            EnableRaisingEvents = true
        };

        void onChanged(object sender, FileSystemEventArgs e)
        {
            try
            {
                observer.OnNext(e.FullPath);
            }
            catch (Exception ex)
            {
                observer.OnError(ex);
            }
        }

        watcher.Created += onChanged;
        watcher.Changed += onChanged;

        return () =>
        {
            watcher.Created -= onChanged;
            watcher.Changed -= onChanged;
            watcher.Dispose();
        };
    });

Let’s see the pipeline in action:

In the demo below, I’m appending “ word” to the end of the file content to trigger the pipeline.

Implementing reusable operators - Batch

Let’s say we want to batch the processing of files to improve performance. We can implement a custom operator called Batch that groups elements of the sequence into batches of a specified size.

const int batchSize = 2;

var pipeline = Directory
    .EnumerateFiles(path)
    .ToAsyncEnumerable()
    .Batch<string, FilePayload>(batchSize)
    .ProcessEachAsync(ReadFile)
    .Where(IsValidFileForProcessing)
    .Select(CalculateWordCount)
    .OrderByDescending(x => x.WordCount)
    .ForEachAsync(Console.WriteLine);

await pipeline;

In the example above, we are reading files in batches in parallel. We are using the Batch operator to group files into batches of size 2. We then process each batch in parallel using the ProcessEachAsync method.

💡 I will leave the implementation of the Batch operator as an exercise for the reader. Please check the source code for the full implementation: https://github.com/NikiforovAll/async-enumerable-pipelines/blob/main/Pipelines.Core/PipelineBuilderExtensions.cs
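For reference, here is a minimal sketch of how such operators could look. This is not the repository's exact implementation (its Batch carries an additional type parameter for the processing step); the signatures below are illustrative:

public static class BatchingExtensions
{
    // Groups the source into lists of up to 'size' elements.
    public static async IAsyncEnumerable<IReadOnlyList<T>> Batch<T>(
        this IAsyncEnumerable<T> source,
        int size)
    {
        var batch = new List<T>(size);

        await foreach (var item in source)
        {
            batch.Add(item);

            if (batch.Count == size)
            {
                yield return batch;
                batch = new List<T>(size);
            }
        }

        if (batch.Count > 0)
        {
            yield return batch; // flush the trailing partial batch
        }
    }

    // Processes the items of each batch in parallel and flattens the results back into the stream.
    public static async IAsyncEnumerable<TResult> ProcessEachAsync<T, TResult>(
        this IAsyncEnumerable<IReadOnlyList<T>> batches,
        Func<T, ValueTask<TResult>> processor)
    {
        await foreach (var batch in batches)
        {
            var results = await Task.WhenAll(batch.Select(item => processor(item).AsTask()));

            foreach (var result in results)
            {
                yield return result;
            }
        }
    }
}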

Let’s see the pipeline in action:

Implementing reusable domain-specific operators - TextSummarization with Semantic Kernel

To demonstrate something more complex, let’s say we want to summarize the content of the files using the Semantic Kernel library. Summarization is a common task in natural language processing (NLP) that involves generating a concise representation of a text document.

var pipeline = Directory
    .EnumerateFiles(path)
    .ToAsyncEnumerable()
    .ReportProgress()
    .SelectAwait(ReadFile)
    .Where(IsValidFileForProcessing)
    .SelectAwait(Summarize)
    .WriteResultToFile(path: Path.Combine(Path.GetTempPath(), "summaries.txt"))
    .ForEachAsync(x => AnsiConsole.MarkupLine($"Processed [green]{x.Name}[/]"));

In the example above, we read files, summarize their content, and write the results to a file. The ReportProgress operator reports progress as each file is processed, the Summarize operator summarizes the content of each file, and the WriteResultToFile operator writes the results to a file.

Before we move forward, let’s see how the pipeline works in the demo below:

Now, we are ready to move forward and see the details of the implementation.

The Summarize method is implemented as follows:

async ValueTask<SummarizationPayload> Summarize(FilePayload file)
{
    var prompt = """
        {{$input}}

        Please summarize the content above in 20 words or less:

        The output format should be: [title]: [summary]
        """;

    var result = await kernel.InvokePromptAsync(prompt, new KernelArguments() { ["input"] = file.Content });

    return new(file.Name, result.ToString());
}
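For completeness, the kernel instance used above can be created via the Semantic Kernel builder. A minimal sketch, assuming an Azure OpenAI chat completion deployment; the deployment name, endpoint, and environment variable are placeholders:

var kernel = Kernel
    .CreateBuilder()
    .AddAzureOpenAIChatCompletion(
        deploymentName: "gpt-4o-mini",
        endpoint: "https://<your-resource>.openai.azure.com/",
        apiKey: Environment.GetEnvironmentVariable("AZURE_OPENAI_API_KEY")!)
    .Build();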

Then we want to write the results into a file:

public static async IAsyncEnumerable<SummarizationPayload> WriteResultToFile(
    this IAsyncEnumerable<SummarizationPayload> values,
    string path
)
{
    const int batchSize = 10;

    using var streamWriter = new StreamWriter(path, append: true);

    await foreach (var batch in values.Buffer(batchSize))
    {
        foreach (var value in batch)
        {
            await streamWriter.WriteLineAsync(value.Summary);

            yield return value;
        }

        await streamWriter.FlushAsync();
    }

    AnsiConsole.MarkupLine($"Results written to [green]{path}[/]");
}

💡 Note that IAsyncEnumerable is a pull-based model. With this approach, each summary is read individually and appended to the end of the file. This means that results are continuously saved as each batch is processed, thanks to the FlushAsync call.

The ReportProgress method is quite interesting because it eagerly reads all elements of the sequence to determine the total count. It then reports progress as each element is processed.

public static async IAsyncEnumerable<string> ReportProgress(this IAsyncEnumerable<string> values)
{
    var totalCount = await values.CountAsync();

    await foreach (var (value, index) in values.Select((value, index) => (value, index)))
    {
        yield return value;

        AnsiConsole
            .Progress()
            .Start(ctx =>
            {
                var task = ctx.AddTask($"Processing - {Path.GetFileName(value)}", true, totalCount);
                task.Increment(index + 1);
                task.StopTask();
            });
    }
}


💡 This is a good demonstration of a leaky abstraction: CountAsync enumerates the source once, and the foreach then enumerates it again. Not all data sources can be enumerated twice or provide the full sequence immediately, so we need to be careful.
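One way to avoid the double enumeration (a minimal sketch, assuming the source is finite and cheap to buffer) is to materialize the sequence once and reuse it for both the count and the iteration:

public static async IAsyncEnumerable<string> ReportProgressBuffered(this IAsyncEnumerable<string> values)
{
    // Enumerate the source exactly once and keep the results in memory.
    var items = await values.ToListAsync();
    var index = 0;

    foreach (var value in items)
    {
        yield return value;

        index++;
        AnsiConsole.MarkupLine($"Processed {index}/{items.Count} - [green]{Path.GetFileName(value)}[/]");
    }
}

The trade-off is that the whole sequence is buffered in memory before anything flows downstream.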

Conclusion

That is it! 🙌 We have seen how to build pipelines using IAsyncEnumerable and System.Linq.Async. I hope you found this article helpful. If you have any questions or comments, please feel free to leave them below.

