Deyan Petrov

F# App Stub for AKS hosting v2 (without Azure WebJobs SDK fluff)

TL;DR: Use a standard .NET 8+ host for your microservices (running, for example, in AKS) without any Azure WebJobs SDK syntactic sugar or similar fluff/magic.

Introduction

We originally started almost 5 years ago with .NET/F# Azure Functions apps, and we had pretty bad experiences with that setup. The biggest issues were low app density per node, high costs and too much bloat from MS frameworks.

Soon after that we migrated to AKS (managed Kubernetes on Azure, similar to EKS and GKE), but to keep the migration effort reasonable we re-used the WebJobs SDK, which is a building block of the Azure Functions SDK. More than 2 years ago I wrote an article about the app stub we were using.

Everything was running relatively fine until recently, when we had to improve our local development experience and needed a way to change the low-level workings of some of our code related to Azure Event Hubs and similar. That was the trigger for revisiting our usage of the WebJobs SDK and actually migrating away from it, which is the topic of this article.

As for the Azure WebJobs SDK triggers we were using, I believe we are nothing special; like many others, we have the following in place:

  1. Pub-Sub message bus, in our case Azure Event Hubs by means of EventHubTrigger
  2. Queues, in our case Azure Storage Queues by means of QueueTrigger
  3. Time-triggered jobs by means of TimerTrigger
  4. WebSockets, in our case Azure SignalR by means of SignalRTrigger

Additionally, we use various other IHostedService/BackgroundService processes, which get started upon app start and are running in the background, doing some other work - e.g. listening to notifications from the database (change streams), aggregating something every x seconds, etc.

All of the above uses configuration based on simple environment variables, some of them pointing to a secrets store (in our case Azure Key Vault). There is of course some logging and telemetry sent to a cloud service (in our case Azure Application Insights), as well as a web/REST/HTTP API (in our case using bare-bones ASP.NET Core).

All XyzTriggers above have been migrated away from the Azure WebJobs SDK to pretty small, sweet implementations that are (almost) fully under our control and use the Azure SDK directly. They are explained in the sections below. But before we do that, let's look at what the app stub looks like now.

App Stub v2

// Program.fs
module TestService.XyzHandling.Program

open System.Threading
open Microsoft.Extensions.Hosting
open Framework.Hosting
open Framework.AzureKeyVault.Environment
open TestService.XyzHandling.Api.Wiring

Environment.overwriteEnvironmentVariablesFromKVRef () |> Async.RunSynchronously

[<EntryPoint>]
let main argv =

    let builder =
        HostBuilder.createDefaultBuilder argv BackgroundServiceExceptionBehavior.StopHost
        |> HostBuilder.configureLogging
        |> HostBuilder.configureAppInsights
        |> HostBuilder.configureEventHubProcessors
            EnvVars.appName
            [ EventHubProcessors.eventHubProcessor1; EventHubProcessors.eventHubProcessor2 ]
        |> HostBuilder.configureQueueProcessors
            EnvVars.appName
            ([ Some QueueProcessors.queueProcessor1 ] @ [ QueueProcessors.queueProcessor2 ]
             |> List.choose id)
        |> HostBuilder.configureBackgroundServices
            EnvVars.appName
            [ BackgroundServices.eventHubPublisherTest
              BackgroundServices.queuePublisherTest ]
        |> HostBuilder.configureTimers EnvVars.appName [ Timers.timerProcessor1; Timers.timerProcessor2 ]
        |> HostBuilder.configureStartup Startup.startupFunctions
        |> HostBuilder.configureWebHost
            EnvVars.appName
            [ WebApi.checkHealth; WebApi.checkReadiness; WebApi.Entity.getById ]

    use tokenSource = new CancellationTokenSource()
    use host = builder.Build()
    host.RunAsync(tokenSource.Token) |> Async.AwaitTask |> Async.RunSynchronously

    0 // return an integer exit code


Notes:

  • The above code spawns a total of 9 IHostedService/BackgroundService instances (incl. the standard web host)
  • Event Hub Processors listen to & process Azure Event Hub events
  • Queue Processors listen to & process Azure Storage queue messages
  • Timer Processors run stuff every x seconds, minutes, hours etc.
  • There are even some generic Background Services started for writing to an event hub and a queue
  • One can see at a glance all the running processes and even web api endpoints from the Program.fs file
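The `[ Some ... ] @ [ ... ] |> List.choose id` expression in the queue processor list hints at a way to switch individual processors on or off at wiring time. A minimal sketch of that idea follows; `ProcessorDef` and `selectProcessors` are hypothetical stand-ins for illustration and are not part of the Framework library used in this article.

```fsharp
// Hypothetical stand-in for a processor definition (assumption, not the Framework type)
type ProcessorDef = { Name: string }

/// Builds the list of processors to register.
/// Optional processors are wrapped in an option; List.choose id drops the
/// None entries and unwraps the Some ones.
let selectProcessors (retryQueueEnabled: bool) : ProcessorDef list =
    [ (if retryQueueEnabled then Some { Name = "RetryQueueProcessor" } else None)
      Some { Name = "MainQueueProcessor" } ]
    |> List.choose id

selectProcessors false |> List.iter (fun p -> printfn "registering %s" p.Name)
```

With this shape the list handed to the host builder stays a plain list of definitions, and the on/off decision remains visible in the wiring code.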

Trigger Implementations

The trigger implementations below are all based on BackgroundService/IHostedService, which means a background task is spawned and is running the whole time the host itself is running.

Compared to the original implementations in the Azure WebJobs SDK, the ones below may have less functionality (no auto-scaling for Azure Functions or similar), but at the same time they are very lean and much easier to understand and maintain.

The source code of a working sample application will eventually be made available here.

Event Hub Processor

Old code:


// Api.Wiring.fs, handle the event
type WebJobs(...) =
    [<FunctionName("HandleXyzEvent")>]
    member _.HandleXyzEvent
        (
            [<EventHubTrigger("",
                              Connection = EnvVars.EventHubs.Xyz.connectionStringKey,
                              ConsumerGroup = EnvVars.EventHubs.consumerGroup)>] msg: EventData,
            enqueuedTimeUtc: DateTime,
            sequenceNumber: Int64,
            offset: string,
            logger: ILogger
        )
        =
        // handle the event

// Program.fs, configure the host with web jobs
    let configureWebJobs (builder:IHostBuilder) = 
        builder.ConfigureWebJobs(fun b ->
          b.AddAzureStorageCoreServices() |> ignore
          b.AddEventHubs() |> ignore)   


Notes:

  • There is some magic going on, because in Program.fs you do not say what you actually want to listen to, you just say "I want to enable Event Hubs Handling", and then you decorate some method of a class with attributes, which indicate that you want to listen to an event hub
  • The EventHubTrigger insists on getting the key of a connection string environment variable, and fetches the value when it decides ...
  • You seem to need to specify also a FunctionName in addition to the method name
  • Any configuration settings for the EventHubTrigger are hidden away; you need to know which environment variables to configure, and these are picked up "automatigally" by the framework ;)

New code:


// Api.Functions.fs, handle the event
    let processEvent log partitionId (cancellationToken: CancellationToken) event =
        // handle the event

        () |> Async.retn

// Api.Wiring.fs, instantiate the processor
    let eventHubProcessor1 =
        EventHubProcessorDef.create
            "EventHubProcessor1"
            EnvVars.appName
            EnvVars.EventHubs.checkpointStorageConnectionString
            EnvVars.EventHubs.eventHubConnectionString
            EnvVars.EventHubs.consumerGroup
            EnvVars.EventHubs.eventBatchMaximumCount
            EnvVars.EventHubs.assignedPartitionIds
            EnvVars.EventHubs.defaultStartingPosition
            EventHubHandlers.processEvent

// Program.fs, configure the host with the processors
        |> HostBuilder.configureEventHubProcessors
            EnvVars.appName
            [ EventHubProcessors.eventHubProcessor1; EventHubProcessors.eventHubProcessor2 ]

Event Hub Processor Implementation:

module Framework.AzureEventHubs.EventProcessing

open System
open System.Threading.Tasks
open System.Collections.Generic
open System.Threading
open Azure.Messaging.EventHubs.Consumer
open Azure.Storage.Blobs
open Azure.Messaging.EventHubs
open Azure.Messaging.EventHubs.Primitives
open Framework
open Framework.AzureEventHubs.LogEvents
open Framework.Logging.StructuredLog

// https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample08_CustomEventProcessor.md
// Event Processor which considers assigned partitions
type AssignablePartitionProcessor
    (
        log: Log,
        name: string,
        storageClient: BlobContainerClient,
        assignedPartitions: string[] option,
        eventBatchMaximumCount: int,
        consumerGroup: string,
        connectionString: string,
        clientOptions: EventProcessorOptions,
        processEvent: Log -> string -> CancellationToken -> EventData -> Async<unit>,
        processError: Log -> string option -> string -> CancellationToken -> Exception -> Async<unit>
    )
    =
    inherit
        PluggableCheckpointStoreEventProcessor<EventProcessorPartition>(
            BlobCheckpointStore(storageClient),
            eventBatchMaximumCount,
            consumerGroup,
            connectionString,
            clientOptions
        )

    // Workaround, see https://github.com/dotnet/fsharp/issues/12448 ...
    member this.BaseListPartitionIdsAsync
        (
            connection: EventHubConnection,
            cancellationToken: CancellationToken
        )
        : Task<string[]>
        =
        base.ListPartitionIdsAsync(connection, cancellationToken)

    override this.ListPartitionIdsAsync
        (
            connection: EventHubConnection,
            cancellationToken: CancellationToken
        )
        : Task<string[]>
        =
        match assignedPartitions with
        | Some assignedPartitions -> assignedPartitions |> Task.FromResult
        | None -> this.BaseListPartitionIdsAsync(connection, cancellationToken)

    // Workaround, see https://github.com/dotnet/fsharp/issues/12448 ...
    member this.BaseListOwnershipAsync
        (cancellationToken: CancellationToken)
        : Task<IEnumerable<EventProcessorPartitionOwnership>>
        =
        base.ListOwnershipAsync(cancellationToken)

    override this.ListOwnershipAsync
        (cancellationToken: CancellationToken)
        : Task<IEnumerable<EventProcessorPartitionOwnership>>
        =
        match assignedPartitions with
        | Some assignedPartitions ->
            assignedPartitions
            |> Seq.map (fun partition ->
                EventProcessorPartitionOwnership(
                    FullyQualifiedNamespace = this.FullyQualifiedNamespace,
                    EventHubName = this.EventHubName,
                    ConsumerGroup = this.ConsumerGroup,
                    PartitionId = partition,
                    OwnerIdentifier = this.Identifier,
                    LastModifiedTime = DateTimeOffset.UtcNow
                ))
            |> Task.FromResult
        | None -> this.BaseListOwnershipAsync(cancellationToken)

    // Workaround, see https://github.com/dotnet/fsharp/issues/12448 ...
    member this.BaseClaimOwnershipAsync
        (
            desiredOwnership: IEnumerable<EventProcessorPartitionOwnership>,
            cancellationToken: CancellationToken
        )
        : Task<IEnumerable<EventProcessorPartitionOwnership>>
        =
        base.ClaimOwnershipAsync(desiredOwnership, cancellationToken)

    override this.ClaimOwnershipAsync
        (
            desiredOwnership: IEnumerable<EventProcessorPartitionOwnership>,
            cancellationToken: CancellationToken
        )
        : Task<IEnumerable<EventProcessorPartitionOwnership>>
        =
        // Warning: if the match is removed, and only the code in the Some part is left => High CPU utilization if no assignedPartitions defined!
        // for more info see https://github.com/Azure/azure-sdk-for-net/issues/39603
        match assignedPartitions with
        | Some _ ->
            desiredOwnership
            |> Seq.iter (fun ownership -> ownership.LastModifiedTime <- DateTimeOffset.UtcNow)

            desiredOwnership |> Task.FromResult
        | None -> this.BaseClaimOwnershipAsync(desiredOwnership, cancellationToken)

    // Workaround, see https://github.com/dotnet/fsharp/issues/12448 ...
    member this.BaseUpdateCheckpointAsync
        (
            partitionId: string,
            offset: int64,
            sequenceNumber: Nullable<int64>,
            cancellationToken: CancellationToken
        )
        =
        base.UpdateCheckpointAsync(partitionId, offset, sequenceNumber, cancellationToken)

    // used in the OnProcessingEventBatchAsync member (note: as a plain property this is re-evaluated on each access)
    member private this.EventHubFullName =
        Subscribing.createEventHubFullPath this.FullyQualifiedNamespace this.EventHubName this.ConsumerGroup

    // https://learn.microsoft.com/en-us/dotnet/api/azure.messaging.eventhubs.eventprocessorclient.onprocessingeventbatchasync?view=azure-dotnet#remarks
    override this.OnProcessingEventBatchAsync
        (
            events: IEnumerable<EventData>,
            partition: EventProcessorPartition,
            cancellationToken: CancellationToken
        )
        : Task
        =
        task {
            try
                if not (isNull events || events |> Seq.isEmpty) then
                    do!
                        events
                        |> Seq.map (fun event ->
                            async {
                                Subscribing.checkEventEnDequeueTime log this.EventHubFullName event

                                do! processEvent log partition.PartitionId cancellationToken event
                            })
                        |> Async.Sequential
                        |> Async.Ignore
                        |> Async.StartAsTask
                        :> Task

                    let lastEvent = events |> Seq.last

                    do!
                        this.BaseUpdateCheckpointAsync(
                            partition.PartitionId,
                            lastEvent.Offset,
                            lastEvent.SequenceNumber,
                            cancellationToken
                        )
            with ex ->
                // It is very important that you always guard against exceptions in
                // your handler code; the processor does not have enough
                // understanding of your code to determine the correct action to take.
                // Any exceptions from your handlers go uncaught by the processor and
                // will NOT be redirected to the error handler.
                //
                // In this case, the partition processing task will fault and be restarted
                // from the last recorded checkpoint.

                log.Exception
                    (int EventId.EventProcessorError, string EventId.EventProcessorError)
                    "OnProcessingEventBatchAsync: Exception while processing events: {ex}"
                    ex
                    [| ex |]

        // bubble up, which will kill the background service and result in Health Check alert
        // alternative is to log and "swallow" the exception here, but then the processor will go in infinite loop ...
        // NOT a good idea, raising exception here invokes OnProcessingErrorAsync, which causes the host to get restarted automatically, and the health check does not detect this ..
        // ex.Reraise()
        }

    // https://learn.microsoft.com/en-us/dotnet/api/azure.messaging.eventhubs.eventprocessorclient.onprocessingerrorasync?view=azure-dotnet#remarks
    override this.OnProcessingErrorAsync
        (
            ex: Exception,
            partition: EventProcessorPartition,
            operationDescription: string,
            cancellationToken: CancellationToken
        )
        : Task
        =
        task {
            try
                let partitionId = partition |> Option.ofObj |> Option.map _.PartitionId

                do!
                    processError log partitionId operationDescription cancellationToken ex
                    |> Async.StartAsTask

            with wex ->
                // It is very important that you always guard against exceptions
                // in your handler code; the processor does not have enough
                // understanding of your code to determine the correct action to
                // take. Any exceptions from your handlers go uncaught by the
                // processor and will NOT be handled in any way.
                //
                // In this case, unhandled exceptions will not impact the processor
                // operation but will go unobserved, hiding potential application problems.

                log.Exception
                    (int EventId.EventProcessorError, string EventId.EventProcessorError)
                    "OnProcessingErrorAsync: Exception occurred while processing events: {wex}. Original exception: {ex}."
                    wex
                    [| wex; ex |]

        // do! this.StopProcessingAsync(cancellationToken)

        // bubble up, which will kill the background service and result in Health Check alert
        // alternative is to log and "swallow" the exception here, but then the processor will go in infinite loop ...
        // NOT a good idea, the host gets restarted automatically, and the health check does not detect this ..
        // ex.Reraise()
        }

/// Starts the Event Processor
let startConsumeEvents
    (name: string)
    (checkpointStorageConnectionString: string)
    (checkpointBlobContainerName: string)
    (eventHubConnectionString: string)
    (consumerGroup: string)
    (eventBatchMaximumCount: int)
    (assignedPartitionIds: string[] option)
    (defaultStartingPosition: EventPosition)
    (processEvent: Log -> string -> CancellationToken -> EventData -> Async<unit>)
    (processError: Log -> string option -> string -> CancellationToken -> Exception -> Async<unit>)
    (started: ManualResetEvent)
    (log: Log)
    : (IDictionary<string, obj> -> CancellationToken -> Async<unit>)
    =
    fun state cancellationToken ->
        async {
            let blobContainerClient =
                BlobContainerClient(checkpointStorageConnectionString, checkpointBlobContainerName)
            // automatically create container if it does not exist
            do! blobContainerClient.CreateIfNotExistsAsync() |> Async.AwaitTask |> Async.Ignore

            let options = EventProcessorOptions() // TODO: Customize some of them?

            options.DefaultStartingPosition <- defaultStartingPosition

            let processor =
                AssignablePartitionProcessor(
                    log,
                    name,
                    blobContainerClient,
                    assignedPartitionIds,
                    eventBatchMaximumCount,
                    consumerGroup,
                    eventHubConnectionString,
                    options,
                    processEvent,
                    processError
                )

            state.Add("processor", processor)

            log.Info
                (int EventId.EventProcessorStarted, string EventId.EventProcessorStarted)
                "Starting with config:\n\
                    \tEventHubNamespace/Name/ConsumerGroup = {eventHubNamespace}/{eventHubName}/{consumerGroup}\n\
                    \tBlobContainerClient.Uri = {blobContainerClientUri}\n\
                    \tAssignedPartitionIds = {assignedPartitionIds}\n\
                    \tEventBatchMaximumCount = {eventBatchMaximumCount}\n\
                    \tOptions.PrefetchCount = {prefetchCount}\n\
                    \tOptions.PrefetchSizeInBytes = {prefetchSizeInBytes}\n\
                    \tOptions.MaximumWaitTime = {maximumWaitTime}\n\
                    \tOptions.TrackLastEnqueuedEventProperties = {trackLastEnqueuedEventProperties}\n\
                    \tOptions.DefaultStartingPosition = {defaultStartingPosition}\n\
                    \tOptions.LoadBalancingStrategy = {loadBalancingStrategy}\n\
                    \tOptions.LoadBalancingUpdateInterval = {loadBalancingUpdateInterval}\n\
                    \tOptions.PartitionOwnershipExpirationInterval = {partitionOwnershipExpirationInterval}\n\
                    \tOptions.RetryOptions.Mode = {retryOptionsMode}\n\
                    \tOptions.RetryOptions.Delay = {retryOptionsDelay}\n\
                    \tOptions.RetryOptions.MaximumDelay = {retryOptionsMaximumDelay}\n\
                    \tOptions.RetryOptions.MaximumRetries = {retryOptionsMaximumRetries}\n\
                    \tOptions.RetryOptions.TryTimeout = {retryOptionsTryTimeout}\n\
                    \tOptions.RetryOptions.CustomRetryPolicy = {retryOptionsCustomRetryPolicy}\n\
                    "
                [|
                    (processor.FullyQualifiedNamespace |> String.replace ".servicebus.windows.net" "")
                    processor.EventHubName
                    processor.ConsumerGroup
                    blobContainerClient.Uri
                    $"%A{assignedPartitionIds}"
                    eventBatchMaximumCount
                    options.PrefetchCount
                    options.PrefetchSizeInBytes
                    options.MaximumWaitTime
                    options.TrackLastEnqueuedEventProperties
                    options.DefaultStartingPosition
                    options.LoadBalancingStrategy
                    options.LoadBalancingUpdateInterval
                    options.PartitionOwnershipExpirationInterval
                    options.RetryOptions.Mode
                    options.RetryOptions.Delay
                    options.RetryOptions.MaximumDelay
                    options.RetryOptions.MaximumRetries
                    options.RetryOptions.TryTimeout
                    options.RetryOptions.CustomRetryPolicy
                |]

            do! processor.StartProcessingAsync(cancellationToken) |> Async.AwaitTask

            started.Set() |> ignore
        }

/// Stops the Event Processor
let stopConsumeEvents
    name
    (started: ManualResetEvent)
    (log: Log)
    (state: IDictionary<string, obj>)
    : (CancellationToken -> Async<unit>)
    =
    fun cancellationToken ->
        async {
            let processor = state["processor"] :?> AssignablePartitionProcessor
            do! processor.StopProcessingAsync(cancellationToken) |> Async.AwaitTask

            log.Info
                (int EventId.EventProcessorStopped, string EventId.EventProcessorStopped)
                "Event Hub Processor was stopped"
                [||]

            started.Reset() |> ignore
        }


Notes:

  • You define a function, then instantiate processor(s) with that function and a bunch of configuration values (which you can fetch yourself), and then you tell the HostBuilder to configure your processor(s). Pretty straightforward: no reflection, no magic
  • The implementation shown above derives from the Azure SDK's PluggableCheckpointStoreEventProcessor and plugs in a BlobCheckpointStore, which takes care of everything required, including checkpointing in blob storage as done by the WebJobs SDK. The whole implementation is fewer than 350 lines of code
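The batch handling in OnProcessingEventBatchAsync boils down to two steps: process the events of a batch strictly one after another, then checkpoint at the last event. A minimal, SDK-free sketch of that pattern follows; `FakeEvent`, `processBatch` and the in-memory checkpoint are hypothetical stand-ins for illustration, not the Azure SDK types.

```fsharp
// Hypothetical stand-in for EventData: only the fields this sketch needs
type FakeEvent = { SequenceNumber: int64; Body: string }

/// Processes a batch one event at a time and checkpoints only after the
/// whole batch succeeded. If a handler throws, the checkpoint is not
/// advanced for this batch.
let processBatch
    (processEvent: FakeEvent -> Async<unit>)
    (updateCheckpoint: int64 -> Async<unit>)
    (events: FakeEvent list)
    : Async<unit> =
    async {
        if not (List.isEmpty events) then
            // Async.Sequential runs the computations strictly one after another
            do! events |> List.map processEvent |> Async.Sequential |> Async.Ignore
            do! updateCheckpoint (List.last events).SequenceNumber
    }

// Usage with in-memory stand-ins for the handler and the checkpoint store
let checkpoint = ref (-1L)
let handled = System.Collections.Generic.List<string>()

let batch =
    [ { SequenceNumber = 10L; Body = "a" }
      { SequenceNumber = 11L; Body = "b" } ]

processBatch
    (fun e -> async { handled.Add(e.Body) })
    (fun seqNo -> async { checkpoint.Value <- seqNo })
    batch
|> Async.RunSynchronously

printfn "handled = %A, checkpoint = %d" (List.ofSeq handled) checkpoint.Value
```

Checkpointing once per batch rather than once per event keeps the number of blob writes low, at the price of possibly seeing some events again after a restart, so handlers should tolerate duplicates.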

Queue Processor

Old code:

type WebJobs(...) =
    [<FunctionName("RetryHandleXyzEvent")>]
    member _.RetryHandleXyzEvent
        ([<QueueTrigger("xyz-events-retry-queue", Connection = "StorageQueueConnectionStringKey")>] msg: string)
        (logger: ILogger)
        =
        // handle the message

// Program.fs, configure the host with web jobs
    let configureWebJobs (builder:IHostBuilder) = 
        builder.ConfigureWebJobs(fun b ->
          b.AddAzureStorageCoreServices() |> ignore
          b.AddAzureStorageQueues() |> ignore)   


New code:

// Api.Functions.fs, handle the message
    let processMessage log (cancellationToken: CancellationToken) (msg: QueueMessage) : Async<unit> =
        // handle the queue message

        () |> Async.retn

// Api.Wiring.fs, instantiate the processor
    let queueProcessor1 =
        QueueProcessorDef.create
            "QueueProcessor1"
            EnvVars.appName
            EnvVars.Queues.queueStorageConnectionString
            EnvVars.Queues.queueName
            EnvVars.Queues.messageBatchMaximumCount
            EnvVars.Queues.visibilityTimeout
            EnvVars.Queues.maxPollingInterval
            EnvVars.Queues.maxDequeueCount
            EnvVars.Queues.defaultBackOffIntervalMs
            QueueHandlers.processMessage

// Program.fs, configure the host with the processors
        |> HostBuilder.configureQueueProcessors
            EnvVars.appName
            [ QueueProcessors.queueProcessor1 ]
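Two small decisions drive the queue processor implementation that follows: a message whose dequeue count exceeds maxDequeueCount is moved to a poison queue instead of being processed, and the polling delay doubles while the queue is empty (capped at maxPollingInterval) and resets after a non-empty batch. The sketch below isolates both as pure functions; `MessageAction`, `decide` and `nextBackoffMs` are hypothetical names introduced for illustration only.

```fsharp
open System

type MessageAction =
    | Process      // hand the message to the handler
    | MoveToPoison // retried too often: park it in the poison queue

/// Mirrors the `msg.DequeueCount > maxDequeueCount` check.
let decide (maxDequeueCount: int) (dequeueCount: int64) : MessageAction =
    if dequeueCount > int64 maxDequeueCount then MoveToPoison else Process

/// Next polling delay in milliseconds: reset after a non-empty batch,
/// otherwise double the current delay up to the configured cap.
let nextBackoffMs (defaultMs: int) (maxPollingInterval: TimeSpan) (currentMs: int) (receivedAny: bool) : int =
    if receivedAny then
        defaultMs
    else
        Math.Min(currentMs * 2, int maxPollingInterval.TotalMilliseconds)

// Usage: five empty polls in a row, starting at 100 ms with a 1 s cap
let cap = TimeSpan.FromSeconds 1.0

let delays =
    [ 1..5 ] |> List.scan (fun current _ -> nextBackoffMs 100 cap current false) 100

printfn "%A" delays
printfn "%A" (decide 5 6L)
```

Keeping these rules free of I/O makes them easy to unit test without a storage account or emulator.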

Queue Processor Implementation:

module Framework.AzureStorageQueues.QueueProcessing

open System
open System.Threading
open Azure.Storage.Queues
open Azure.Storage.Queues.Models
open Framework.AzureStorageQueues.BasicOperations
open Framework.AzureStorageQueues.LogEvents
open Framework.Logging.StructuredLog
open Framework.ExceptionHandling

let private doProcessMessage
    (log: Log)
    (name: string)
    (cancellationToken: CancellationToken)
    (maxDequeueCount: int)
    (queueClient: QueueClient)
    (poisonQueueClient: QueueClient)
    (processMessage: Log -> CancellationToken -> QueueMessage -> Async<unit>)
    (msg: QueueMessage)
    =
    async {
        if msg.DequeueCount > maxDequeueCount then
            // message has been retried too many times => move it to the poison queue
            do!
                poisonQueueClient.SendMessageAsync(msg.Body, cancellationToken = cancellationToken)
                |> Async.AwaitTask
                |> Async.Ignore

            log.Debug
                (int EventId.QueueMessageProcessingError, string EventId.QueueMessageProcessingError)
                "Message dequeue count = {messageDequeueCount} > maxDequeueCount = {maxDequeueCount} => message moved to poison queue {poisonQueueUri}"
                [| msg.DequeueCount; maxDequeueCount; poisonQueueClient.Uri |]

            do!
                queueClient.DeleteMessageAsync(msg.MessageId, msg.PopReceipt, cancellationToken)
                |> Async.AwaitTask
                |> Async.Ignore

            log.Debug
                (int EventId.QueueMessageProcessingError, string EventId.QueueMessageProcessingError)
                "Message deleted from queue {queueUri}"
                [| queueClient.Uri |]
        else
            // normal processing
            // NOTE: try-with is used instead of Async.Catch to catch also cases when processMessage throws exception outside of an Async block ...
            try
                do! processMessage log cancellationToken msg

                do!
                    queueClient.DeleteMessageAsync(msg.MessageId, msg.PopReceipt, cancellationToken)
                    |> Async.AwaitTask
                    |> Async.Ignore

                log.Debug
                    (int EventId.QueueMessageBeingProcessed, string EventId.QueueMessageBeingProcessed)
                    "Deleted message with id = {messageId} and dequeue count = {messageDequeueCount} in queue {queueUri} after successful processing"
                    [| msg.MessageId; msg.DequeueCount; queueClient.Uri |]
            with ex ->
                // message remains in the queue and will become again visible after visibilityTimeout
                log.Exception
                    (int EventId.QueueMessageProcessingError, string EventId.QueueMessageProcessingError)
                    "Queue message processing failed. Retrying {remainingDequeueCount} more times, then the message will be moved to poison queue. Queue: {queueUri}; Message Body: {messageBody}"
                    ex
                    [|
                        (int64 maxDequeueCount - msg.DequeueCount)
                        queueClient.Uri
                        msg |> QueueMessage.toString
                    |]
    }
    |> Async.Catch
    |> Async.map (function
        | Choice1Of2 _ -> ()
        | Choice2Of2 ex ->
            // log the message body
            log.Exception
                (int EventId.QueueMessageProcessingError, string EventId.QueueMessageProcessingError)
                "Error occurred while performing auxiliary queue message processing (e.g. DeleteMessageAsync). Message Body: {messageBody}"
                ex
                [| msg |> QueueMessage.toString |]
            // and propagate up the exception
            ex.Reraise())

/// Starts an infinite loop for receiving queue messages
let consumeMessages
    (name: string)
    (storageConnectionString: string)
    (queueName: string)
    (poisonQueueName: string)
    (messageBatchMaximumCount: int)
    (visibilityTimeout: TimeSpan)
    (maxPollingInterval: TimeSpan)
    (maxDequeueCount: int)
    (defaultBackOffIntervalMs: int)
    (processMessage: Log -> CancellationToken -> QueueMessage -> Async<unit>)
    (started: ManualResetEvent)
    (log: Log)
    : (CancellationToken -> Async<unit>)
    =
    fun cancellationToken ->
        async {
            let options = QueueClientOptions() // TODO: Configure options?
            let queueClient = QueueClient(storageConnectionString, queueName, options)
            do! queueClient.CreateIfNotExistsAsync() |> Async.AwaitTask |> Async.Ignore

            let poisonQueueClient =
                QueueClient(storageConnectionString, poisonQueueName, options)

            do! poisonQueueClient.CreateIfNotExistsAsync() |> Async.AwaitTask |> Async.Ignore

            let mutable backoffTimeMs = defaultBackOffIntervalMs

            started.Set() |> ignore

            log.Info
                (int EventId.QueueProcessorStarting, string EventId.QueueProcessorStarting)
                "Starting with config:\n\
                    \tUri = {queueClientUri}\n\
                    \tMessageBatchMaximumCount = {messageBatchMaximumCount}\n\
                    \tVisibilityTimeout = {visibilityTimeout}\n\
                    \tMaxPollingInterval = {maxPollingInterval}\n\
                    \tMaxDequeueCount = {maxDequeueCount}\n\
                    \tOptions.MessageEncoding = {messageEncoding}\n\
                    \tOptions.Retry.Mode = {retryMode}\n\
                    \tOptions.Retry.Delay = {retryDelay}\n\
                    \tOptions.Retry.MaxDelay = {retryMaxDelay}\n\
                    \tOptions.Retry.MaxRetries = {retryMaxRetries}\n\
                    \tOptions.Retry.NetworkTimeout = {retryNetworkTimeout}\n\
                    \tPoisonQueueName = {poisonQueueName}\n\
                    "
                [|
                    queueClient.Uri
                    messageBatchMaximumCount
                    visibilityTimeout
                    maxPollingInterval
                    maxDequeueCount
                    options.MessageEncoding
                    options.Retry.Mode
                    options.Retry.Delay
                    options.Retry.MaxDelay
                    options.Retry.MaxRetries
                    options.Retry.NetworkTimeout
                    poisonQueueName
                |]

            while not cancellationToken.IsCancellationRequested do
                try
                    // any exception in this block will be caught and logged
                    // because if propagated up they will stop the queue processor/background service => health check alert,
                    // but no automatic restart is currently possible ...

                    let! response =
                        queueClient.ReceiveMessagesAsync(
                            maxMessages = messageBatchMaximumCount,
                            visibilityTimeout = visibilityTimeout,
                            cancellationToken = cancellationToken
                        )
                        |> Async.AwaitTask

                    if response.HasValue && response.Value.Length > 0 then
                        do!
                            response.Value
                            |> Seq.map (fun msg ->
                                async {
                                    // central check for the enqueue/dequeue time
                                    // TODO: Enable this once a proper config/solution is found for the invisibility period, which is *not* exposed as a QueueMessage property (only InsertedOn and NextVisibleOn are, but VisibleOn is needed). For more info see https://github.com/Azure/azure-sdk-for-net/issues/40147
                                    // do checkMessageEnDequeueTime log queueClient.Name backoffTimeMs msg

                                    return!
                                        doProcessMessage
                                            log
                                            name
                                            cancellationToken
                                            maxDequeueCount
                                            queueClient
                                            poisonQueueClient
                                            processMessage
                                            msg
                                })
                            |> Async.Sequential
                            |> Async.Ignore

                        backoffTimeMs <- defaultBackOffIntervalMs

                        log.Debug
                            (int EventId.WaitingForQueueMessages, string EventId.WaitingForQueueMessages)
                            "{messageCount} messages successfully processed from queue {queueUri}. Waiting for {backoffTimeMs} milliseconds before checking again ..."
                            [| response.Value.Length; queueClient.Uri; backoffTimeMs |]
                    else
                        backoffTimeMs <- Math.Min(backoffTimeMs * 2, int maxPollingInterval.TotalMilliseconds)

                        log.Debug
                            (int EventId.WaitingForQueueMessages, string EventId.WaitingForQueueMessages)
                            "No messages found in queue {queueUri}. Waiting for {backoffTimeMs} milliseconds before checking again ..."
                            [| queueClient.Uri; backoffTimeMs |]

                    do! Async.Sleep(backoffTimeMs)

                with ex ->
                    log.Exception
                        (int EventId.QueueMessageProcessingError, string EventId.QueueMessageProcessingError)
                        "Exception while processing messages: {ex}"
                        ex
                        [| ex |]
        }

Notes:

  • The implementation is based on the default QueueClient.ReceiveMessagesAsync approach of handling queue messages with the Azure SDK, invoked in an infinite loop with an Async.Sleep between polls. The sleep interval doubles after every empty poll, up to MaxPollingInterval, and resets as soon as messages arrive. The whole implementation is about 200 lines of code.
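
The back-off part of that loop is easy to look at in isolation. Below is a minimal sketch of the rule as it appears in the listing above; the function name nextBackoffMs, the 100 ms default and the 1 second maximum are made up for illustration and are not part of the original code.

```fsharp
open System

/// Next polling delay in milliseconds: reset to the default after a non-empty batch,
/// otherwise double the previous delay, capped at the maximum polling interval.
/// (Hypothetical helper, extracted from the loop for illustration only.)
let nextBackoffMs (defaultMs: int) (maxPollingInterval: TimeSpan) (currentMs: int) (messagesReceived: int) : int =
    if messagesReceived > 0 then
        defaultMs
    else
        Math.Min(currentMs * 2, int maxPollingInterval.TotalMilliseconds)

// Delays after five consecutive empty polls, starting from an assumed 100 ms default
let delays =
    [ 1..5 ]
    |> List.scan (fun current _ -> nextBackoffMs 100 (TimeSpan.FromSeconds 1.0) current 0) 100
    |> List.tail

printfn "%A" delays // [200; 400; 800; 1000; 1000]
```

So an idle queue is polled less and less often until the cap is reached, while a single non-empty batch brings the delay straight back to the default.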

Timer Processor

Old code:

type WebJobs(...) =
    [<FunctionName("DoSomethingRegularly")>]
    member this.DoSomethingRegularly
        (
            [<TimerTrigger("%DoSomethingRegularlyCrontab%")>] timer: TimerInfo,
            logger: ILogger
        )
        =
        // do something

// Program.fs, configure the host with web jobs
    let configureWebJobs (builder:IHostBuilder) = 
        builder.ConfigureWebJobs(fun b ->
          b.AddAzureStorageCoreServices() |> ignore
          b.AddTimers() |> ignore)   

Notes:

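  • The crontab expression is not written in the attribute itself but pulled in through the rather exotic %...% placeholder syntax, which the SDK resolves from the app settings.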

New code:

// Api.Functions.fs, handle the event
    let processTimer1 log (cancellationToken: CancellationToken) (toProcessOn: DateTime) : Async<unit> =
        // do something

        () |> Async.retn

// Api.Wiring.fs, instantiate the processor
    let timerProcessor1 =
        TimerProcessorDef.create
            "TimerProcessor1"
            EnvVars.appName
            EnvVars.Timers.timerProcessorQueueStorageConnectionString
            (TimeSpan.FromSeconds(10) |> Some)
            "* * * * *"
            TimerHandlers.processTimer1

// Program.fs, configure the host with the processors
        |> HostBuilder.configureTimers EnvVars.appName [ Timers.timerProcessor1; Timers.timerProcessor2 ]

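
For readers less familiar with crontab strings: "* * * * *" has five fields (minute, hour, day of month, month, day of week), so the processor above is scheduled every minute. Here is a small script sketch, assuming the NCrontab NuGet package (the same library the implementation below opens); the second schedule and the fixed base time are made-up examples.

```fsharp
#r "nuget: NCrontab"

open System
open NCrontab

// Five fields: minute, hour, day of month, month, day of week
let everyMinute = CrontabSchedule.Parse("* * * * *")
let dailyAt3 = CrontabSchedule.Parse("0 3 * * *") // hypothetical second schedule

// Fixed base time, so that the result does not depend on the wall clock
let baseTime = DateTime(2024, 1, 1, 12, 0, 30, DateTimeKind.Utc)

// GetNextOccurrence returns the first scheduled time strictly after the base time
let nextMinute = everyMinute.GetNextOccurrence(baseTime)
let nextDaily = dailyAt3.GetNextOccurrence(baseTime)

printfn "%s" (nextMinute.ToString("o"))
printfn "%s" (nextDaily.ToString("o"))
```

With these inputs the first call should land on 12:01:00 of the same day and the second on 03:00:00 of the following day. This is the value the timer processor stores as ToProcessOn in its queue message.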

Timer Processor Implementation:

module Framework.AzureStorageQueues.TimerProcessing

open System
open System.Text
open System.Threading
open Azure.Storage.Queues
open NCrontab
open Framework
open Framework.AzureStorageQueues.LogEvents
open Framework.Logging.StructuredLog

type TimerMessage = { ToProcessOn: DateTime }

let private maxTimeoutPeriod = TimeSpan.FromDays(3) // could be up to 7 days (the maximum visibility timeout of a queue message), but 3 days leaves headroom for possible downtimes/system recovery

let private calculateNextCheckOn (toProcessOn: DateTime) (now: DateTime) =
    if toProcessOn - now < maxTimeoutPeriod then
        toProcessOn
    else
        now + maxTimeoutPeriod

let private createAndSendNextTimerMessage (queueClient: QueueClient) (crontab: CrontabSchedule) =
    async {
        let toProcessOn = crontab.GetNextOccurrence(DateTime.UtcNow)

        let msg = { ToProcessOn = toProcessOn }
        let encodedMessage = msg |> Json.serialize |> Encoding.base64Encode Encoding.UTF8

        let nextCheckOn = calculateNextCheckOn toProcessOn DateTime.UtcNow
        let visibilityTimeout = nextCheckOn - DateTime.UtcNow // message should become visible always slightly after ToProcessOn because sending the msg to the queues takes some ms

        do!
            queueClient.SendMessageAsync(encodedMessage, visibilityTimeout, TimeSpan.FromSeconds(-1)) // a TTL of -1 second means the message never expires (the default, when omitted, is 7 days)
            |> Async.AwaitTask
            |> Async.Ignore

        return msg
    }

/// Starts an infinite loop for receiving queue timer messages
let private consumeMessages
    (log: Log)
    (name: string)
    (queueClient: QueueClient)
    (cancellationToken: CancellationToken)
    (crontab: CrontabSchedule)
    processMessage
    (messageBatchMaximumCount: int)
    (visibilityTimeout: TimeSpan)
    =
    async {
        let mutable firstRun = true

        while not cancellationToken.IsCancellationRequested do
            try
                // any exception in this block will be caught and logged
                // because if propagated up they will stop the timer processor/background service => health check alert,
                // but no automatic restart is currently possible ...
                let! response =
                    queueClient.ReceiveMessagesAsync(
                        maxMessages = messageBatchMaximumCount,
                        visibilityTimeout = visibilityTimeout,
                        cancellationToken = cancellationToken
                    )
                    |> Async.AwaitTask

                let! nextCheckOn =
                    if response.HasValue && (response.Value |> Seq.tryHead |> Option.isSome) then // the message became visible, so it needs to be processed or rescheduled
                        async {
                            let msg = response.Value |> Seq.head // we expect/process only 1 timer message per timer queue!

                            let decodedMessage =
                                msg.Body.ToString() |> Encoding.base64Decode Encoding.UTF8 |> Json.deserialize

                            log.Debug
                                (int EventId.TimerMessageBeingProcessed, string EventId.TimerMessageBeingProcessed)
                                "Timer message received in queue {queueUri} with ToProcessOn = {toProcessOn}."
                                [| queueClient.Uri; decodedMessage.ToProcessOn |> DateTime.toStringIso |]

                            let! toProcessOn =
                                // if ToProcessOn in the past => ready to process!
                                if decodedMessage.ToProcessOn <= DateTime.UtcNow then
                                    async {
                                        do! processMessage log cancellationToken decodedMessage.ToProcessOn

                                        log.Debug
                                            (int EventId.TimerMessageBeingProcessed,
                                             string EventId.TimerMessageBeingProcessed)
                                            "Timer message in queue {queueUri} successfully processed."
                                            [| queueClient.Uri |]

                                        // delete this (all) message(s)
                                        do! queueClient.ClearMessagesAsync() |> Async.AwaitTask |> Async.Ignore

                                        log.Debug
                                            (int EventId.TimerMessageBeingProcessed,
                                             string EventId.TimerMessageBeingProcessed)
                                            "Deleted timer message in queue {queueUri}."
                                            [| queueClient.Uri |]

                                        // schedule next execution in a new message
                                        let! newTimerMessage = createAndSendNextTimerMessage queueClient crontab

                                        log.Debug
                                            (int EventId.TimerMessageBeingProcessed,
                                             string EventId.TimerMessageBeingProcessed)
                                            "New timer message with ToProcessOn = {toProcessOn} sent to queue {queueUri}."
                                            [| newTimerMessage.ToProcessOn; queueClient.Uri |]

                                        return newTimerMessage.ToProcessOn
                                    }
                                else // message visible before ToProcessOn .. make invisible again
                                    async {
                                        let nextCheckOn =
                                            calculateNextCheckOn decodedMessage.ToProcessOn DateTime.UtcNow // message should become visible always slightly after ToProcessOn because sending the msg to the queues takes some ms

                                        let visibilityTimeout = nextCheckOn - DateTime.UtcNow

                                        if visibilityTimeout > TimeSpan.Zero then
                                            do!
                                                queueClient.UpdateMessageAsync(
                                                    msg.MessageId,
                                                    msg.PopReceipt,
                                                    visibilityTimeout = visibilityTimeout
                                                )
                                                |> Async.AwaitTask
                                                |> Async.Ignore

                                            log.Debug
                                                (int EventId.TimerMessageBeingProcessed,
                                                 string EventId.TimerMessageBeingProcessed)
                                                "Timer message in queue {queueUri} with ToProcessOn = {toProcessOn} became visible before ToProcessOn. The message's invisibility was extended."
                                                [| queueClient.Uri; decodedMessage.ToProcessOn |> DateTime.toStringIso |]

                                        return decodedMessage.ToProcessOn
                                    }

                            return calculateNextCheckOn toProcessOn DateTime.UtcNow
                        }
                    elif firstRun then
                        log.Debug
                            (int EventId.WaitingForTimerMessage, string EventId.WaitingForTimerMessage)
                            "No visible message in queue {queueUri}, but first run, so await schedule ..."
                            [| queueClient.Uri |]

                        let toProcessOn = crontab.GetNextOccurrence(DateTime.UtcNow)
                        calculateNextCheckOn toProcessOn DateTime.UtcNow |> Async.retn
                    else
                        log.Debug
                            (int EventId.WaitingForTimerMessage, string EventId.WaitingForTimerMessage)
                            "No visible message in queue {queueUri} after sleep but there should have been one, maybe delayed, so check frequently ..."
                            [| queueClient.Uri |]

                        DateTime.UtcNow.AddMinutes(1) |> Async.retn

                let sleepTimeSpan = (nextCheckOn - DateTime.UtcNow) + TimeSpan.FromSeconds(1) // add 1 second to make 100% sure sleep is over only after the message has already become visible

                log.Debug
                    (int EventId.WaitingForTimerMessage, string EventId.WaitingForTimerMessage)
                    "Sleeping {sleepTimeSpan} before checking again in queue {queueUri}"
                    [| sleepTimeSpan; queueClient.Uri |]

                if sleepTimeSpan > TimeSpan.Zero then
                    do! Async.Sleep(sleepTimeSpan)

            with ex ->
                log.Exception
                    (int EventId.TimerMessageProcessingError, string EventId.TimerMessageProcessingError)
                    "Exception while processing messages: {ex}"
                    ex
                    [| ex |]

            firstRun <- false
    }

/// Creates timer queue messages if missing and starts an infinite loop for receiving queue timer messages
let createAndConsumeMessages
    (name: string)
    (crontab: CrontabSchedule)
    (storageConnectionString: string)
    (queueName: string)
    (visibilityTimeout: TimeSpan)
    (processMessage: Log -> CancellationToken -> DateTime -> Async<unit>)
    (started: ManualResetEvent)
    (log: Log)
    : (CancellationToken -> Async<unit>)
    =
    fun cancellationToken ->
        async {
            let options = QueueClientOptions() // TODO: Configure options?

            let queueClient = QueueClient(storageConnectionString, queueName, options)
            let! response = queueClient.CreateIfNotExistsAsync() |> Async.AwaitTask

            if not (isNull response) then // queue was just created
                let! timerMessage = createAndSendNextTimerMessage queueClient crontab

                log.Info
                    (int EventId.TimerProcessorStarting, string EventId.TimerProcessorStarting)
                    "New queue {queueUri} was created and a new timer message with ToProcessOn = {toProcessOn} was sent to it."
                    [| queueClient.Uri; timerMessage.ToProcessOn |> DateTime.toStringIso |]
            else // queue was already existing
                let! response = queueClient.GetPropertiesAsync() |> Async.AwaitTask

                if response.HasValue && response.Value.ApproximateMessagesCount = 0 then // ApproximateMessagesCount, even though not exact, is guaranteed to have a value > 0 if there are messages. Additionally "Approximate messages count will give you an approximate count of total messages in a queue and will include both visible and invisible messages."
                    let! timerMessage = createAndSendNextTimerMessage queueClient crontab

                    log.Info
                        (int EventId.TimerProcessorStarting, string EventId.TimerProcessorStarting)
                        "Queue {queueUri} exists, but no timer message found in it. Created timer message with ToProcessOn = {toProcessOn}."
                        [| queueClient.Uri; timerMessage.ToProcessOn |> DateTime.toStringIso |]
                else
                    log.Info
                        (int EventId.TimerProcessorStarting, string EventId.TimerProcessorStarting)
                        "Existing queue {queueUri} with timer message found."
                        [| queueClient.Uri |]

            let messageBatchMaximumCount = 1 // 1 queue per timer processor, with 1 message per queue only

            started.Set() |> ignore

            log.Info
                (int EventId.TimerProcessorStarting, string EventId.TimerProcessorStarting)
                "Starting with config:\n\
                    \tUri = {queueClientUri}\n\
                    \tMessageBatchMaximumCount = {messageBatchMaximumCount}\n\
                    \tVisibilityTimeout = {visibilityTimeout}\n\
                    \tOptions.MessageEncoding = {messageEncoding}\n\
                    \tOptions.Retry.Mode = {retryMode}\n\
                    \tOptions.Retry.Delay = {retryDelay}\n\
                    \tOptions.Retry.MaxDelay = {retryMaxDelay}\n\
                    \tOptions.Retry.MaxRetries = {retryMaxRetries}\n\
                    \tOptions.Retry.NetworkTimeout = {retryNetworkTimeout}\n\
                    "
                [|
                    queueClient.Uri
                    messageBatchMaximumCount
                    visibilityTimeout
                    options.MessageEncoding
                    options.Retry.Mode
                    options.Retry.Delay
                    options.Retry.MaxDelay
                    options.Retry.MaxRetries
                    options.Retry.NetworkTimeout
                |]

            do!
                consumeMessages
                    log
                    name
                    queueClient
                    cancellationToken
                    crontab
                    processMessage
                    messageBatchMaximumCount
                    visibilityTimeout
        }

Notes:

  • The implementation is based on an Azure Storage Queue holding a single message, which is made "invisible" until the next scheduled run. Because the schedule lives in the queue rather than in process memory, it survives a process crash, and when multiple instances are running, normally only the one that receives the message executes the job. The whole implementation is about 250 lines of code.
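
One detail that is easy to miss in the listing: since the invisibility period is capped at maxTimeoutPeriod, a schedule that lies further in the future is reached in several "hops". A minimal simulation of that rule follows; calculateNextCheckOn is copied from the listing above, while wakeUps and the weekly schedule are hypothetical and only there to show the hops.

```fsharp
open System
open System.Globalization

let maxTimeoutPeriod = TimeSpan.FromDays 3.0

/// Same rule as calculateNextCheckOn in the listing above
let calculateNextCheckOn (toProcessOn: DateTime) (now: DateTime) =
    if toProcessOn - now < maxTimeoutPeriod then
        toProcessOn
    else
        now + maxTimeoutPeriod

/// Points in time at which the message becomes visible again, until it is due.
/// (Hypothetical helper: it pretends the processor reacts exactly at each wake-up.)
let wakeUps (toProcessOn: DateTime) (start: DateTime) =
    start
    |> List.unfold (fun now ->
        if now >= toProcessOn then
            None
        else
            let next = calculateNextCheckOn toProcessOn now
            Some(next, next))

let start = DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc)
let due = start.AddDays 7.0 // assumed schedule: next run one week from now

let hops = wakeUps due start

hops
|> List.iter (fun t -> printfn "%s" (t.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture)))
// 2024-01-04
// 2024-01-07
// 2024-01-08
```

At the first two wake-ups the message is not yet due, which corresponds to the branch in consumeMessages that extends the invisibility via UpdateMessageAsync. Only the last wake-up triggers the actual processing.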

SignalR

Old code:

type WebJobs(...) =   
    [<FunctionName("HandleAndPushToClient")>]
    member _.HandleAndPushToClient
        (
            [<EventHubTrigger("",
                              Connection = EnvVars.EventHubs.Xyz.connectionStringKey,
                              ConsumerGroup =
                                  DependencyInjection.EventHubs.Xyz.consumerGroupForWebSocketNotification)>] msg:
                EventData,
            enqueuedTimeUtc: DateTime,
            sequenceNumber: Int64,
            offset: string,
            [<SignalR(HubName = DependencyInjection.SignalR.hubName,
                      ConnectionStringSetting = "AzureSignalRConnectionString")>] signalRMessages:
                IAsyncCollector<SignalRMessage>,
            logger: ILogger
        )
        =
        // transform some internal event to external
        // publish to SignalR using signalRMessages.AddAsync


Notes:

  • The output bindings are generally a killer feature, which is really killing you: instead of invoking a very simple Azure SDK client method, you deal with IAsyncCollector and SignalR binding magic, which is completely unnecessary. Go figure out how to send a message to a specific user or to all users ...

New code (same as for EventHubProcessor above):

    let serviceManager =
        SignalRClient.getServiceManager EnvVars.SignalR.connectionString

    let hub =
        SignalRClient.getHubContext serviceManager EnvVars.SignalR.azureSignalRHubName
        |> Async.RunSynchronously // TODO: Find a way to get rid of this

    let sendToUser = SignalRClient.sendToUser hub

// Api.Functions.fs, handle the event
    let processEvent log sendToUser partitionId (cancellationToken: CancellationToken) event = async {
        // handle the event
        do! sendToUser "SomeTarget" "Some Message" "SomeUserId"

    }

Notes:

  • The Azure SDK for sending SignalR messages is very straightforward when you use it directly ...
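
The SignalRClient helper module itself is not shown in this article. One possible shape of it, assuming the Microsoft.Azure.SignalR.Management NuGet package, could look like the sketch below. The function names mirror the calls above, but the bodies (and sendToAll) are my guess for illustration, not the production code, and running it needs a real Azure SignalR connection string.

```fsharp
#r "nuget: Microsoft.Azure.SignalR.Management"

open System.Threading
open Microsoft.Azure.SignalR.Management

// Hypothetical counterpart of the SignalRClient module used above
module SignalRClient =

    /// Builds the service manager from a plain connection string
    let getServiceManager (connectionString: string) : ServiceManager =
        ServiceManagerBuilder()
            .WithOptions(fun options -> options.ConnectionString <- connectionString)
            .BuildServiceManager()

    /// Creates the hub context used for sending messages
    let getHubContext (serviceManager: ServiceManager) (hubName: string) : Async<ServiceHubContext> =
        serviceManager.CreateHubContextAsync(hubName, CancellationToken.None)
        |> Async.AwaitTask

    /// Sends one message to all connections of a single user
    let sendToUser (hub: ServiceHubContext) (target: string) (message: string) (userId: string) : Async<unit> =
        hub.Clients.User(userId).SendCoreAsync(target, [| box message |], CancellationToken.None)
        |> Async.AwaitTask

    /// Broadcasts one message to every connected client (assumed extra helper)
    let sendToAll (hub: ServiceHubContext) (target: string) (message: string) : Async<unit> =
        hub.Clients.All.SendCoreAsync(target, [| box message |], CancellationToken.None)
        |> Async.AwaitTask
```

Sending to one user or to everybody then differs only in which client proxy is picked (Clients.User vs. Clients.All), which is exactly the kind of thing that was hard to find out behind the binding.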

Conclusion

Removing a layer of indirection has always given me great satisfaction. Not only does it make the whole application easier to understand, but you also gain a lot more control and get to know the inner workings of the technology, without someone deciding something for you or translating stuff like configuration for you.

MS seems to always try to make things easier for the developer (patronizing them?) by providing a magical and abstract framework, which achieves exactly the opposite. My recommendation to MS would be to try to make everything look like a stupid console application instead, under the full control of the client developer, who is just using a bunch of simple "helper" functions (or class methods in OOP) from MS, and nothing more.

Hopefully someone can save some time doing something similar based on the ideas and code in this article!


  1. Funnily enough, in the meantime MS seems to be trying to do something similar by integrating the WebJobs SDK into .NET 9's HostBuilder.
