DEV Community

Rob Walters for MongoDB

Aggregate streaming data within windows of time using Atlas Stream Processing

Unlike traditional batch processing, where data is collected, stored, and then processed in chunks, streaming data is processed as it is produced, allowing for immediate analysis, response, and decision-making. Window operators within MongoDB Atlas Stream Processing pipelines allow developers to analyze and process specific fixed-size “windows” of data within a continuous data stream. This bucketing of the data makes it easy to discover patterns and trends. Without window operators, developers have to process every single data point in the stream, and depending on the volume of data, this can be very resource-intensive.

For example, consider a solar farm where thousands of sensors capture input wattage every second. Moving and storing every one of those readings is time-consuming and costly. A better solution is to capture the trend of the data by using a window operator to calculate the average watts over an interval of time. This adds value to the business while conserving storage and network resources.

Currently, Atlas Stream Processing supports two methods for windowing: tumbling windows and hopping windows. Each of these variations treats the actions within the time window slightly differently. Let’s explore these window operators in more detail.

Tumbling windows

Tumbling windows segment the data into non-overlapping, fixed-size windows. This can be helpful for batch-style analysis on discrete chunks of data. For example, if we want to calculate the average price per minute, we can define a tumbling window with an interval of one minute, as shown in the following figure.

Graphic showing non-overlapping, fixed-size tumbling windows.

For every minute, the average price for the LEAF stock will be calculated, and that value will be moved on to the next stage in the Atlas Stream Processing aggregation pipeline.
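
To make the bucketing idea concrete, here is a minimal sketch in plain JavaScript (not Atlas Stream Processing code) of how an event timestamp maps to exactly one tumbling window. The function name `tumblingWindowFor` is hypothetical, and the sketch assumes windows are aligned to multiples of the window size counted from the Unix epoch.

```javascript
// Plain JavaScript sketch (not Atlas Stream Processing code): map an event
// timestamp to the single tumbling window that contains it.
// Assumption: window boundaries are aligned to multiples of the window size.
const WINDOW_MS = 60 * 1000; // one-minute interval, as in the example above

function tumblingWindowFor(timestamp, sizeMs = WINDOW_MS) {
  const t = timestamp.getTime();
  // Round down to the nearest multiple of the window size.
  const start = Math.floor(t / sizeMs) * sizeMs;
  return { start: new Date(start), end: new Date(start + sizeMs) };
}

const w = tumblingWindowFor(new Date("2023-08-25T06:56:11.129Z"));
console.log(w.start.toISOString(), w.end.toISOString());
// 2023-08-25T06:56:00.000Z 2023-08-25T06:57:00.000Z
```

Because the windows never overlap, every event lands in one and only one bucket.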

Using Atlas Stream Processing, we use the $tumblingWindow pipeline operator to define the interval and the aggregation operations we want to perform within each time window. For example, consider a stream of stock data in JSON format:

  {
    company_symbol: 'SSC',
    company_name: 'SUPERIOR SAFFRON CORPORATION',
    exchange: 'NASDAQ',
    price: 70.14,
    tx_time: ISODate("2023-08-25T06:56:11.129Z")
  },
  {
    company_symbol: 'GTH',
    company_name: 'GRACEFUL TRAINER HOLDINGS',
    exchange: 'NYSE',
    price: 66.78,
    tx_time: ISODate("2023-08-25T06:56:11.129Z")
  },
  {
    company_symbol: 'FIL',
    company_name: 'FRUSTRATING INK LLC',
    exchange: 'NASDAQ',
    price: 83.92,
    tx_time: ISODate("2023-08-25T06:56:11.129Z")
  },
…

Here the data from multiple securities is being streamed once per second. To have Atlas Stream Processing compute the average price over one-minute intervals, grouped by company symbol, you configure the $tumblingWindow pipeline operator as follows:

{ $tumblingWindow: {
    // Length of each non-overlapping window
    interval: {size: NumberInt(1), unit: "minute"},
    // Aggregation applied to the events inside each window
    pipeline: [
        {
            $group: {
                _id: "$fullDocument.company_symbol",
                max: {$max: "$fullDocument.price"},
                avg: {$avg: "$fullDocument.price"}
            }
        }
    ]
}}

Note: The use of $fullDocument is needed here since Atlas Stream Processing is reading from a MongoDB change stream on the collection. The event that comes from the change stream contains metadata about the event and a field called “fullDocument” that includes the data we are interested in. For more information on the change stream event format, check out the Change Events documentation.

The interval units can be: "year", "month", "day", "hour", "minute", "second", and "ms". Inside the pipeline, you define aggregation operations on the time interval. In this case, we want to use $group to group the data by company symbol and return the maximum value and the average of the price value during the one-minute interval.

When this pipeline is run, the following results are obtained:

{
  _id: 'IST',
  max: 89.77,
  avg: 89.50254545454546,
  _stream_meta: {
    sourceType: 'atlas',
    windowStartTimestamp: 2023-08-25T11:02:00.000Z,
    windowEndTimestamp: 2023-08-25T11:03:00.000Z
  }
}
{
  _id: 'DPP',
  max: 51.38,
  avg: 51.23148148148148,
  _stream_meta: {
    sourceType: 'atlas',
    windowStartTimestamp: 2023-08-25T11:02:00.000Z,
    windowEndTimestamp: 2023-08-25T11:03:00.000Z
  }
}
{
  _id: 'RAC',
  max: 60.63,
  avg: 60.47611111111111,
  _stream_meta: {
    sourceType: 'atlas',
    windowStartTimestamp: 2023-08-25T11:02:00.000Z,
    windowEndTimestamp: 2023-08-25T11:03:00.000Z
  }
}

Notice there is an extra field, “_stream_meta,” included as part of the result. This data describes the time interval for the aggregation. This output is explained in the section “Window output” later in this post since it applies to the other supported window in Atlas Stream Processing, a hopping window.
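
To illustrate the arithmetic that the $group stage performs on the events of a single window, here is a rough sketch in plain JavaScript. It is not how Atlas Stream Processing is implemented; the function name `groupBySymbol` and the sample events are made up for illustration and only mimic the change stream shape (data nested under `fullDocument`).

```javascript
// Plain JavaScript sketch of the per-window $group step.
// The events below are invented sample data, not output from a real stream.
function groupBySymbol(events) {
  const groups = new Map();
  for (const e of events) {
    const { company_symbol: symbol, price } = e.fullDocument;
    const g = groups.get(symbol) ?? { _id: symbol, max: -Infinity, sum: 0, count: 0 };
    g.max = Math.max(g.max, price);
    g.sum += price;
    g.count += 1;
    groups.set(symbol, g);
  }
  // Emit one document per symbol, mirroring $group with $max and $avg.
  return [...groups.values()].map(({ _id, max, sum, count }) => ({ _id, max, avg: sum / count }));
}

const windowEvents = [
  { fullDocument: { company_symbol: "AAA", price: 10 } },
  { fullDocument: { company_symbol: "AAA", price: 14 } },
  { fullDocument: { company_symbol: "BBB", price: 5 } },
];
console.log(groupBySymbol(windowEvents));
// [ { _id: 'AAA', max: 14, avg: 12 }, { _id: 'BBB', max: 5, avg: 5 } ]
```

The key point is that the accumulators only produce a result once all events for the window are known, which is why the window boundary matters.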

Hopping windows

A hopping window, sometimes referred to as a sliding window, is a fixed-size window that advances over the data stream by a fixed step (the hop) as new data arrives, so consecutive windows can overlap. This is useful for ongoing analysis and determining trends. For example, suppose we want to calculate the average price over the past hour in 30-minute increments, as shown in the following figure. At the one-hour mark, you get an average over the first hour. At one hour and 30 minutes, you again get an average over the past hour, this time covering the data between the 30-minute mark and the 90-minute mark.

Graphic showing overlapping, fixed-size hopping windows.

Given our stock example, we can create a hopping window pipeline operator that averages over the past minute every 30 seconds as follows:

hoppingwindow=
    { $hoppingWindow: {
      interval: {size: 1, unit: "minute"}, 
      hopSize: {size: 30, unit: "second"},
      pipeline: 
      [
        { $group: {
            _id: "$fullDocument.company_symbol",
            max: { $max: "$fullDocument.price" },
            min: { $min: "$fullDocument.price" },
            avg: { $avg: "$fullDocument.price" }
        }},
        { $sort: { _id: 1 }}
      ]
    }}

As with the tumbling window, we start by specifying an interval, which sets the length of each window. Unique to the hopping window, we also specify “hopSize,” which defines how far the window advances between evaluations; here, a new one-minute window starts every 30 seconds. An example output of the hopping window defined above is as follows:

{
  _id: 'IST',
  max: 89.69,
  min: 89.44,
  avg: 89.59533333333334,
  _stream_meta: {
    sourceType: 'atlas',
    windowStartTimestamp: 2023-08-25T10:43:00.000Z,
    windowEndTimestamp: 2023-08-25T10:44:00.000Z
  }
}
{
  _id: 'IST',
  max: 89.69,
  min: 89.27,
  avg: 89.53133333333334,
  _stream_meta: {
    sourceType: 'atlas',
    windowStartTimestamp: 2023-08-25T10:43:30.000Z,
    windowEndTimestamp: 2023-08-25T10:44:30.000Z
  }
}
{
  _id: 'IST',
  max: 89.8,
  min: 89.27,
  avg: 89.53566666666667,
  _stream_meta: {
    sourceType: 'atlas',
    windowStartTimestamp: 2023-08-25T10:44:00.000Z,
    windowEndTimestamp: 2023-08-25T10:45:00.000Z
  }
}

The above result set was filtered to only show one of the stock symbols, “IST”, so you can observe the data as it is returned per the “hopSize” defined in the query. The first result was from the interval 43:00 to 44:00, then the minute 43:30 to 44:30, then the minute from 44:00 to 45:00. Note these computations are in 30-second “hops.”
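
The overlap can be sketched in plain JavaScript: with a one-minute interval and a 30-second hop, a single event belongs to two windows. This is an illustration only, not Atlas Stream Processing code; the function name `hoppingWindowsFor` is hypothetical, and it assumes window starts are aligned to multiples of the hop size.

```javascript
// Plain JavaScript sketch: list the hopping windows that contain a timestamp.
// Assumption: window starts are aligned to multiples of the hop size.
function hoppingWindowsFor(timestamp, intervalMs, hopMs) {
  const t = timestamp.getTime();
  const windows = [];
  // Latest window start at or before the event time.
  let start = Math.floor(t / hopMs) * hopMs;
  // Walk backwards while the window [start, start + intervalMs) still covers t.
  while (start + intervalMs > t) {
    windows.unshift({ start: new Date(start), end: new Date(start + intervalMs) });
    start -= hopMs;
  }
  return windows;
}

const hits = hoppingWindowsFor(new Date("2023-08-25T10:44:10.000Z"), 60 * 1000, 30 * 1000);
hits.forEach(w => console.log(w.start.toISOString(), w.end.toISOString()));
// 2023-08-25T10:43:30.000Z 2023-08-25T10:44:30.000Z
// 2023-08-25T10:44:00.000Z 2023-08-25T10:45:00.000Z
```

An event at 10:44:10 therefore contributes to both the 43:30 to 44:30 window and the 44:00 to 45:00 window, which is why neighboring results above share some of the same prices.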

Window output

For every message being emitted from a window, some implicit projections are made. This allows the developer to understand the bounds of the window being emitted. Output is automatically projected into a "_stream_meta" field for the message.

For example, the output of a single $tumblingWindow interval from the earlier example is as follows:

{
  _id: 'TRF',
  max: 29.64,
  avg: 29.541632653061225,
  _stream_meta: {
    sourceType: 'atlas',
    windowStartTimestamp: 2023-08-25T09:50:00.000Z,
    windowEndTimestamp: 2023-08-25T09:51:00.000Z
  }
}
{
  _id: 'DPP',
  max: 51.28,
  avg: 51.13448979591837,
  _stream_meta: {
    sourceType: 'atlas',
    windowStartTimestamp: 2023-08-25T09:50:00.000Z,
    windowEndTimestamp: 2023-08-25T09:51:00.000Z
  }
}
{
  _id: 'GCC',
  max: 60.41,
  avg: 60.30142857142857,
  _stream_meta: {
    sourceType: 'atlas',
    windowStartTimestamp: 2023-08-25T09:50:00.000Z,
    windowEndTimestamp: 2023-08-25T09:51:00.000Z
  }
}

The windowStartTimestamp marks the start of the window and is inclusive: data with that timestamp is included in the calculation. The windowEndTimestamp marks the end of the window and is exclusive: data with that timestamp is not included in the calculation.
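
The inclusive start and exclusive end can be expressed as a simple half-open range check. The helper below is a hypothetical illustration in plain JavaScript, not part of Atlas Stream Processing; it takes an object shaped like the "_stream_meta" field shown above.

```javascript
// Hypothetical helper: test whether a timestamp falls inside a window,
// treating the start as inclusive and the end as exclusive.
function isInWindow(timestamp, meta) {
  const t = timestamp.getTime();
  return t >= meta.windowStartTimestamp.getTime() && t < meta.windowEndTimestamp.getTime();
}

const meta = {
  windowStartTimestamp: new Date("2023-08-25T09:50:00.000Z"),
  windowEndTimestamp: new Date("2023-08-25T09:51:00.000Z"),
};
console.log(isInWindow(new Date("2023-08-25T09:50:00.000Z"), meta)); // true
console.log(isInWindow(new Date("2023-08-25T09:51:00.000Z"), meta)); // false
```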

Blocking versus non-blocking stages

Keep in mind that the window operations mentioned previously are used within the context of a stream processing query. This query is a MongoDB Aggregation pipeline that can contain other operators. For example, a typical stream processing query includes a $source, indicating the source of the data stream, and an $emit or $merge stage that describes where to write the stream data. To make it easy to build pipelines, we can define variables for these stages. When ready to process the query, we simply pass the stages in an array. To illustrate, consider the following variables:

A source:

sourceStocks = { $source: {
    connectionName: "stockdb",
    db: "StockData",
    coll: "Stocks",
    allowedLateness: { unit: "minute", size: 1 },
    // Field paths start with "$"; assumes tx_time arrives as a date string
    timeField: { $dateFromString: { dateString: "$fullDocument.tx_time" } }
}}

Our hopping window from earlier:

hoppingwindow=
    { $hoppingWindow: {
      interval: {size: 1, unit: "minute"}, 
      hopSize: {size: 30, unit: "second"},
      pipeline: 
      [
        { $group: {
            _id: "$fullDocument.company_symbol",
            max: { $max: "$fullDocument.price" },
            min: { $min: "$fullDocument.price" },
            avg: { $avg: "$fullDocument.price" }
        }},
        { $sort: { _id: 1 }}
      ]
    }}

And a $merge stage:

mergeStocks = { $merge: {
    into: {
        connectionName: "stockdb",
        db: "StockData",
        coll: "stocksummary"
    }
}}

Now, when we create the stream processor, we simply pass these stages in an array:

sp.createStreamProcessor("StockSummary",[sourceStocks,hoppingwindow,mergeStocks])

When building pipelines for Atlas Stream Processing, certain operators, such as $group, $sort, $count, and $limit, are considered blocking stages of a pipeline. This means that the stage waits for its entire input data set to arrive and accumulate before processing the data together. In the context of a data stream, it does not make sense to execute these blocking operations on individual data points as they arrive, since data flows continuously and value is obtained from more than one data point.

Other aggregation pipeline operators are not blocking, such as $addFields, $match, $project, $redact, $replaceRoot, $replaceWith, $set, $unset, and $unwind, to name a few. These non-blocking operators can be used anywhere within the stream processing pipeline. But blocking stages, such as $group, must be used within the tumbling or hopping window pipeline operators.
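
As a rough illustration of this placement rule, the following plain JavaScript sketch flags blocking stages that sit at the top level of a pipeline instead of inside a window. The helper `unwindowedBlockingStages` is hypothetical and not part of Atlas Stream Processing, and its list of blocking stages contains only the examples named above, so it is not exhaustive.

```javascript
// Hypothetical helper, not an Atlas Stream Processing API: report blocking
// stages that appear outside a $tumblingWindow or $hoppingWindow.
// Assumption: only the blocking stages named in this post are checked.
const BLOCKING = new Set(["$group", "$sort", "$count", "$limit"]);

function unwindowedBlockingStages(pipeline) {
  // Only top-level stages are inspected; stages nested inside a window's
  // own pipeline are allowed and therefore never examined here.
  return pipeline
    .map(stage => Object.keys(stage)[0])
    .filter(name => BLOCKING.has(name));
}

const group = { $group: { _id: "$fullDocument.company_symbol", avg: { $avg: "$fullDocument.price" } } };
const bad = [{ $source: {} }, group, { $merge: {} }];
const good = [
  { $source: {} },
  { $tumblingWindow: { interval: { size: 1, unit: "minute" }, pipeline: [group] } },
  { $merge: {} },
];
console.log(unwindowedBlockingStages(bad));  // [ '$group' ]
console.log(unwindowedBlockingStages(good)); // []
```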

Defining a window gives the stream processor the bounded context it needs (the interval and hop size, if applicable) to appropriately process your data. Thus, the following $group would not be valid if it were placed outside of a tumbling or hopping window operator:

var g = { $group: {
    _id: "$fullDocument.company_symbol",
    max: { $max: "$fullDocument.price" },
    min: { $min: "$fullDocument.price" },
    avg: { $avg: "$fullDocument.price" }
}}

Also, note that Atlas Stream Processing does not support the $setWindowFields operator. Behind the scenes, $setWindowFields produces a different output document schema and uses different window boundary semantics. The window operators $tumblingWindow and $hoppingWindow used within Atlas Stream Processing are purpose-built to handle streams of data and common issues such as out-of-order and late-arriving data.

Summary

Window operations in Atlas Stream Processing provide developers with an easy way to aggregate streaming data through both tumbling and hopping windows. MongoDB is investing heavily in data streams and supporting event-driven architectures with Atlas Stream Processing.

Explore Atlas Stream Processing content on the MongoDB Developer Center

Now that you’ve learned about window operators in Atlas Stream Processing, check out the windowing tutorial and other content on the Developer Center.

Try Atlas Stream Processing today

Log in to MongoDB Atlas to get started
