DEV Community

Cover image for Testing Event-Driven Serverless Architectures doesn't have to be difficult. 💥🚀
Elias Brange for AWS Community Builders

Posted on • Originally published at eliasbrange.dev

Testing Event-Driven Serverless Architectures doesn't have to be difficult. 💥🚀

1. Introduction

Testing Serverless applications end-to-end is difficult. Testing distributed systems is difficult. But with the right tools, it can be made easier.

Imagine you have an OrderService that emits an OrderCreated event to EventBridge each time an order is created. To test this flow end-to-end, you must subscribe to the EventBridge bus and verify that the correct event has been published.

There is a new player in town. The team at Momento has built a Topics service. I've recently found the time to play around with this service, and I'm impressed.

In this post, I will show you how you can use Momento Topics to test that your application(s) produces the events you expect.

You can find the sample application and a step-by-step guide on GitHub.

2. Momento Topics

Momento Topics is a serverless pub/sub service that makes publishing and subscribing to events simple for any application. This is what Momento themselves say about the service:

Momento Topics is a serverless event messaging service that allows for real-time communication between parts of a distributed application. Instead of spending cycles defining topic resources and dealing with the pain of a tightly-coupled system, push to an unlimited number of channels on the fly without any topic management.

It is a fully managed service that enables a wide range of use cases in distributed systems. For example, you can use it to build an interactive live reaction app to spice up your online presentations.

Being serverless, it also comes with the pay-as-you-go pricing model that we all love and a generous free tier.

Momento has SDKs for a wide range of languages and an HTTP API, which enables the use case in this post. We will integrate EventBridge with this API with the help of API Destinations.

3. Sample application

Our sample application mimics a simple Order Service. It consists of an API Gateway with a single POST / endpoint that triggers a Lambda function. This Lambda function generates a new order with a random ID and publishes an OrderCreated event to EventBridge.

The simple application architecture.

The Lambda function, in this case, is a simple function with the following handler:

export const handler = async (): Promise<APIGatewayProxyResult> => {
  const order = {
    id: ulid(),
    name: 'test order',
  };

  await eventBridgeClient.send(
    new PutEventsCommand({
      Entries: [
        {
          EventBusName: EVENT_BUS,
          Source: 'OrderService',
          DetailType: 'OrderCreated',
          Detail: JSON.stringify(order),
        },
      ],
    }),
  );
  return {
    statusCode: 201,
    body: JSON.stringify(order),
  };
};
Enter fullscreen mode Exit fullscreen mode

When the API receives a POST / request, an event with the following structure is published to EventBridge:

{
  "version": "0",
  "id": "11111111-2222-4444-5555-666666666666",
  "detail-type": "OrderCreated",
  "source": "OrderService",
  "account": "123456789012",
  "time": "2023-10-19T08:00:00Z",
  "region": "eu-west-1",
  "resources": [],
  "detail": {
    "id": "01F9ZQZJZJZJZJZJZJZJZJZJZJ",
    "name": "test order"
  }
}
Enter fullscreen mode Exit fullscreen mode

How can you test this application? In a more real-life scenario, you would probably persist an order to a database. If that's the case, you could verify that the service has persisted the order in the database. Your service might have external consumers that rely on the events it emits. You must ensure that your service publishes the OrderCreated event to EventBridge and has the correct format.

There are different ways to do this. You could integrate EventBridge with services like SNS or AppSync subscriptions and subscribe to those in your tests. But, there is an easier way: Momento Topics.

4. Integrate EventBridge and Momento Topics

The first thing that enables this pattern is the API Destinations feature of EventBridge. API Destinations allow you to send events from EventBridge to an HTTP endpoint. You can configure an API Destination to use Basic, OAuth, and API Key authorization. The authorization configuration, if any, is securely stored in Secrets Manager.

The second thing that enables this pattern is the Momento HTTP API. The API lets you authenticate via an API Key and publish events to a Momento Topic. For a deeper dive into integrating EventBridge and Momento, refer to the official Momento documentation.

