DEV Community

ledux

Search in Kinesis

Introduction

We once couldn't process data records from a Kinesis data stream with a certain value in a field. After we fixed the bug, we needed to reprocess these records.

This blog post describes how I read the data records from the stream, filtered them by the value in question and prepared them for reprocessing.

In this post I will cover the following topics:

  • What is Kinesis
  • How to read records from Kinesis, using the AWS CLI
  • How to process JSON using the jq CLI
  • How to read only records which match certain criteria

What is Kinesis

Kinesis Data Streams is an AWS service for processing streaming data. Producers send data records to Kinesis; consumers can then process them.

A data stream consists of one or more shards. A shard is a sequence of data records, and shards are the unit for scaling the throughput. Ideally every shard has its own consumer.

The shard on which a data record is stored is determined by the PartitionKey. The partition key is a string that the producer supplies with each record; it is often taken from a field in the payload. Kinesis hashes this value (using MD5) and assigns the record to the shard whose hash key range contains the result.
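
As a rough illustration of the hashing step, the sketch below computes the MD5 hash of a partition key. It assumes a hypothetical stream with exactly two shards that split the 128-bit hash range evenly, so the first hex digit of the hash is enough to pick the shard. Real streams can have other ranges; `aws kinesis list-shards` shows the HashKeyRange of each shard.

```shell
# Partition key as in the sample records further down; any string works.
partitionKey="0"

# MD5 of the key as 32 hex digits (md5sum is part of GNU coreutils).
hash=$(printf '%s' "$partitionKey" | md5sum | cut -d' ' -f1)

# Assumption: two shards, each owning one half of the 128-bit range.
# A leading hex digit of 0-7 falls into the lower half, 8-f into the upper half.
case "$hash" in
    [0-7]*) shardIndex=0 ;;
    *)      shardIndex=1 ;;
esac

echo "key=$partitionKey md5=$hash shard=$shardIndex"
```

Because the hash depends only on the key, all records with the same partition key end up in the same shard.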

Every data record gets a SequenceNumber. This number identifies the data record inside the shard. It increases from record to record, but the numbers are not consecutive and the gaps between them vary.

ShardIterator

Kinesis is designed for reading the ingested data in the order it came in. There is no mechanism to search or filter the data. A consumer can only read the data records forward in sequence.

To read the data, the consumer needs a ShardIterator. It represents the position from where the consumer will start reading.

Depending on how the first record is defined, there are different iterator types:

  • AT_SEQUENCE_NUMBER: starts at the record with the exact number provided or the record with the next higher number
  • AFTER_SEQUENCE_NUMBER: starts at the record with the next higher number than the provided one
  • AT_TIMESTAMP: starts at the record with the exact timestamp provided or the next higher one
  • LATEST: starts with the next record created
  • TRIM_HORIZON: starts at the oldest available record in the shard

AWS CLI

AWS provides a command line interface (CLI) to interact with its services, including Kinesis Data Streams.

To read from Kinesis a ShardIterator is needed. Here I create one of type AT_TIMESTAMP:

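stream=stream-name
# epoch seconds; --date is GNU date syntax and uses the local time zone
timestamp=$(date --date='2023-03-13 12:00:00' +%s)
# jq -r prints the raw string; without -r the value would keep its JSON double quotes
sharditerator=$(aws kinesis get-shard-iterator --stream-name "$stream" --shard-id 0 --shard-iterator-type AT_TIMESTAMP --timestamp "$timestamp" | jq -r .ShardIterator)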
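
The other iterator types from the list above are requested the same way; only the type and its extra argument change. The following is a small sketch, not part of the original script: the function name `get_iterator` and the variable `shardId` are made up for illustration, and the CLI's own `--query` and `--output text` options are used in place of jq to extract the raw iterator string.

```shell
# get_iterator TYPE [extra arguments...]
# Prints a shard iterator of the given type for $stream / $shardId.
get_iterator() {
    iteratorType=$1
    shift
    aws kinesis get-shard-iterator \
        --stream-name "$stream" \
        --shard-id "$shardId" \
        --shard-iterator-type "$iteratorType" \
        "$@" \
        --query ShardIterator --output text
}

# Example calls (they need AWS credentials and an existing stream):
#   stream=stream-name
#   shardId=shardId-000000000000
#   get_iterator TRIM_HORIZON
#   get_iterator LATEST
#   get_iterator AT_TIMESTAMP --timestamp 1678705200
#   get_iterator AFTER_SEQUENCE_NUMBER --starting-sequence-number "$sequenceNumber"
```

AT_SEQUENCE_NUMBER and AFTER_SEQUENCE_NUMBER take `--starting-sequence-number`, AT_TIMESTAMP takes `--timestamp`, and the remaining two types need no extra argument.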

With this shard iterator we can read data records from Kinesis:

aws kinesis get-records --shard-iterator $sharditerator --limit 10

The response is an object with an array called Records, which lists the Kinesis data records. It also contains the next shard iterator (NextShardIterator) and the number of milliseconds this batch is behind the latest data record (MillisBehindLatest).

A data record contains its identification (SequenceNumber), the timestamp at which the record arrived in Kinesis (ApproximateArrivalTimestamp), the partition key, and the actual data. The data is base64 encoded.

{
  "Records": [
    {
      "SequenceNumber": "49633478129523018598145441545878647674003909456640868354",
      "ApproximateArrivalTimestamp": "2023-03-03T13:22:56.498000+01:00",
      "Data": "ewogICJmb3Jtc0RhdGEiOiB7CiAgICAiZmlyc3ROYW1lIjogIkVyaWMiLAogICAgImxhc3ROYW1lIjogIklkbGUiCiAgfSwKICAibWV0YWRhdGEiOiB7CiAgICAicGFyZW50SWQiOiAiNzgwN2IzNzMtNTRlNy00ZTAzLWE1ZGUtMGI0MDE2OGFmNTRiIgogIH0KfQo=",
      "PartitionKey": "0"
    },
    {
      "SequenceNumber": "49633478129523018598145441601514622818488773411761815554",
      "ApproximateArrivalTimestamp": "2023-03-03T13:26:30.057000+01:00",
      "Data": "ewogICJmb3Jtc0RhdGEiOiB7CiAgICAiZmlyc3ROYW1lIjogIkdyYWhhbSIsCiAgICAibGFzdE5hbWUiOiAiQ2hhcG1hbiIKICB9LAogICJtZXRhZGF0YSI6IHsKICAgICJpbmZvbWVldGluZ0lkIjogImt3MzlrenNhZjMiCiAgfQp9Cg==",
      "PartitionKey": "0"
    }
  ],
  "NextShardIterator": "AAAAAAAAAAHrZiW4CusNNQOgyPkpxttbi9hAPH35qT91FVx7RcmZKmSuzulLh0t16SAlG9jPUO+NJ0RPxfaWZaCjwusIjzxI3MBdGvKbJt/MX2bJHv2FTqiyArEDvuFBI0cvdNeX+T18wcnljCEZ3etm7tBkr9l84O0+1KakYygljotcBba49QLuvW3f90OXxXV9bam5HY3CmbxEr5fK5quRhoBgvhrvxBXUCvMoRCGzVn7krSr9EhZD79DwynYJ9qL3JY5/ZyVAMeh4a20ENkt6PR7MdUikElbjeyuvmeLBpOj+demEps/1NaHh2i5r1i/BRiemgj/5sii+bKcWGPqhkeEujl5+",
  "MillisBehindLatest": 953238000
}
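
To look inside a single record, its Data field can be decoded by hand. The sketch below takes the Data value of the first sample record above and decodes it with `base64 -d` (GNU coreutils syntax); the variable names are only for illustration.

```shell
# Data field of the first sample record above (base64-encoded JSON).
data='ewogICJmb3Jtc0RhdGEiOiB7CiAgICAiZmlyc3ROYW1lIjogIkVyaWMiLAogICAgImxhc3ROYW1lIjogIklkbGUiCiAgfSwKICAibWV0YWRhdGEiOiB7CiAgICAicGFyZW50SWQiOiAiNzgwN2IzNzMtNTRlNy00ZTAzLWE1ZGUtMGI0MDE2OGFmNTRiIgogIH0KfQo='

# Decode it back into the JSON document the producer sent.
payload=$(printf '%s' "$data" | base64 -d)
echo "$payload"
```

The decoded payload is a JSON document with a formsData object and a metadata object; the metadata of this record contains the parentId field that the filter later in this post looks for.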

A single call returns only a limited slice of the shard, often fewer records than specified with the --limit parameter, and sometimes none at all even though more records follow. To get the next batch of records, the same request must be sent again, but with the NextShardIterator. Also, to filter the data records on a field in the Data, we need to act on the JSON.

There is a small but powerful tool called jq which can do that.

Excursion: jq

jq is a lightweight and flexible command-line JSON processor.

There is a nice tutorial, but I'll give you the basics here:

When JSON is passed to jq with the identity filter (.), jq pretty-prints it, including syntax highlighting:

json='[{"name":"John","age":30,"address":{"street":"123 Main St","city":"Anytown","country":"USA"}},{"name":"Jane","age":25},{"name":"Bob","age":40,"address":{"street":"456 Oak St","city":"Othertown","country":"USA"}}]'
echo $json | jq .

To get an entry in the array, we can use the array notation.
To get the entries from the end, use negative numbers (-1 for the last, -2 for the second to last, etc.)

echo $json | jq '.[0]'
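
The negative index works the same way. A minimal sketch, using a shortened version of the sample array so that it runs on its own:

```shell
json='[{"name":"John","age":30},{"name":"Jane","age":25},{"name":"Bob","age":40}]'

# .[-1] is the last entry of the array; -r prints the name without JSON quotes.
last=$(echo "$json" | jq -r '.[-1].name')
echo "$last"
```

This is the same notation the script at the end of this post uses to read the arrival time of the last record in a batch (`.Records[-1]`).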

The | operator in jq feeds the output of one filter into the input of another.
With it we can also transform the data and, for example, get just the first name and the city:

echo $json | jq '.[0] | { firstname: .name, city: .address.city }'

To act on all entries in the array, just omit the index:

echo $json | jq '.[] | { firstname: .name, city: .address.city }'

To put the result into an array, instead of using independent objects, just wrap the whole query in brackets:

echo $json | jq '[.[] | { firstname: .name, city: .address.city }]'

Besides filtering and modifying JSON, there are numerous built-in functions and operators one can use.
In our example we need to select data records which have a certain property.

The first function we can use is select(boolean_expression), which can filter lists:

echo $json | jq '.[] | select(.age > 26)'

The second one is has(key), which checks for the presence of a property in a json object:

echo $json | jq '.[1] | has("address")'

To get all objects in the array that have an address, we can combine those filters:

echo $json | jq '.[] | select(has("address"))'
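
One thing to watch when checking a nested property: has() raises an error when its input is null, which happens as soon as one object lacks the parent property. The sketch below is one possible way around that, falling back to an empty object with the // operator. It uses a shortened version of the sample array in which Jane has no address.

```shell
json='[{"name":"John","address":{"city":"Anytown"}},{"name":"Jane"},{"name":"Bob","address":{"city":"Othertown"}}]'

# (.address // {}) yields an empty object when address is missing,
# so has("city") returns false for Jane instead of failing.
withCity=$(echo "$json" | jq -r '.[] | select((.address // {}) | has("city")) | .name')
echo "$withCity"
```

The same pattern applies to the Kinesis payloads if some of them might not contain a metadata object.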

Those are the basics of jq we need to filter all data records which have a certain property.

Read and filter records from Kinesis

To filter the Kinesis data records by the presence of a property in the payload, the payload must be decoded first.
Then we can filter it with jq as described above.
Afterwards we need to use the next shard iterator to fetch the next batch.
To avoid calling Kinesis twice for the same batch, I cache it in a temporary file.

kinesisBatch=$(mktemp)
aws kinesis get-records --shard-iterator "$sharditerator" --limit 50 > "$kinesisBatch"
# -r emits the raw base64 strings (no JSON quotes), which base64 -d can decode
jq -r '.Records[].Data' "$kinesisBatch" | base64 -d | jq -s '.[] | select(.metadata | has("parentId"))'
sharditerator=$(jq -r '.NextShardIterator' "$kinesisBatch")
aws kinesis get-records --shard-iterator "$sharditerator" --limit 50 > "$kinesisBatch"
# etc.
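
An alternative sketch, assuming jq 1.6 or newer: the @base64d filter decodes inside jq, so each record is decoded on its own and no separate base64 process is needed. The function name `decode_and_filter` is made up for illustration, and the `// {}` fallback is an addition that skips payloads without a metadata object instead of failing on them.

```shell
# decode_and_filter FILE
# Prints every decoded payload whose metadata has a parentId, one per line.
decode_and_filter() {
    # @base64d decodes the Data string, fromjson parses the result as JSON.
    jq -c '.Records[].Data
           | @base64d
           | fromjson
           | select((.metadata // {}) | has("parentId"))' "$1"
}

# Example call on a cached batch:
#   decode_and_filter "$kinesisBatch" >> foundEntries.json
```

Whether this or the base64 pipeline is preferable is mostly a matter of taste; the jq-only variant has the advantage that a single malformed record is easier to trace.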

Now we need to repeat this until a break condition is reached.
This can be a timestamp or a SequenceNumber.

stream=stream-name
fromTime=$(date --date='2023-03-13 12:00:00' +%s)
timeOfLastEntry=$fromTime
toTime=$(date --date='2023-03-15 15:00:00' +%s)
sharditerator=$(aws kinesis get-shard-iterator --stream-name "$stream" --shard-id 0 --shard-iterator-type AT_TIMESTAMP --timestamp "$fromTime" | jq -r .ShardIterator)
kinesisBatch=$(mktemp)

# stop once a record newer than toTime was seen, or when there is no further iterator
while [ "$toTime" -ge "$timeOfLastEntry" ] && [ -n "$sharditerator" ]; do
    aws kinesis get-records --shard-iterator "$sharditerator" --limit 50 > "$kinesisBatch"
    jq -r '.Records[].Data' "$kinesisBatch" | base64 -d | jq -s '.[] | select(.metadata | has("parentId"))' >> foundEntries.json
    # a batch can be empty; only update the break condition when it has records
    lastArrival=$(jq -r '.Records[-1].ApproximateArrivalTimestamp // empty' "$kinesisBatch")
    if [ -n "$lastArrival" ]; then
        timeOfLastEntry=$(date --date="$lastArrival" +%s)
    fi
    # "// empty" turns a missing or null iterator into an empty string
    sharditerator=$(jq -r '.NextShardIterator // empty' "$kinesisBatch")
done

Wrap up

Even though Kinesis stores the data persistently (for the configured retention period), it is not a database. It stores all records in sequence and there is no way to query the data in Kinesis itself. If you want to act on only a subset of the records, you need to consume all records and filter them locally. In this blog post, I outlined how this can be done using the AWS CLI in conjunction with jq.

The above snippet can be put in a file and called from the console. The next step would be to add parameters, so it can be used more interactively, for example:

  • --stream-name stream-name
  • --starting-timestamp yyyy-mm-dd
  • --end-timestamp yyyy-mm-dd
  • --jq-filter-on-data 'select(.metadata | has("parentId"))'
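
A minimal sketch of how such parameters could be parsed in plain shell. The function name `parse_args` and the variable names are assumptions made for this example; only the option names come from the list above.

```shell
# parse_args "$@"
# Fills streamName, startingTimestamp, endTimestamp and jqFilter.
parse_args() {
    # default filter: the one used in this post
    jqFilter='select(.metadata | has("parentId"))'
    while [ $# -gt 0 ]; do
        # every option takes exactly one value
        if [ $# -lt 2 ]; then
            echo "missing value for $1" >&2
            return 1
        fi
        case "$1" in
            --stream-name)        streamName=$2 ;;
            --starting-timestamp) startingTimestamp=$2 ;;
            --end-timestamp)      endTimestamp=$2 ;;
            --jq-filter-on-data)  jqFilter=$2 ;;
            *) echo "unknown option: $1" >&2; return 1 ;;
        esac
        shift 2
    done
}

# Example call at the top of the script:
#   parse_args "$@" || exit 1
#   fromTime=$(date --date="$startingTimestamp" +%s)
#   toTime=$(date --date="$endTimestamp" +%s)
```

The parsed values would then replace the hard-coded stream name, dates, and jq filter in the loop above.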
