loading...

How to use Amazon SQS and SNS for inter-service communication - Part 2

jeastham1993 profile image James Eastham ・6 min read

In my last article in the Design.Build.Deploy series I configured my first application event handler using Amazon SNS and SQS.

For those of you who don't want to step back and read that article, the basic premise is that:

  1. A new team record is created in the database
  2. A notification of this creation is sent to an SNS topic
  3. The topic is forwarded on to a dynamically linked SQS queue to be handled by an interesting application

In the first part of these two articles, I setup points one and two. Our team-service can now handle the creation of teams (amongst other CRUD operations) and the notification of events to an SNS service.

One of the complexities of my design for this system is the dynamic queue configuration. I want interested services to be able to startup, create their own SQS queue and subscribe to the required SNS topics.

That will allow me to implement eventual consistency throughout all of my services, without fear of them interrupting each other.

This could all be configured manually through the AWS management console of course, but there would be no fun in that.

To refer back to a diagram from my first article, here is the envisaged design. With each of the end services managing their SQS instance.

Alt Text

With that, let's get to it.

Creating an SQS queue on the fly in GoLang

To quickly create a proof of concept for this structure, I'm going to create an extremely simple non-clean architecture friendly app.

I'm a huge advocate of following best practices for app design where possible, even in the smallest of apps. That said, I'm not even sure if this design is possible at the moment so a small utility seems a good option.

As I've found with developing almost anything within AWS, the documentation is outstanding. Here is the utility code I ended up with to create and subscribe an Amazon SQS queue to a specific SNS topic (comments added for clarity)

package main


import (
    "strings"
    "fmt"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/credentials"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/sqs"
    "github.com/aws/aws-sdk-go/service/sns"
)

func main() {
    requiredQueueName := "fixture-service-queue";
    requiredTopic := "leaguemanager-info-newteamcreated"
    queueURL := ""

    // Initialize the AWS session
    sess := session.Must(session.NewSession(&aws.Config{
        Region:      aws.String("eu-west-1"),
        Credentials: credentials.NewSharedCredentials("", "league-manager-sqs"),
    }))

    // Create new services for SQS and SNS
    sqsSvc := sqs.New(sess)
    snsSvc := sns.New(sess)

    // Create a new requst to list queues, first we will check to see if our required queue already exists
    listQueuesRequest := sqs.ListQueuesInput{}

    listQueueResults, _ := sqsSvc.ListQueues(&listQueuesRequest)

    for _, t := range listQueueResults.QueueUrls {
        // If one of the returned queue URL's contains the required name we need then break the loop
        if strings.Contains(*t, requiredQueueName) {
            queueURL = *t
            break
        }
    }

    // If, after checking existing queues, the URL is still empty then create the SQS queue.
    if queueURL == "" {
        createQueueInput := &sqs.CreateQueueInput{
            QueueName: &requiredQueueName,
        }

        createQueueResponse, err := sqsSvc.CreateQueue(createQueueInput)

        if err != nil {
            fmt.Println(err.Error())
        }

        if (createQueueResponse != nil) {
            queueURL = *createQueueResponse.QueueUrl

            fmt.Println(createQueueResponse.QueueUrl)
        }
    }

    // No way to retrieve the queue ARN through the SDK, manual string replace to generate the ARN
    queueARN := convertQueueURLToARN(queueURL)

    protocolName := "sqs"
    topicArn := ""

    listTopicsRequest := sns.ListTopicsInput{}

    // List all topics and loop through the results until we find a match
    allTopics, _ := snsSvc.ListTopics(&listTopicsRequest)

    for _, t := range allTopics.Topics {
        if strings.Contains(*t.TopicArn, requiredTopic) {
            topicArn = *t.TopicArn
            break
        }
    }

    // If the required topic is found, then create the subscription
    if topicArn != "" {
        subscibeQueueInput := sns.SubscribeInput{
            TopicArn: &topicArn,
            Protocol: &protocolName,
            Endpoint: &queueARN,
        }

        createSubRes, err := snsSvc.Subscribe(&subscibeQueueInput)

        if err != nil {
            fmt.Println(err.Error())
        }

        if (createSubRes != nil) {
            fmt.Println(createSubRes.SubscriptionArn)
        }
    }

    policyContent := "{\"Version\": \"2012-10-17\",  \"Id\": \"" + queueARN + "/SQSDefaultPolicy\",  \"Statement\": [    {     \"Sid\": \"Sid1580665629194\",      \"Effect\": \"Allow\",      \"Principal\": {        \"AWS\": \"*\"      },      \"Action\": \"SQS:SendMessage\",      \"Resource\": \"" + queueARN + "\",      \"Condition\": {        \"ArnEquals\": {         \"aws:SourceArn\": \"" + topicArn + "\"        }      }    }  ]}"

    attr := make(map[string]*string, 1)
    attr["Policy"] = &policyContent

    setQueueAttrInput := sqs.SetQueueAttributesInput{
        QueueUrl: &queueURL,
        Attributes: attr,
    }

    _, err := sqsSvc.SetQueueAttributes(&setQueueAttrInput)

    if err != nil {
        fmt.Println(err.Error())
    }
}

