
Alejandro Pereira

SNS SUBSCRIBER FOR YOUR API

In this post we will see how to add an API subscriber for SNS notifications. The scenario is as follows:

  • The API is implemented in Flask
  • AWS Budgets will be the message producer
  • We have a budget configured with a threshold of 90%
  • The subscriber will be of the http/https type

This post covers the following steps:

  1. Understand how an API subscriber for SNS notifications should process a message.
  2. Authenticate a message from AWS.
  3. Confirm the subscription to an SNS topic.
  4. Confirm the subscription for a budget alert.
  5. Execute the action.

Let’s go!


PROCESSING AN SNS NOTIFICATION

An API subscriber for SNS notifications should be able to perform these three actions:

  1. Check that the message is coming from AWS.
  2. Confirm the subscription.
  3. Trigger the desired action.

AWS SIGNATURE VERIFICATION

AWS has defined the steps needed for a successful signature verification. You can read about them in the AWS SNS documentation. I wrote this step as a decorator since it is likely to be reused in future subscribers:

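import logging
from base64 import b64decode
from functools import wraps
from urllib.parse import urlparse
from urllib.request import urlopen

from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from flask import jsonify, request

logger = logging.getLogger(__name__)

SNS_MESSAGE_HEADER_TYPE = 'x-amz-sns-message-type'
SUBCRIPTION_TYPE = 'SubscriptionConfirmation'
SUBCRIPTION_FORMAT = ["Message", "MessageId", "SubscribeURL", "Timestamp", "Token", "TopicArn", "Type"]
NOTIFICATION_TYPE = 'Notification'
NOTIFICATION_FORMAT = ["Message", "MessageId", "Subject", "Timestamp", "TopicArn", "Type"]

def needs_sns_notification_auth(f):

    @wraps(f)
    def verify_aws_origin(*args, **kwargs):
        # The message type header decides which fields make up the signed message.
        message_type = request.headers.get(SNS_MESSAGE_HEADER_TYPE)
        if message_type == SUBCRIPTION_TYPE:
            msg_format = SUBCRIPTION_FORMAT
        elif message_type == NOTIFICATION_TYPE:
            msg_format = NOTIFICATION_FORMAT
        else:
            logger.error("Message not valid")
            return jsonify({'message': "Not valid"}), 401
        try:
            # SNS does not send a JSON Content-Type header, so force JSON parsing.
            message_json = request.get_json(force=True)
            decoded_signature = b64decode(message_json.get("Signature"))
            # _message_builder (helper not shown here) builds the string that AWS signed.
            canonical_message = _message_builder(message_json, msg_format)
            # Only trust certificates hosted on an amazonaws.com domain.
            amazon_url = message_json.get("SigningCertURL")
            if not urlparse(amazon_url).hostname.endswith('.amazonaws.com'):
                return jsonify({'message': "Not authorized"}), 403
            cert_content = urlopen(amazon_url).read()
            cert = x509.load_pem_x509_certificate(cert_content, default_backend())
            pubkey = cert.public_key()
            # Raises InvalidSignature on mismatch. SHA1 corresponds to SignatureVersion 1.
            pubkey.verify(
                decoded_signature,
                canonical_message.encode(),
                padding.PKCS1v15(),
                hashes.SHA1()
            )
        except InvalidSignature as e:
            logger.error(e)
            return jsonify({'message': "Not authorized"}), 403
        except Exception as e:
            logger.error(e)
            error_msg = "Couldn't authenticate sns notification."
            return jsonify({'message': error_msg}), 500

        # The signature is valid: run the decorated function.
        return f(*args, **kwargs)
    return verify_aws_origin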

That is a lot of code, so let’s walk through it:

  1. The first thing to do is find out what type of message it is. The type is indicated by the header x-amz-sns-message-type. This code handles two values, SubscriptionConfirmation and Notification (SNS also sends UnsubscribeConfirmation, which is rejected here). This matters because the message body is different in each case, so we also pick the matching message format.
  2. To verify an AWS message we first check that the hostname of the signing certificate URL belongs to amazonaws.com. The message carries the signature generated by Amazon and the URL of the X509 certificate (PEM) that was used to sign it. So we rebuild the message in the specific format that AWS defines and verify the signature against it with the public key we get from this certificate.
  3. If the signature doesn’t match we catch an InvalidSignature exception and handle it as we need. We also catch Exception for unexpected errors.
  4. If everything ran OK, we execute the decorated function.

SUBSCRIPTION CONFIRMATION

For this step we can assume that the signature is valid. I’ve also written it as a decorator for the same reason:

import boto3
from botocore.exceptions import ClientError

def sns_subscriber(f):

    @wraps(f)
    def confirm_subscription(*args, **kwargs):
        # Only SubscriptionConfirmation messages are handled here.
        if request.headers.get(SNS_MESSAGE_HEADER_TYPE) == SUBCRIPTION_TYPE:
            client = boto3.client('sns')
            # SNS does not send a JSON Content-Type header, so force JSON parsing.
            json_data = request.get_json(force=True)
            topic_arn = json_data.get('TopicArn')
            token = json_data.get('Token')
            try:
                # Confirm through the AWS API instead of calling SubscribeURL.
                client.confirm_subscription(
                    TopicArn=topic_arn,
                    Token=token
                )
            except ClientError as e:
                code = e.response.get('Error').get('Code')
                failed_msg = f"Subscription failed: {code}"
                logger.error(failed_msg)
                return jsonify({'message': failed_msg}), 409
            except Exception as e:
                logger.error(e)
                error_msg = f"Couldn't process subscription for {request.endpoint} to {topic_arn}"
                logger.error(error_msg)
                return jsonify({'message': error_msg}), 500

            # success_msg was undefined in the original listing; this wording is a placeholder.
            success_msg = f"Subscription confirmed for {topic_arn}"
            return jsonify({'message': success_msg})

        # Any other message type goes on to the decorated function.
        return f(*args, **kwargs)

    return confirm_subscription

Step by step:

  1. We check the type of message.
  2. There are two ways to confirm the subscription. One is a GET request to the confirmation URL that comes in the message. The other is an AWS API request with the topic ARN and the token we get from the message. I considered the API call the more secure option, so I extracted that information and used boto3 to make the call.
  3. If the call fails, I catch the error with the ClientError exception. This is the way to catch boto3 errors. You can find more information about it in the library docs.
  4. I return a different kind of response in each case. At the end of this post I’ll leave a link to the complete code, which contains detailed docstrings.
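One detail worth keeping in mind: boto3.client('sns') without arguments uses the default region from the environment, and the confirmation call will likely fail if the topic lives in a different region. A minimal sketch, assuming the standard SNS topic ARN layout (arn:partition:sns:region:account-id:topic-name), extracts the region from the TopicArn so it can be passed as region_name. The helper name region_from_topic_arn and the sample ARN are hypothetical.

```python
def region_from_topic_arn(topic_arn: str) -> str:
    """Return the region segment of an SNS topic ARN, or raise ValueError."""
    # Expected layout: arn:<partition>:sns:<region>:<account-id>:<topic-name>
    parts = topic_arn.split(":")
    if len(parts) != 6 or parts[0] != "arn" or parts[2] != "sns" or not parts[3]:
        raise ValueError(f"Not an SNS topic ARN: {topic_arn!r}")
    return parts[3]


# Hypothetical ARN, for illustration only.
region = region_from_topic_arn("arn:aws:sns:eu-west-1:123456789012:budget-alerts")

# Inside the decorator this could be used as:
#   client = boto3.client('sns', region_name=region_from_topic_arn(topic_arn))
```

Rejecting anything that is not shaped like an SNS ARN also avoids passing unexpected input to the API call.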

BUDGET CONFIRMATION

Signature validation and SNS confirmation should already be done at this point. Since the message originally comes from a budget, adding an SNS topic subscriber to its alert automatically triggers a message that our subscriber needs to respond to. Oh, and it’ll also be a decorator 🙂 :

def budget_subscriber(f):

    @wraps(f)
    def confirm_subscription(*args, **kwargs):
        if request.headers.get(SNS_MESSAGE_HEADER_TYPE) == NOTIFICATION_TYPE:
            json_data = request.get_json(force=True)
            message_subject = json_data.get('Subject')
            # The budget sends this subject once when the topic is attached to the alert.
            if message_subject == 'SNS Topic Verified!':
                success_msg = "Successfully subscribed to budget"
                return jsonify({'message': success_msg})

        # Regular notifications go on to the decorated function.
        return f(*args, **kwargs)

    return confirm_subscription

This step is simpler than the previous one. We just need to read the message. If it is the budget verification message, we respond with a 200 code.
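The subject check can also be pulled out into a small predicate so it can be tested without a Flask request context. This is only a sketch: the function name is_budget_verification and the sample payloads are hypothetical, while the subject string is the one used in the decorator above.

```python
BUDGET_VERIFIED_SUBJECT = "SNS Topic Verified!"


def is_budget_verification(payload: dict) -> bool:
    """True when a parsed SNS notification is the budget's topic verification message."""
    return (
        payload.get("Type") == "Notification"
        and payload.get("Subject") == BUDGET_VERIFIED_SUBJECT
    )


# Hypothetical payloads, for illustration only.
verification = {"Type": "Notification", "Subject": "SNS Topic Verified!", "Message": "..."}
alert = {"Type": "Notification", "Subject": "Budget alert", "Message": "..."}

is_verification = is_budget_verification(verification)
is_alert_verification = is_budget_verification(alert)
```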

SNS SUBSCRIBER CONDITIONS MET, TRIGGER ACTION

For my subscriber I’ll send an email to let the user know that their budget threshold has been reached:

# process_notification and send_email are custom helpers from the complete code.
@app.route('/budget-subscriber', methods=['POST'])
@needs_sns_notification_auth
@sns_subscriber
@budget_subscriber
def notify_threshold():
    json_data = request.get_json(force=True)
    # Collect the user and budget details needed for the email.
    user_summary = process_notification(json_data)
    if not user_summary:
        error_msg = "Not able to process the sns notification."
        logger.error(error_msg)
        return jsonify({'message': error_msg}), 400

    user_email = user_summary.get('user_email')
    user_name = user_summary.get('user_name')
    threshold = user_summary.get('threshold')
    amount = user_summary.get('budget_amount')

    # Build the message.
    subject = f"Cost control alert - {threshold}% warning threshold exceeded"
    message_html = (f"Hi {user_name},<br/><br/>You have exceeded {threshold}% "
                    f"of the ${amount} authorized for the current month. ")
    try:
        sent = send_email([user_email], subject, message_html)
    except ClientError as e:
        error_code = e.response.get('Error').get('Code')
        error_message = f"notification email to user failed: {error_code}"
        logger.error(error_message)
        return jsonify({'message': error_message}), 409

    if not sent:
        error_message = "Unexpected error trying to send notification to user."
        logger.error(error_message)
        return jsonify({'message': error_message}), 500

    return jsonify({'message': "success"})

This code does the following:

  1. It decorates the endpoint with the decorators defined earlier. This way the previous steps are executed before the request reaches the endpoint code.
  2. It collects the information needed to build the message for the user with process_notification. Below I’ll leave a link to the complete code sample.
  3. It builds the message.
  4. It sends the message with the custom function send_email.
  5. It handles errors.
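The order of the decorators matters: the one written closest to the top runs first, and any of them can answer the request itself without calling the next step. The sketch below illustrates this with plain Python and no Flask involved. The step factory and the function names are hypothetical stand-ins for the three decorators of this post.

```python
from functools import wraps

calls = []


def step(name, stop=False):
    """Hypothetical decorator factory: records that it ran and can short-circuit."""
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            calls.append(name)
            if stop:
                # Answer here, like sns_subscriber does for a SubscriptionConfirmation.
                return f"{name} answered"
            return f(*args, **kwargs)
        return wrapper
    return decorator


@step("auth")
@step("sns_subscriber")
@step("budget_subscriber")
def notify():
    calls.append("notify")
    return "email sent"


result = notify()
full_chain = list(calls)

calls.clear()


@step("auth")
@step("sns_subscriber", stop=True)
@step("budget_subscriber")
def confirm():
    calls.append("confirm")
    return "never reached"


short_result = confirm()
short_chain = list(calls)
```

In the real endpoint, @app.route stays at the very top so that Flask registers the fully decorated function.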

CONCLUSION

As you can see, there’s a bit of work to do before we can trigger our subscriber action. AWS provides a security layer that we should definitely make use of. It takes more time and effort to complete the task, but it will be worth it in the long run. Security issues on the Internet can become a real problem if we don’t take them seriously, so it is safer to assume that any incoming request could be forged.

I hope you enjoyed this post. If you like it, please share!

Get the code here!
