Introduction
Amazon Timestream is a fast, scalable, and serverless time series database service for IoT and operational applications. AWS Lambda is a compute service that lets you run code without provisioning or managing servers. This guide walks you through creating a Timestream database and inserting data into it from an Excel file stored in an S3 bucket, using a Lambda function.
1. Create an initial Lambda function
Follow Developing AWS Lambda Functions Locally to create the initial Lambda function.
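The handler body doesn't matter yet; it will be replaced in step 3. As a rough sketch, the initial handler in a fresh SAM Python project looks something like this (the file location and message are just the template defaults, so adjust them to your project):
timestream/app.py (initial placeholder)
import json


def lambda_handler(event, context):
    # Placeholder handler from step 1; replaced by the Timestream loader in step 3
    return {
        'statusCode': 200,
        'body': json.dumps({'message': 'hello world'})
    }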
2. Create additional scripts
I have created a Python script that creates an S3 bucket and uploads the Excel file into it.
create_and_upload.py
import boto3
import botocore
import os


def create_s3_bucket(bucket_name, region=None):
    s3_client = boto3.client('s3', region_name=region)
    try:
        if region is None:
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            location = {'LocationConstraint': region}
            s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration=location)
    except botocore.exceptions.ClientError as e:
        print(f"Error occurred while creating bucket: {e}")
        return False
    return True


def upload_file_to_s3(file_name, bucket_name, object_name=None):
    # Check if the file exists
    if not os.path.exists(file_name):
        print(f"Error: The file {file_name} does not exist.")
        return False
    if object_name is None:
        object_name = os.path.basename(file_name)  # Extracts just the file name from the full file path
    s3_client = boto3.client('s3')
    try:
        s3_client.upload_file(file_name, bucket_name, object_name)
        print(f"File '{file_name}' has been uploaded to bucket '{bucket_name}' as '{object_name}'")
        return True
    except Exception as e:
        print(f"Error occurred while uploading file: {str(e)}")
        return False


def main():
    bucket_name = 's3-bucket-name'  # Replace with your unique bucket name
    region = 'region-name'  # Replace with your desired region
    excel_file_path = r'excel-file-path.xlsx'  # Replace with your local Excel file path

    # Create S3 bucket
    if create_s3_bucket(bucket_name, region):
        print(f"Bucket '{bucket_name}' created successfully.")

    # Upload file to S3
    if upload_file_to_s3(excel_file_path, bucket_name):
        print(f"File '{excel_file_path}' uploaded successfully to '{bucket_name}'.")


if __name__ == '__main__':
    main()
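Before running the script, make sure boto3 is installed (pip install boto3) and that your AWS credentials are configured (for example with aws configure). Then run python create_and_upload.py. Keep in mind that S3 bucket names must be globally unique.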
3. Write the Lambda function
3.1 File Structure
CSVTimestream
|
|--events
| |---event.json
|
|--timestream
| |---app.py
|
|---samconfig.toml
|---template.yaml
3.2 Code
My template.yaml file will be as follows.
template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  CSVTimestream

  Sample SAM Template for CSVTimestream

# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
  Function:
    Timeout: 300
    MemorySize: 128

Resources:
  TimestreamLambdaFunction:
    Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
    Properties:
      CodeUri: timestream/
      Handler: app.lambda_handler
      Runtime: python3.9
      Architectures:
        - x86_64
      Role: !GetAtt LambdaExecutionRole.Arn

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: [ lambda.amazonaws.com ]
            Action: [ 'sts:AssumeRole' ]
      Policies:
        - PolicyName: TimestreamAccessPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: [ 'timestream:*' ]
                Resource: '*'
        - PolicyName: S3BucketAccessPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: [ 's3:GetObject' ]
                Resource: '*'
My Lambda function will be as follows.
app.py
import boto3
import pandas as pd
from botocore.exceptions import ClientError
from io import BytesIO

# Initialize clients
s3_client = boto3.client('s3')
timestream_write = boto3.client('timestream-write')

# Constants
database_name = 'timestream-db-name'
BUCKET_NAME = 's3-bucket-name'
OBJECT_KEY = 'excel-file-name.xlsx'


def create_database(database_name):
    try:
        timestream_write.create_database(DatabaseName=database_name)
        print(f"Database {database_name} created.")
    except ClientError as e:
        print(f"Database creation failed: {e}")


def create_table(table_name):
    try:
        retention_properties = {
            'MemoryStoreRetentionPeriodInHours': 24,
            'MagneticStoreRetentionPeriodInDays': 7
        }
        timestream_write.create_table(DatabaseName=database_name, TableName=table_name,
                                      RetentionProperties=retention_properties)
        print(f"Table {table_name} created.")
    except ClientError as e:
        print(f"Table creation failed: {e}")


def get_excel_file(bucket, key):
    s3_client = boto3.client('s3')
    response = s3_client.get_object(Bucket=bucket, Key=key)
    return BytesIO(response['Body'].read())


def process_excel_file(excel_file):
    # Read the Excel file
    xls = pd.ExcelFile(excel_file)
    # Process each sheet in the Excel file
    for sheet_name in xls.sheet_names:
        df = pd.read_excel(xls, sheet_name=sheet_name)
        # Create a table for each sheet
        create_table(sheet_name)
        # Write records to Timestream
        write_records(df, sheet_name)


def write_records(df, table_name):
    records = []
    version = 1  # Start with a base version number
    for index, row in df.iterrows():
        time_str = row['time'].replace('"', '')
        time_dt = pd.to_datetime(time_str)
        timestamp_ms = int(time_dt.timestamp() * 1000)
        # measure_value = row['measure_value::double']
        # Build the list of dimensions.
        dimensions = [
            {'Name': 'col_1_name', 'Value': str(row['col_1_name'])},
            {'Name': 'col_2_name', 'Value': str(row['col_2_name'])},
            # ... continue this based on your Excel file columns
        ]
        # Include additional dimensions based on the sheet structure.
        if 'addi_col' in df.columns:
            dimensions.append({'Name': 'addi_col', 'Value': str(row['addi_col'])})
        record = {
            'Dimensions': dimensions,
            'MeasureName': row['col_name'],
            'MeasureValue': str(row['col_name::double']),  # I have added this based on my Excel file
            'MeasureValueType': 'DOUBLE',
            'Time': str(timestamp_ms),
            'Version': version  # Adding a version number
        }
        records.append(record)
        version += 1  # Increment the version for each record
    try:
        result = timestream_write.write_records(DatabaseName=database_name, TableName=table_name,
                                                Records=records, CommonAttributes={})
        print(
            f"Records written to table {table_name} successfully with status: {result['ResponseMetadata']['HTTPStatusCode']}")
    except timestream_write.exceptions.RejectedRecordsException as e:
        print("Error writing records:", e)
        for rejected_record in e.response['RejectedRecords']:
            print("Rejected Record:", rejected_record)
    except ClientError as e:
        print(f"Error writing records: {e}")


def lambda_handler(event, context):
    # Create the Timestream database
    create_database(database_name)
    # Get the Excel file from S3
    excel_file = get_excel_file(BUCKET_NAME, OBJECT_KEY)
    # Process the Excel file
    process_excel_file(excel_file)
    return {
        'statusCode': 200,
        'body': "Data loaded to Timestream successfully."
    }
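One thing to keep in mind: pandas and openpyxl (which pandas needs to read .xlsx files) are not part of the Lambda Python runtime. Assuming you let sam build install dependencies from the function's code directory, a timestream/requirements.txt along these lines should cover it (boto3 is already included in the runtime, so listing it is optional):
timestream/requirements.txt
pandas
openpyxl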
samconfig.toml will be as follows.
samconfig.toml
# More information about the configuration file can be found here:
# https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-config.html
version = 0.1
[default]
[default.global.parameters]
stack_name = "CSVTimestream"
[default.build.parameters]
cached = true
parallel = true
[default.validate.parameters]
lint = true
[default.deploy.parameters]
capabilities = "CAPABILITY_IAM"
confirm_changeset = true
resolve_s3 = true
s3_prefix = "CSVTimestream"
region = "aws-region"
image_repositories = []
[default.package.parameters]
resolve_s3 = true
[default.sync.parameters]
watch = true
[default.local_start_api.parameters]
warm_containers = "EAGER"
[default.local_start_lambda.parameters]
warm_containers = "EAGER"
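For local testing, the handler above ignores the incoming event, so events/event.json can be a minimal placeholder, for example:
events/event.json
{}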
4. Finally
Deploy the Lambda function and test it using the sam local invoke command, as sketched below. You'll see that the Timestream database has been created, along with its tables and the data from the Excel file.
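For reference, the commands might look like the following (the function's logical ID comes from template.yaml; note that sam local invoke runs the function in a local Docker container while still calling the real AWS APIs, so your credentials need to be available):
sam build
sam deploy
sam local invoke TimestreamLambdaFunction --event events/event.json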