DEV Community

Pavan

Concurrent Processing of Azure Service Bus Queue Messages

How do you improve the throughput of an Azure Function that uses a Service Bus trigger to process messages from a queue?

Offloading long-running or background work to an Azure Function via a Service Bus queue is one of the most common cloud design patterns. For example, as soon as an image is uploaded from the front end, you might want to resize it to reduce its size. Your backend API can put a message on the queue with the location of the original image in the payload, and a Service Bus-triggered Azure Function picks up messages from the queue and does the resizing. It's not a good idea to put the image itself in the message payload, because Service Bus limits the message size: 256 KB on the Standard tier, with larger messages allowed only on the Premium tier. The limit depends on the SKU of the Service Bus namespace.
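As a rough sketch of the producer side of this pattern, the backend API could send only the image location to the queue. This assumes the Azure.Messaging.ServiceBus package; the class name ResizeRequestSender and the imageUrl payload field are hypothetical, and "myqueue" is simply the queue name used in the sample function later in this post.

```csharp
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical helper for illustration; not part of any SDK.
public static class ResizeRequestSender
{
    public static async Task SendAsync(string connectionString, string imageUrl)
    {
        // The client owns the connection; disposing it also closes its senders.
        await using var client = new ServiceBusClient(connectionString);
        ServiceBusSender sender = client.CreateSender("myqueue");

        // Send only the location of the image, never the image bytes,
        // so the message stays well under the size limit.
        string body = JsonSerializer.Serialize(new { imageUrl });
        await sender.SendMessageAsync(new ServiceBusMessage(body));
    }
}
```

In a real API you would normally create the ServiceBusClient once and reuse it across requests rather than creating one per message.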

Azure Functions can run in Dynamic (Consumption) plans as well as in App Service plans. In Dynamic plans, functions scale through event-driven scaling. In App Service plans, scaling follows the autoscale rules that we configure. Both mechanisms scale out by running multiple instances of the Function App across multiple VM instances.

In addition, Azure Functions has built-in support for configuring concurrency per instance, that is, within a single Function App instance running on a single VM. For Service Bus trigger functions, the maxConcurrentCalls and maxConcurrentSessions settings in host.json control how many messages a single instance processes at once.
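For reference, a static concurrency configuration might look like the fragment below. This is a sketch that assumes version 5.x of the Service Bus extension, where both settings sit directly under extensions.serviceBus (older 4.x versions nest them under messageHandlerOptions and sessionHandlerOptions). The values 16 and 8 are only illustrative starting points.

```json
{
  "version": "2.0",
  "extensions": {
    "serviceBus": {
      "maxConcurrentCalls": 16,
      "maxConcurrentSessions": 8
    }
  }
}
```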

But finding the right values for these settings is hard. We have to find them by trial and error: values that are too high can exhaust system resources, and values that are too low leave the instance underutilised.

The answer to this dilemma is dynamic concurrency. Currently it is supported only for Azure Service Bus queues, Blob, and Azure Storage queues.

{
  "version": "2.0",
  "concurrency": {
    "dynamicConcurrencyEnabled": true,
    "snapshotPersistenceEnabled": true
  }
}

Here the Functions host works out the sweet spot on its own, adjusting concurrency based on the available system resources.

When snapshotPersistenceEnabled is true, which is the default, the learned concurrency values are periodically persisted to storage so new instances start from those values instead of starting from 1 and having to redo the learning.

Sample trigger Function

using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public class ServiceBusQueueTrigger1
{
    [FunctionName("ServiceBusQueueTrigger1")]
    public void Run([ServiceBusTrigger("myqueue", Connection = "ServiceBusConnectionString")] string myQueueItem, ILogger log)
    {
        log.LogInformation($"C# ServiceBus queue trigger function started message: {myQueueItem}");
        // Simulate a long-running process by blocking for 20 seconds
        System.Threading.Thread.Sleep(20000);
        log.LogInformation($"C# ServiceBus queue trigger function processed message: {myQueueItem}");
    }
}

Sample host.json

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "concurrency": {
    "dynamicConcurrencyEnabled": true,
    "snapshotPersistenceEnabled": true
  }
}

In my example, each message takes about 20 seconds to process. I have simulated this by putting the thread to sleep.

From Service Bus Explorer, I added three messages in quick succession.

All the messages were picked up immediately.

The function processed those messages sequentially.

For more information:
https://learn.microsoft.com/en-us/azure/azure-functions/functions-concurrency
