I had a call with an engineer on another team who wanted advice for how to schedule a large number of concurrent orchestration functions in Durable Functions. I shared with him some sample code for how we do it in our own internal performance tests and decided it might be useful to share publicly too.
First, here is the orchestration that we're running for the test. It's just a basic sequential orchestrator that calls a SayHello
activity function 5 times.
[FunctionName(nameof(HelloSequence))]
public static async Task<List<string>> HelloSequence(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var outputs = new List<string>
{
await context.CallActivityAsync<string>(nameof(SayHello), "Tokyo"),
await context.CallActivityAsync<string>(nameof(SayHello), "Seattle"),
await context.CallActivityAsync<string>(nameof(SayHello), "London"),
await context.CallActivityAsync<string>(nameof(SayHello), "Amsterdam"),
await context.CallActivityAsync<string>(nameof(SayHello), "Mumbai")
};
return outputs;
}
[FunctionName(nameof(SayHello))]
public static string SayHello([ActivityTrigger] string name) => $"Hello {name}!";
And here is the HTTP trigger function we use to trigger a performance run. It takes a count
parameter from the query string as the number of concurrent "HelloSequence" orchestrations to run. In our tests, we'll often run more than 100K orchestrations concurrently.
[FunctionName(nameof(StartManySequences))]
public static async Task<IActionResult> StartManySequences(
[HttpTrigger(AuthorizationLevel.Function, "post", Route = null)] HttpRequest req,
[DurableClient] IDurableClient starter,
ILogger log)
{
if (!int.TryParse(req.Query["count"], out int count) || count < 1)
{
return new BadRequestObjectResult("A 'count' query string parameter is required and it must contain a positive number.");
}
string prefix = await ScheduleManyInstances(starter, nameof(HelloSequence), count, log);
return new OkObjectResult($"Scheduled {count} orchestrations prefixed with '{prefix}'.");
}
This method calls into a ScheduleManyInstances
helper method, which does the actual scheduling. Before I get into our implementation, however, I think it would be useful to describe what not to do.
Naïve implementation #1: Sequential
The most common naïve implementation is to use a for-loop with an await
in each iteration.
public static async Task<string> ScheduleManyInstances(
IDurableOrchestrationClient client,
string orchestrationName,
int count,
ILogger log)
{
log.LogWarning($"Scheduling {count} orchestration(s)...");
DateTime utcNow = DateTime.UtcNow;
string prefix = utcNow.ToString("yyyyMMdd-hhmmss");
for (int i = 0; i < count; i++)
{
// Start each instance one-at-a-time
string instanceId = $"{prefix}-{i:X16}";
await client.StartNewAsync(orchestrationName, instanceId);
}
log.LogWarning($"All {count} orchestrations were scheduled successfully!");
return prefix;
}
The above is really slow because you're only enqueuing a single orchestration start message at a time. If you're scheduling a large number of orchestrations, then the client that invoked this HTTP function will probably time-out before all the orchestrations are scheduled. Ideally we'd schedule orchestrations in parallel, which leads us to the next bad practice.
Naïve implementation #2: Too much parallelism
Next we try using Task.WhenAll
to schedule all the orchestrations in parallel. This is better than scheduling orchestrations sequentially because it allows us to queue up new work much more quickly. However, it has a major scalability problem, which I'll describe below.
public static async Task<string> ScheduleManyInstances(
IDurableOrchestrationClient client,
string orchestrationName,
int count,
ILogger log)
{
log.LogWarning($"Scheduling {count} orchestration(s)...");
DateTime utcNow = DateTime.UtcNow;
string prefix = utcNow.ToString("yyyyMMdd-hhmmss");
// Run all StartNewAsync tasks concurrently
var startTasks = new Task[count];
for (int i = 0; i < count; i++)
{
string instanceId = $"{prefix}-{i:X16}";
startTasks[i] = client.StartNewAsync(orchestrationName, instanceId);
}
await Task.WhenAll(startTasks);
log.LogWarning($"All {count} orchestrations were scheduled successfully!");
return prefix;
}
The problem is that if count
is a large number (like 100K), you will very quickly exhaust both threads and outbound TCP connections on your VM. This is because the .NET thread scheduler will try to aggressively allocate a huge number of threads to satisfy your concurrency demands. Each of those threads will also try to open a connection to Azure Storage concurrently, requiring a new TCP connection. The result is often that the function will fail, making this implementation highly unreliable.
Naïve implementation #3: Throttled Parallelism for Parallel.For
This next approach uses Parallel.For
to use a throttled concurrency approach.
public static async Task<string> ScheduleManyInstances(
IDurableOrchestrationClient client,
string orchestrationName,
int count,
ILogger log)
{
log.LogWarning($"Scheduling {count} orchestration(s)...");
DateTime utcNow = DateTime.UtcNow;
string prefix = utcNow.ToString("yyyyMMdd-hhmmss");
// Use up to 200 threads to schedule orchestrations concurrently
var maxConcurrencyOptions = new ParallelOptions { MaxDegreeOfParallelism = 200 };
Parallel.For(0, count, maxConcurrencyOptions, i =>
{
string instanceId = $"{prefix}-{i:X16}";
// Use GetAwaiter().GetResult() to block since Parallel.For() doesn't support async
client.StartNewAsync(orchestrationName, instanceId).GetAwaiter().GetResult();
});
log.LogWarning($"All {count} orchestrations were scheduled successfully!");
return prefix;
}
This works much better than the first two solutions. It fixes the TCP connection exhaustion issue by giving the system enough time to reuse existing TCP connections. It also addresses the thread starvation issue by ensuring we don't use more than 200 threads at the same time.
However, the Parallel.For
solution is still inefficient because each degree of parallelism is occupying a dedicated thread, and those threads will get blocked waiting for the StartNewAsync
call to complete. Threads are expensive in terms of CPU and memory and take time to allocate. Ideally we'd do this work in a non-blocking way that allows us to aggressively reuse threads.
Final implementation: Async throttled parallelism
The solution we use is a variation of the above that provides much better thread reuse by using a fully async implementation. I defined a ParallelForEachAsync
helper extension method to achieve this.
public static async Task<string> ScheduleManyInstances(
IDurableOrchestrationClient client,
string orchestrationName,
int count,
ILogger log)
{
log.LogWarning($"Scheduling {count} orchestration(s)...");
DateTime utcNow = DateTime.UtcNow;
string prefix = utcNow.ToString("yyyyMMdd-hhmmss");
await Enumerable.Range(0, count).ParallelForEachAsync(200, i =>
{
string instanceId = $"{prefix}-{i:X16}";
return client.StartNewAsync(orchestrationName, instanceId);
});
log.LogWarning($"All {count} orchestrations were scheduled successfully!");
return prefix;
}
Here is the ParallelForEachAsync
extension method implementation, which includes the async parallel throttling behavior.
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> items, int maxConcurrency, Func<T, Task> action)
{
List<Task> tasks;
if (items is ICollection<T> itemCollection)
{
// optimization to reduce the number of memory allocations
tasks = new List<Task>(itemCollection.Count);
}
else
{
tasks = new List<Task>();
}
using var semaphore = new SemaphoreSlim(maxConcurrency);
foreach (T item in items)
{
tasks.Add(InvokeThrottledAction(item, action, semaphore));
}
await Task.WhenAll(tasks);
}
static async Task InvokeThrottledAction<T>(T item, Func<T, Task> action, SemaphoreSlim semaphore)
{
await semaphore.WaitAsync();
try
{
await action(item);
}
finally
{
semaphore.Release();
}
}
As you can see, we use a SemaphoreSlim
to ensure we don't execute more than maxConcurrency
concurrent actions at the same time. Because the code path is fully async, we can now run many operations in parallel with a much smaller number of threads. Right now we have maxConcurrency
set to 200, but it's very possible that a larger number could have worked as well.
Note that this ParallelForEachAsync
extension method is generic and can be used for any .NET code that needs to execute asynchronous tasks with a cap on concurrency. In fact, we use it inside parts of the Durable Task Framework to reduce the number of concurrent TCP connections we open when making calls to Azure Storage.
Anyways, I hope this is helpful for anyone doing performance work with Durable Functions. If there is a simple way to do this in .NET and I just didn't know about it, I'm interested in learning about that too!
Top comments (5)
You can also use ActionBlock (TPL dataflow) to orchestrate throttled consumer. It provides maxDegreeOfParallelism which works similar to max Concurrency. It would be more concise and has additional features that are useful in more complex scenarios (e.g. Chaining ActionBlocks to create a workflow.)
Could you explain how to accomplish somthing similar from within an orchestration function to start tons of sub orchestrations or activity functions? There does not seem to be connction pooling for starting sub orchestrations or activity functions. So, my consumtion plan AZ func instance quickly runs out of connections.
Check to see if you're using the latest nuget package version. We added similar logic from this post into the Durable Task Framework code that schedules activities and sub-orchestrations. If you're still running out of connections, then there might be something we missed.
However, be careful about scheduling too many actions from a single orchestration instance. That could cause the history to become large and unwieldy. It's better to break up large fan-outs across multiple orchestrations. Perhaps the subject of another blog post. :)
This is a such advanced C# topic. Thanks!
As @rahuldj mentioned, I also wrote a post to discuss the usage of ActionBlock in async/await codes.