DEV Community

loading...
AWS Community Builders

IoT/TimeSeries event processing using AWS Serverless Services and AWS Managed Kafka Streaming

prasanth_mathesh profile image prasanth mathesh Updated on ・7 min read

Introduction

The TimeSeries and IoT data share most of the characteristics except a few like the timestamp attribute. The time-series data arrival is predefined but IoT events can be in a random window. The IoT event analytics can be done either with IoT analytics or with Kafka but both have certain data processing limitations. The solution I have considered for this article has a mix of Kafka Streaming and IoT Analytics. The integration of AWS MSK with Kinesis Data Analytics or AWS Glue will be covered in detail in another article.

Architecture

Alt Text

Why IoT Core?

Kafka Protocol was built on top of TCP/IP whereas standard IoT devices support MQTT connections. MQTT was built considering bad network and communication. AWS IoT Core Device Gateway supports both HTTP and MQTT protocol, provides bi-directional communication with IoT devices and can filter and route data across the enterprise applications. It enables device registration in a simple manner thus accelerating IoT application development.

Why AWS MSK?

Apache Kafka is distributed messaging framework that provides fast, scalable and throughput ingestions. Kafka can replay IoT events, provides long-term storage, acts as a buffer when there is high-velocity data, provide easy integration with other enterprise applications. Real-time device monitoring is not possible in IoT Analytics whereas Kafka streaming along with an event processing framework can trigger actions for the anomalies in real-time. No downtime during Kafka cluster upgrade and clusters can be provisioned in 15 mins. Another main advantage is no charge for data replication traffic across AZ and this is a key factor when compared with expensive highly available self-hosted Kafka clusters.

Why IoT Analytics?

AWS IoT Analytics can enrich the IoT data, perform ad-hoc analysis and build dashboards using QuickSight. It is a simple and serverless way to do data prep, clean and feature engineering and can be integrated with notebooks, AWS SageMaker to build machine learning models. Custom analysis code packaged in a container can also be executed on AWS IoT Analytics for use cases like understanding the performance of devices, predicting device failures, etc.

Why TimeStream?

Timestream can easily store and analyze trillions of events per day. The data retention can be controlled based on the analytics need. It has built-in time-series analytics functions, helping you identify trends and patterns in your data in near real-time. Timestream can provide 1000x faster query performance along with 1/10th the cost of relational databases. When the same record is received for a timestamp, timestream can deduplicate it which is a common problem with streaming events. When there is a need for high volume of ingestion, events need to be written into canonical datastore like kinesis firehose and then it should be written to S3 for long-term storage but TimeStream can be used as serving db.

Why AWS Lambda?

The AWS Lambda can process data from an event source like Apache Kafka. The lambda is very cheap and its memory can be scaled from 128MB to 10240MB. Also, the processing timeout can be set for Lambda functions. The IoT device's payload will be lesser in size and real-time device control operations based on incoming payload value can be easily done with AWS Lambda rather than using serverless services like AWS Glue or AWS Kinesis Data Analytics. Lambda can be triggered by an event source like AWS MSK or it can be scheduled in AWS.

Let's Get Started

The IoT device registration and IoT message simulation during development are critical tasks in IoT development and there is a need for the data engineering team to simulate the IoT events using various SaaS providers like MQTTLab, Cumulocity IoT etc.
The device registration and IoT Event data simulation for this article were done through AWS IoT simulator. The simulator provides device type registrations, controlling the number of devices to simulate the data and to define payload structure for each device. The Automotive telemetry data was considered and simulated as shown in the steps below.

Add Device Type
Create a simulation stack in an AWS region and add custom devices. The automotive telemetry payload attributes are inbuilt and can’t be changed.

Simulate IoT Data
Automotive Telemetry data is quite comprehensive and the simulator we considered here can publish messages on three topics

Alt text

Telemetry Topic Payload

{
"name": "speed",
"value": 47.4,
"vin": "1NXBR32E84Z995078",
"trip_id": "799fc110-fee2-43b2-a6ed-a504fa77931a",
"timestamp": "2018-02-15 08:20:18.000000000"
}

Trip Topic Payload

{"vehicle_speed_mean":64.10065503146477,"engine_speed_mean":3077.59476197646,"torque_at_transmission_mean":210.70915084517395,"oil_temp_mean":237.417022870719,"accelerator_pedal_position_mean":28.819512721817887,"brake_mean":4.268754736044446,"high_speed_duration":0,"high_acceleration_event":3,"high_braking_event":0,"idle_duration":75323,"start_time":"2021-03-06T07:40:02.454Z","ignition_status":"run","brake_pedal_status":false,"transmission_gear_position":"fifth","odometer":35.27425650210172,"fuel_level":97.7129231345363,"fuel_consumed_since_restart":0.9155057461854811,"latitude":38.938734,"longitude":-77.269385,"timestamp":"2021-03-06 08:13:04.981000000","trip_id":"c6bacef5-1bfc-4a72-8261-6c1272772f13","vin":"5JO226H6QR3J3T7TI","name":"aggregated_telemetrics"}

