DEV Community

Anish Jain


Building a Scalable SQS Consumer in Go

Introduction

When building distributed systems, message queues like Amazon SQS play a crucial role in handling asynchronous workloads. In this post, I'll share my experience implementing a robust SQS consumer in Go that handles user registration events for Keycloak. The solution uses the fan-out/fan-in concurrency pattern to process messages efficiently without overwhelming system resources.

The Challenge

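I faced an interesting problem: process around 50,000 SQS events a day, each of which registers a user in Keycloak. A naive approach spawns a new goroutine for every message, but that puts no upper bound on concurrency: a burst of messages becomes an equally large burst of goroutines, open connections, and concurrent Keycloak requests, which can quickly exhaust resources. We needed a more controlled approach to concurrency.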

Why Fan-out/Fan-in?

The fan-out/fan-in pattern suits this use case because it:

  • Maintains a fixed pool of worker goroutines
  • Hands each message to whichever worker is free, spreading the load across the pool
  • Caps the number of concurrent operations, preventing resource exhaustion
  • Turns concurrency into a single tunable setting: the pool size
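A minimal, standard-library-only sketch of the pattern (not the author's code): a fixed number of workers read from one shared channel (fan-out) and write to one results channel (fan-in), which is closed once every worker has returned. `fanOutFanIn` and the squaring job are hypothetical stand-ins for SQS messages and the Keycloak call.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// fanOutFanIn starts `workers` goroutines that all read from jobs (fan-out)
// and send their outputs to a single results channel (fan-in).
func fanOutFanIn(jobs <-chan int, workers int, process func(int) int) <-chan int {
	results := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs { // whichever worker is free takes the next job
				results <- process(j)
			}
		}()
	}
	// Close results only after every worker has finished sending.
	go func() {
		wg.Wait()
		close(results)
	}()
	return results
}

func main() {
	jobs := make(chan int)
	go func() {
		defer close(jobs)
		for i := 1; i <= 5; i++ {
			jobs <- i
		}
	}()

	var got []int
	for r := range fanOutFanIn(jobs, 3, func(n int) int { return n * n }) {
		got = append(got, r)
	}
	sort.Ints(got)   // completion order is not deterministic
	fmt.Println(got) // [1 4 9 16 25]
}
```

However many jobs arrive, at most three run at once; the pool size, not the message volume, sets the concurrency.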

Implementation Deep Dive

1. The Consumer Structure

First, let's look at our basic consumer structure:

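type Consumer struct {
    Client    *sqs.Client // aws-sdk-go-v2 SQS client
    QueueName string      // the full queue URL: ReceiveMessage's QueueUrl expects a URL, not a bare name
}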


2. Message Processing Pipeline

The implementation consists of three main components:

  1. Message Receiver: Continuously polls SQS for new messages
  2. Worker Pool: Fixed number of goroutines processing messages
  3. Message Channel: Connects the receiver to workers

Here's how we start the consumer:

func StartPool[requestBody any](
    serviceFunc func(c context.Context, dto *requestBody) error,
    consumer *Consumer) {

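    ctx := context.Background()
    params := &sqs.ReceiveMessageInput{
        QueueUrl:              aws.String(consumer.QueueName), // must be the queue URL, not its name
        MaxNumberOfMessages:   10,                             // SQS returns at most 10 per request
        WaitTimeSeconds:       20,                             // long polling; 20 s is the maximum
        VisibilityTimeout:     30,                             // seconds a received message stays hidden
        MessageAttributeNames: []string{"All"},                // return every message attribute
    }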

    msgCh := make(chan types.Message)
    var wg sync.WaitGroup

    // Start worker pool first
    startPool(ctx, msgCh, &wg, consumer, serviceFunc)

    // Then start receiving messages
    // ... rest of the implementation
}


3. Key Configuration Parameters

Let's examine the crucial SQS configuration parameters:

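  • MaxNumberOfMessages (10): Batch size for each poll; 10 is the most a single ReceiveMessage call can return
  • WaitTimeSeconds (20): Long-polling duration; the call returns as soon as messages are available, or after 20 seconds (the maximum) with an empty result
  • VisibilityTimeout (30): Seconds a received message stays hidden from other consumers; if it is not deleted within that window, SQS makes it visible again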
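One way to guard these values at startup is a small validation step. The `ReceiveConfig` type and `Validate` method below are hypothetical, not part of the AWS SDK; the ranges are the documented SQS limits (1 to 10 messages, 0 to 20 seconds of long polling, 0 to 43,200 seconds of visibility timeout).

```go
package main

import "fmt"

// ReceiveConfig is a hypothetical struct mirroring the three parameters above.
type ReceiveConfig struct {
	MaxNumberOfMessages int32 // SQS accepts 1..10
	WaitTimeSeconds     int32 // SQS accepts 0..20; > 0 enables long polling
	VisibilityTimeout   int32 // SQS accepts 0..43200 (12 hours)
}

// Validate checks the SQS limits and that the visibility timeout exceeds the
// worst-case time from receive to delete. That time includes any wait for a
// free worker, because the visibility timer starts when ReceiveMessage returns.
func (c ReceiveConfig) Validate(worstCaseSeconds int32) error {
	switch {
	case c.MaxNumberOfMessages < 1 || c.MaxNumberOfMessages > 10:
		return fmt.Errorf("MaxNumberOfMessages %d outside 1..10", c.MaxNumberOfMessages)
	case c.WaitTimeSeconds < 0 || c.WaitTimeSeconds > 20:
		return fmt.Errorf("WaitTimeSeconds %d outside 0..20", c.WaitTimeSeconds)
	case c.VisibilityTimeout < 0 || c.VisibilityTimeout > 43200:
		return fmt.Errorf("VisibilityTimeout %d outside 0..43200", c.VisibilityTimeout)
	case c.VisibilityTimeout <= worstCaseSeconds:
		return fmt.Errorf("VisibilityTimeout %ds does not exceed worst case %ds: messages may be redelivered mid-processing",
			c.VisibilityTimeout, worstCaseSeconds)
	}
	return nil
}

func main() {
	cfg := ReceiveConfig{MaxNumberOfMessages: 10, WaitTimeSeconds: 20, VisibilityTimeout: 30}
	fmt.Println(cfg.Validate(5))         // <nil>
	fmt.Println(cfg.Validate(45) != nil) // true
}
```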

4. Worker Pool Implementation

The worker pool is where the fan-out pattern comes into play:

func startPool[requestBody any](
    ctx context.Context,
    msgCh chan types.Message,
    wg *sync.WaitGroup,
    consumer *Consumer,
    serviceFunc func(c context.Context, dto *requestBody) error) {

    processingMessages := &sync.Map{}

    // Start 10 workers
    for i := 0; i < 10; i++ {
        go worker(ctx, msgCh, wg, consumer, processingMessages, serviceFunc)
    }
}


5. Duplicate Message Handling

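We use a sync.Map keyed by MessageId to skip a message that is already being processed. SQS standard queues deliver at least once, so the same message (with the same MessageId) can arrive again, for example when its visibility timeout expires before processing finishes: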

if _, loaded := processingMessages.LoadOrStore(*msg.MessageId, true); loaded {
    wg.Done()
    continue
}


Best Practices and Learnings

  1. Error Handling: Handle every error explicitly and log it with the message ID so failures can be traced
  2. Message Cleanup: Delete messages only after successful processing
  3. Graceful Shutdown: Implement proper shutdown mechanisms using context
  4. Monitoring: Add logging at key points for observability

Performance Considerations

  • Worker Count: Choose based on your workload and available resources
  • Batch Size: Balance between throughput and processing time
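  • Visibility Timeout: Set it above your worst-case time from receive to delete (including time spent waiting for a free worker), not the average; otherwise messages become visible again and are processed twice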

Future Improvements

  1. Dynamic Worker Scaling: Adjust worker count based on queue depth
  2. Circuit Breaker: Add circuit breaker for downstream services
  3. Metrics Collection: Add Prometheus metrics for monitoring
  4. Dead Letter Queue: Implement DLQ handling for failed messages
  5. Retries: Add exponential backoff for transient failures

Conclusion

The fan-out/fan-in pattern provides an elegant solution for processing high-volume SQS messages in Go. By maintaining a fixed worker pool, we avoid the pitfalls of unbounded goroutine creation while ensuring efficient message processing.

Remember to always consider your specific use case when implementing such patterns. The configuration values shown here (worker count, timeout values, etc.) should be adjusted based on your requirements and resource constraints.



Tags: #golang #aws #sqs #concurrency #distributed-systems