Architecture with API destination to Momento topic.

The following standalone CloudFormation template shows the required resources to integrate EventBridge and Momento Topics:

AWSTemplateFormatVersion: '2010-09-09'
Description: Momento destination for EventBridge

Parameters:
  EventBusName:
    Type: String
  MomentoEndpoint:
    Type: String
  MomentoAuthToken:
    Type: String
    NoEcho: true
  TopicName:
    Type: String
  CacheName:
    Type: String

Resources:
  DLQ:
    Type: AWS::SQS::Queue

  Connection:
    Type: AWS::Events::Connection
    Properties:
      AuthorizationType: API_KEY
      AuthParameters:
        ApiKeyAuthParameters:
          ApiKeyName: Authorization
          ApiKeyValue: !Sub ${MomentoAuthToken}

  Destination:
    Type: AWS::Events::ApiDestination
    Properties:
      ConnectionArn: !GetAtt Connection.Arn
      HttpMethod: POST
      InvocationEndpoint: !Sub ${MomentoEndpoint}/topics/${CacheName}/${TopicName}
      InvocationRateLimitPerSecond: 300

  TargetRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: events.amazonaws.com
            Action: sts:AssumeRole
      Path: /service-role/
      Policies:
        - PolicyName: destinationinvoke
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - events:InvokeApiDestination
                Resource: !GetAtt Destination.Arn

  Rule:
    Type: AWS::Events::Rule
    Properties:
      EventBusName: !Ref EventBusName
      EventPattern:
        source:
          - prefix: ''
      State: ENABLED
      Targets:
        - Id: topicPublish-rule
          Arn: !GetAtt Destination.Arn
          RoleArn: !GetAtt TargetRole.Arn
          DeadLetterConfig:
            Arn: !GetAtt DLQ.Arn
Enter fullscreen mode Exit fullscreen mode

The template consists of a few resources:

  • DLQ: A dead-letter queue to catch errors when publishing events to Momento.
  • Connection: Specifies the authorization type and parameters to use with a destination endpoint. Here, we specify that it should use API_KEY validation, and we provide the API key value.
  • Destination: Specifies the endpoint to send events to. Here, we specify the method and endpoint to call. In this case, we construct the URL from the MomentoEndPoint, CacheName, and TopicName parameters.
  • TargetRole: An IAM role that allows EventBridge to invoke the API destination.
  • Rule: An EventBridge rule on the EventBusName bus that sends events to the API destination. Here we use the pattern "source": [{"prefix": ""}] to match all events.

With this stack in place, every event sent to the EventBridge bus will be sent to the Momento Topic. We can now use this in our End-to-End tests to verify that the application publishes the events we expect to EventBridge.

5. Test the application End-to-End

Now, let's get to the magic part. With the EventBridge and Momento integration in place, we can now test our application and be confident that it emits the correct event(s). The architecture below shows the flow of events:

End-to-end testing flow.

  1. A test calls the API Gateway endpoint that triggers the Lambda to send an event.
  2. The API destination relays the event to the Momento topic.
  3. The test receives the event from the Momento topic and verifies that it matches the expected event.

So, we need a test that subscribes to the topic, calls the API, and verifies that the event is received. The test could look like this:

describe('When an order is created', async () => {
  const subscription = await subscribe();
  let order: { id: string; name: string };

  beforeAll(async () => {
    const response = await fetch(API_URL, {
      method: 'POST',
    });
    expect(response.status).toBe(201);

    order = await response.json();
  });

  afterAll(async () => {
    subscription.unsubscribe();
  });

  it('It should publish an OrderCreated event to EventBridge', async () => {
    const message = await subscription.waitForMessageMatching({
      source: 'OrderService',
      'detail-type': 'OrderCreated',
      detail: {
        id: order.id,
        name: order.name,
      },
    });

    expect(message).not.toBeNull();
  }, 5000);
});
Enter fullscreen mode Exit fullscreen mode