Diagnostics Topic Payload

{"timestamp":"2021-03-06 08:14:33.087000000","trip_id":"a4c55e6e-a7eb-4c3e-b0a0-dfcace119e03","vin":"MQYK4Z8WGTJDFDA05","name":"dtc","value":"P0404"}

After completing the exercises, stop the simulation.

It is evident from the below image that the costs for IoT are cheap.
Alt Text

IoT Analytics

IoT Analytics setup is a simple process and it can quickly create channel, pipeline and datastore etc. in a single click as shown below.
Alt Text
The above setup was created to select all telemetry payloads from the subscribed topic.

The pipeline can be used to filter any unwanted attributes from the payload before creating a datastore.

S3 is used as an underlying data store and the data format can be parquet or JSON.

Setup IoT Rule and Action
Create a Rule and Action for the Telemetry Payload as given below. The objective is to select all records and ingest them into Telemetry Channel.
Alt Text
Alt Text

Once the simulation is started, the ingestion process can be monitored in the IoT Analytics window.

The dataset can be created and scheduled to be refreshed from the datastore. The dataset can be refreshed from the last import timestamp using the delta time option.

The IoT Analytics dataset can be used in aws sagemaker notebooks and can be used as a data source for QuickSight.

QuickSight

Create a new IoT analytics data source using the telemetry_dataset
Alt Text
Once the import is completed, the records can be enriched or filtered.
Alt Text
The import process can be scheduled for auto-refresh.

The imported telemetry dataset values can be visualized in
QuickSight.
Alt Text

SageMaker

Create a notebook and open the notebook using the sagemaker instance.

Import dataset and analyze the data for building machine learning models for predictive maintenance of devices.
Alt Text
A basic working version of SageMaker code is kept in Github. The feature engineering, model training and inference pipeline for near real-time data will be covered in a separate article in a detailed manner.

TimeStream

Pricing of timestream is 1 million writes of 1KB size = $0.50. Avg size of telemetry data is 145 bytes. Avg size of aggregated trip data is 800 bytes. By merging all the dimensions like speed, fuel_level etc at the VIN level, we can reduce the number of writes per batch and cost. A detailed explanation about payload structure and batching the writes have been explained in my previous article. Create a Rule and Action for Timestream if the data is aggregated at the source itself or process IoT events stored in MSK using the Kinesis Data Analytics for Flink or using a custom apache-spark application.

AWS MSK

AWS MSK was recently introduced as one of the Actions for IoT Core. IoT Rule Action can act as a producer for Managed Streaming Kafka. Create a VPC destination and add the cluster details and authentication mechanisms of the Kafka cluster to receive the messages.

Enable error logging for action
Alt Text
Enable CloudWatch Logs and cloud trail for IoT and AWS MSK to analyze the log and API calls.

Lambda

Create a lambda function to consume and process the data.
for partition in partitions:
consumer.seek_to_beginning(partition) #sample read. use offset

Alt Text
The diagnostic data is less in volume but needs immediate action. The lambda will consume, check for a problem code and issue STOP signal to the device by publishing a payload to MQTT topic meant for a shadow device.

response = IOTCLIENT.publish(topic=iottopic, qos=0, payload=json.dumps(msg))
Alt Text
The working version of the lambda function is kept in GitHub kafka_consumer.py

Conclusion

IoT data can be processed and stored by various AWS services based on the use cases. The serverless AWS services can aid the rapid development of large-scale complicated architecture in an easily scalable manner. The managed services like AWS MSK along with a choice of distributed processing frameworks like Apache Flink based Kinesis data analytics or Apache Spark based AWS Glue and AWS Lambda will help in building scalable highly available and cost-effective processing for IoT/TimeSeries events.

Discussion (4)

pic
Editor guide
Collapse
starpebble profile image
starpebble

Neat. My feedback: I really appreciate the discussion about Lambda. AWS makes it possible to use Lambda from IoT though there is a necessary service in the middle, AWS MKS. This is why computing with Lambda isn't so simple right now in bigger pictures. A service helps with reliability like MKS does here. TimeSeries is new, it's neat because it makes a nice website possible with visualizations. It's possibly easy to connect a website to TimeSeries with Amplify.

Collapse
jasondunn profile image
Jason Dunn

Excellent level of detail!

Collapse
emil profile image
Emil

Is there a specific reason why you choose MSK over Kinesis?

Collapse
prasanth_mathesh profile image
prasanth mathesh Author

The main advantage with MSK to migrate on-premise kafka clusters to the cloud without much development effort. Once Kinesis Data Analytics for Flink supports Python SDK, it will be a game changer.