func convertQueueURLToARN(inputURL string) (string) {
    // Awfully bad string replace code to convert a SQS queue URL to an ARN
    queueARN := strings.Replace(strings.Replace(strings.Replace(inputURL, "https://sqs.", "arn:aws:sqs:", -1), ".amazonaws.com/", ":", -1), "/", ":", -1)

    return queueARN
}

To run through this code in a little more step by step detail.

  1. First, a session with AWS is initialized and the required SNS and SQS service clients are created
  2. I then want to check to see if the required queue already exists, and if it doesn't create it
  3. As far as I can tell with the API docs, there is no way to return a Queue ARN from AWS. Yet, the ARN is needed for creating the SNS subscription. The URL and ARN are very similar, so I created a string replace function to convert between the two
  4. Retrieve the ARN for the required topic
  5. Create the subscription
  6. Add required permissions to the SQS queue so that SNS is allowed to publish

IMPORTANT

Probably the most important part of that above code are the last 6 lines.

Creating a new SQS queue and subscribing it to an SNS topic is pretty trivial and really well documented.

What Amazon neglects to tell you, is that SNS cannot publish to an SQS topic unless it is given permission to do so.

The only way (I can find) to do that securely, is to allow 'everyone' permissions to send a message to the queue and then add a condition restricting to the specific ARN.

The AddPermission SDK method doesn't support that, so my current fix is to create policy JSON on the fly and add that to the SQS attributes. Not perfect, but functional.

So that's part one of the concepts proved. But can we actually retrieve and process data from the queue?

Retrieving and processing SQS messages in GoLang

Before I dispose of this little utility application, I want to run a test to make sure that it all links up correctly.

For that, I'm going to build and start-up my team-service locally. From there, I'm going to make use of a goroutine and add this little bit of code to my utility (note the refactoring of the initialization method into a separate function)

func main() {
    // Initialize the AWS session
    sess := session.Must(session.NewSession(&aws.Config{
        Region:      aws.String("eu-west-1"),
        Credentials: credentials.NewSharedCredentials("", "league-manager-sqs"),
    }))

    // Create new services for SQS and SNS
    sqsSvc := sqs.New(sess)
    snsSvc := sns.New(sess)

    requiredQueueName := "fixture-service-queue";
    requiredTopic := "leaguemanager-info-newteamcreated"

    queueURL := createAndSubscribeSqsQueueToSnsTopic(*sqsSvc, *snsSvc, requiredQueueName, requiredTopic)

    go checkMessages(*sqsSvc, queueURL)

    fmt.Scanln()
}


func checkMessages(sqsSvc sqs.SQS, queueURL string) {
    for ; ; {
        retrieveMessageRequest := sqs.ReceiveMessageInput{
            QueueUrl: &queueURL,
        }

        retrieveMessageResponse, _ := sqsSvc.ReceiveMessage(&retrieveMessageRequest)

        if len(retrieveMessageResponse.Messages) > 0 {

            processedReceiptHandles := make([]*sqs.DeleteMessageBatchRequestEntry, len(retrieveMessageResponse.Messages))

            for i, mess := range retrieveMessageResponse.Messages {
                fmt.Println(mess.String())

                processedReceiptHandles[i] = &sqs.DeleteMessageBatchRequestEntry{
                    Id: mess.MessageId,
                    ReceiptHandle: mess.ReceiptHandle,
                }
            }

            deleteMessageRequest := sqs.DeleteMessageBatchInput{
                QueueUrl: &queueURL,
                Entries: processedReceiptHandles,
            }

            _,err := sqsSvc.DeleteMessageBatch(&deleteMessageRequest)

            if err != nil {
                fmt.Println(err.Error())
            }
        }

        if len(retrieveMessageResponse.Messages) == 0 {
            fmt.Println(":(  I have no messages")
        }

        fmt.Printf("%v+\n", time.Now())
        time.Sleep(time.Minute)
    }
}

That piece of code is now going to check the respective SQS queue once a minute. On finding the message, I'm simply going to write the contents of the message to the console, and then delete it.

Running a POST request to my team-service API successfully gives me a message in my SQS queue

Alt Text

Then starting up and running my utility app, gives me this wonderful little console line:

Alt Text

I think that my friends is a concept well and truly proved.

What Next

Now that I know my design of the pub/sub messaging component of the application is feasible, I'm going to hold the development of new services there.

I'm going to switch back to the development of the team service to get that to a fully CI/CD enabled deployable app.

I've found in the past, that finalizing the complete end to end of a single service massively speeds up the development of new services thereafter. Whether that's the simplicity of just duplicating CI/CD pipelines or understanding concepts a bit better, it certainly is a time saver.

So, next week I'll be covering Automated AWS Deployments of a GoLang application.

Go Lang - thoughts so far

A very short aside, but I'm finding Go to be a truly fantastic language to work with. Coming from a .NET and only .NET background, picking up a new language is a pretty new experience.

There are some things that are frustrating (I'm continually wrapping if statements in brackets and adding semi-colons to the end of every line).

Slight syntactical inconveniences aside though, it's a fantastic language to work with. I'd encourage any backend dev with an interest in picking up something new to give it a go.

Discussion

pic
Editor guide