The test uses a subscribe function to set up a subscription to the topic. It then calls the API and saves the ID of the created order. It then verifies that it receives an OrderCreated event with the format via the waitForMessageMatching function.

The test explicitly sets a timeout of five seconds. This should be ample time for the service to publish the event and let it propagate to the Momento topic. If no matching event is received within that duration, the test fails.

So, what magic is going on in these functions? Not that much, actually. The Momento SDK is a joy to work with:

import 'dotenv/config';
import {
  TopicClient,
  TopicConfigurations,
  CredentialProvider,
  TopicItem,
  TopicSubscribe,
} from '@gomomento/sdk';
import { ReplaySubject, firstValueFrom } from 'rxjs';
import { filter } from 'rxjs/operators';
import * as _ from 'lodash';

const topicClient = new TopicClient({
  configuration: TopicConfigurations.Default.latest(),
  credentialProvider: CredentialProvider.fromEnvironmentVariable({
    environmentVariableName: 'MOMENTO_API_KEY',
  }),
});
const CACHE_NAME = process.env.CACHE_NAME || '';
const TOPIC_NAME = process.env.TOPIC_NAME || '';

export const subscribe = async () => {
  const messages = new ReplaySubject(100);

  const subscription = await topicClient.subscribe(CACHE_NAME, TOPIC_NAME, {
    onError: (error) => {
      throw error;
    },
    onItem: (item: TopicItem) => {
      try {
        const message = JSON.parse(item.valueString());
        messages.next(message);
      } catch (error) {
        console.log('Error parsing message from Momento Topic', item);
      }
    },
  });

  if (!(subscription instanceof TopicSubscribe.Subscription)) {
    throw new Error('Failed to subscribe to topic');
  }

  const unsubscribe = async () => {
    subscription.unsubscribe();
  };

  const waitForMessageMatching = async (expected: object) => {
    const predicate = (message: unknown) => {
      if (typeof message !== 'object' || message === null) {
        return false;
      }

      return _.isMatch(message, expected);
    };

    const data = messages.pipe(filter((message) => predicate(message)));
    return firstValueFrom(data);
  };

  return {
    unsubscribe,
    waitForMessageMatching,
  };
};
Enter fullscreen mode Exit fullscreen mode

Let's break this down.

We first create a TopicClient and configure it to read the Momento API Key from the environment variable MOMENTO_API_KEY. We also fetch the name of the cache and topic from environment variables.

Inside the subscribe function, we first create an RxJS ReplaySubject. We use this to store incoming events from the Momento topic.

Then, we use the client to subscribe to our topic. In onItem, we parse the incoming event and store it by calling messages.next(message). Every event sent to the topic will be stored during the test.

In waitForMessageMatching, we filter out any messages that do not match the expected event. It returns the first matching event that is received, if any. The test fails if no matching event is received within the test timeout.

Looking back at the actual test case, we are expecting an OrderCreated event from the OrderService that contains the id and name of the order that was created:

const message = await subscription.waitForMessageMatching({
  source: 'OrderService',
  'detail-type': 'OrderCreated',
  detail: {
    id: order.id,
    name: order.name,
  },
});

expect(message).not.toBeNull();
Enter fullscreen mode Exit fullscreen mode

With this, we can be confident that our application works correctly end-to-end.

6. Conclusion

One of the most common arguments against Serverless is that testing is difficult. This partly comes from a misconception that you should test everything locally. With serverless, you should embrace testing in the cloud. This is where the application runs, and this is where you should test it.

Distributed systems are complex to test, no matter if they are built with Serverless technologies or not. But, with the right tools and patterns, you can make it easier. In this post, you have learned how to use EventBridge and Momento Topics to test that your applications produce the events you expect.

You can find the sample application and a step-by-step guide on GitHub.

Top comments (0)