DEV Community

Cover image for Why DynamoDB Streams is Very Useful
awedis for AWS Community Builders

Posted on

Why DynamoDB Streams is Very Useful

In this article we are going to learn about Amazon DynamoDB Streams.

The main parts of this article:

  1. About Amazon DynamoDB
  2. About DynamoDB Streams
  3. Example

1. About Amazon DynamoDB

Amazon DynamoDB is a fully managed NoSQL database service that provides fast and predictable performance with seamless scalability.

DynamoDB uses a NoSQL database model, which is nonrelational, allowing documents, graphs and columnar among its data models. A user stores data in DynamoDB tables, then interacts with it via GET and PUT queries, which are read and write operations, respectively. DynamoDB supports basic CRUD operations and conditional operations. Each DynamoDB query is executed by a primary key identified by the user, which uniquely identifies each item.

DynamoDB makes it simple and cost-effective to store and retrieve any amount of data, as well as serve any level of request traffic. All data items are stored on solid-state drives, which provide high I/O performance and can more efficiently handle high-scale requests.

2. About DynamoDB Streams

DynamoDB Streams writes stream records in near-real time so that you can build applications that consume these streams and take action based on the contents. When you enable a stream on a table, DynamoDB captures information about every modification to data items in the table.

Through DynamoDB streams you can capture changes to items stored in a DynamoDB table, at the point in time when such changes occur. For instance let's say you have a popular mobile application that modifies data in a DynamoDB table, at the rate of thousands of updates per second, another application captures and stores these data updates, providing near-real-time usage metrics for the mobile app.

Some keynotes:

The StreamSpecification parameter determines how the stream is configured.
StreamEnabled specifies whether a stream is enabled (true) or disabled (false) for the table.
StreamViewType specifies the information that will be written to the stream whenever data in the table is modified:
  • KEY_ONLY only the key attributes of the modified item.
  • NEW_IMAGE the entire item, as it appears after it was modified
  • OLD_IMAGE the entire item, as it appeared before it was modified.
  • NEW_AND_OLD_IMAGES both the new and the old images of the item.

3. Example

In this part we are going to build a simple API that captures data every time our DynamoDB is updated, and in order to keep the functionality simple we will only log the result in CloudWatch.

Amazon DynamoDB is integrated with AWS Lambda so that you can create triggers; pieces of code that automatically respond to events in DynamoDB Streams. With triggers, you can build applications that react to data modifications in DynamoDB tables.

  • First of all let us enable the streams on our DynamoDB table, the below screen shot shows the status (in my case I am using the new and old images) Image description

📋 Note: I am using the Serverless Framework, if you want to know more about it you can visit this link

  • routes.yml we will add two routes, one for the API that will add new item in DynamoDB, and the other one for the Stream
addProduct:
  handler: src/modules/Stream/controller/stream.add
  events:
    - http:
        method: post
        path: product
        cors: true
streamGetData:
  handler: src/modules/Stream/controller/stream.getStream
  events:
    - stream:
      arn: arn:aws:dynamodb:${env:region}:${env:accountId}:table/${env:dynamoTable}/stream/${env:streamId}
      batchWindow: 1
      startingPosition: LATEST
      maximumRetryAttempts: 1
Enter fullscreen mode Exit fullscreen mode
  • Our Two Lambda functions
"use strict";
const { addProduct } = require('../services/dynamo.service');

function Response(statusCode, data) {
  return {
    statusCode,
    body: JSON.stringify(data, null, 2),
  };
}

module.exports.add = async (event) => {
  try {
    console.log('event =>', event);
    const body = JSON.parse(event.body);

    const addResult = await addProduct(body);
    console.log('addResult =>', addResult);

    return Response(200, 'Product added');
  } catch (error) {
    console.log('error =>', error);
  }
};

module.exports.getStream = async (event) => {
  try {
    console.log('event =>', event);
  } catch (error) {
    console.log('error =>', error);
  }
}
Enter fullscreen mode Exit fullscreen mode

We are just adding an item in DynamoDB, now once the item is added the DynamoDB Streams will trigger a Lambda function.

  • The following code should be added in iamRoleStatements, our Lambda function needs to have the following Roles, in order to be able to add it as trigger function.
- Effect: Allow
      Action:
        - "dynamodb:DescribeStream"
        - "dynamodb:GetRecords"
        - "dynamodb:GetShardIterator"
        - "dynamodb:ListStreams"
      Resource:
        - "arn:aws:dynamodb:${env:region}:${env:accountId}:table/Product/stream/${env:streamId}"
Enter fullscreen mode Exit fullscreen mode
  • product.schema.js (I am using Dynamoose inside my project) (Dynamoose is a modeling tool for Amazon DynamoDB)
const dynamoose = require('dynamoose');

const string_required = {
  type: String,
  required: true,
};

const number_required = {
  type: Number,
  required: true,
};

const ProductSchema = new dynamoose.Schema({
  PK: string_required,
  SK: string_required,
  name: string_required,
  price: number_required,
  category: string_required,
});

module.exports.Product = dynamoose.model('Product', ProductSchema);
Enter fullscreen mode Exit fullscreen mode
  • dynamo.service.js
const dynamoose = require('dynamoose');
const { v4: uuidv4 } = require('uuid');
const { Product } = require('./product.schema');

dynamoose.aws.sdk.config.update({
  accessKeyId: process.env.access_key_id,
  secretAccessKey: process.env.secret_access_key,
  region: process.env.region,
});

module.exports.addProduct = async (body) => {
  const {
    name,
    price,
    category,
  } = body;

  return await Product.create({
    PK: `Product#${uuidv4()}`,
    SK: `User#Product`,
    name,
    price,
    category,
  });
}
Enter fullscreen mode Exit fullscreen mode
  • My API Body that will add the product:
{
    "name": "my product",
    "price": 10,
    "category": "furniture"
}
Enter fullscreen mode Exit fullscreen mode

Once the new item is added you should be able to see inside Amazon CloudWatch the captured data console.log output, which will be executed from the triggered Lambda that we configured previously streamGetData

Conclusion

In This article we saw how Amazon DynamoDB Streams can help you to capture the data that is being changed on your table, this is a very useful feature that can be used in a variety of cases 🙂

Latest comments (0)