In this post, we are going to see how to create a serverless analytics architecture based on an event-driven architecture, with the help of services like Kinesis Data Firehose, EventBridge, Athena, Glue, Lambda, and S3.
To avoid rewriting yet another definition of this type of architecture, let's turn to AWS's definition: "An event-driven architecture relies on events to trigger communication between decoupled services. This type of architecture is common in modern applications based on microservices. An event can be a state change or update, such as adding an item to the cart on an e-commerce site. Events can refer to a state (purchased item, modified price, and entered delivery address) or consist of identifiers (shipment notification of an order).
Event-driven architectures have three key elements:
- Event producers
- Event routers
- Event consumers
A producer sends an event to the router, which filters the events and then passes them on to consumers. The producer and consumer services are decoupled, which allows them to be scaled, updated, and deployed independently." (source: https://aws.amazon.com/event-driven-architecture/)
Now that you are familiar with this type of architecture (or at least understand the principle), it is important to understand that events become the heart of our application. Without them, the application no longer makes sense; it stops functioning because it no longer receives input. We quickly realize that storing these events is essential. The marketing team would like to know how many products were sold in the last few hours, days, months, or years. Architects, on the other hand, would like to create a new service consuming one or more existing events, and would therefore like to know how many events are produced over a given period.
If you are also convinced that storing and analyzing these events is essential, you're lucky because you've come to the right place.
The complete code presented in this article is available here: https://gitlab.com/bootstrap-stack/serverless-analytics
Producing events on AWS
The central service of our architecture is AWS EventBridge, the event bus service released by AWS in July 2019. This service is widely used, and is in fact used internally by other AWS services: Config rules, for example, are triggered by events passing through AWS event buses. Each service has its own events that are emitted into EventBridge (from the success or failure of a Lambda function to a scale-up or scale-down in an Auto Scaling group), and a lot of information passes through these buses (see the auto scaling events, for example: https://docs.aws.amazon.com/autoscaling/ec2/userguide/automating-ec2-auto-scaling-with-eventbridge.html)
But how do we create our own event bus in EventBridge and then send events into it? It's simple: you can do it directly in the console or with your favorite infrastructure-as-code tool. In this article, I will use the AWS Cloud Development Kit (CDK) to deploy my infrastructure (the code will be in TypeScript).
import { EventBus } from 'aws-cdk-lib/aws-events'

const businessEventsBus = new EventBus(app, 'IpponBlogDemo')
Now that we have created our bus, let me tell you about the formalism to use when sending events to EventBridge. An event consists of 5 fields:
- time: the date and time of the event we are producing.
- source: the domain from which the event was sent (e.g. the sales domain).
- detailType: the action related to the event. Sticking with the e-commerce store perspective, this field could contain "item.sold", for example.
- detail: the content of the event; this is where we put the specific fields (product name, price, etc.).
- event bus name: the name of the bus to which we want to send the event.
All of these fields are encapsulated in a JSON object and in our case, sent via a TypeScript class that uses the AWS SDK to call the EventBridge API.
import { EventBridgeClient, PutEventsCommand } from '/opt/nodejs/node_modules/@aws-sdk/client-eventbridge'

export default class EventSenderEventBridge {
  private client = new EventBridgeClient({})
  private busName = 'myBusName'

  send = async () => {
    const query = new PutEventsCommand({
      Entries: [{
        EventBusName: this.busName,
        Source: 'sales',
        DetailType: 'item.sold',
        // Detail must be a JSON-encoded string
        Detail: JSON.stringify({ name: 'toothbrush', price: 9.99, quantity: 1 }),
      }],
    })
    await this.client.send(query)
  }
}
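For reference, once EventBridge has accepted this call, the event that reaches a target (our Firehose stream, as we will see below) is wrapped in a standard envelope. It looks something like this (the values are purely illustrative):

// Shape of the event as EventBridge delivers it to targets (illustrative values)
const deliveredEvent = {
  version: '0',
  id: '6f4f3fb2-example-id',
  'detail-type': 'item.sold',
  source: 'sales',
  account: '123456789012',
  time: '2023-05-10T12:00:00Z',
  region: 'eu-west-1',
  resources: [],
  detail: { name: 'toothbrush', price: 9.99, quantity: 1 },
}

This envelope is what our transformation Lambda will receive later on, which is why we will strip it down to the business fields only.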
How to store our data?
As a huge fan of the serverless world and managed services (yes, they really do make life easier), and with a small wallet, I turned to a flexible stack.
We will use S3 to store our data. It is a perfect fit for our use case: cheap, ultra-flexible, and 100% managed by AWS.
We have a home for our data and we know how to produce events easily. Now we need a little glue between the events and their destination.
This glue will be provided by the Kinesis family, more specifically Kinesis Data Firehose. Indeed, EventBridge integrates natively with Kinesis Data Firehose, which can deliver to an S3 bucket as its destination!
You may not know the Kinesis Data Firehose service yet. Here's the AWS definition: "Amazon Kinesis Data Firehose is an Extract, Transform, and Load (ETL) service that captures, transforms, and reliably delivers streaming data to data lakes, data stores, and analytics services." (https://aws.amazon.com/kinesis/data-firehose/)
It may not be very clear yet, so here's a diagram illustrating how the service works:
On the left, we can see the "20+ AWS services" block, which includes EventBridge. Our events will therefore be transmitted to KDF (Kinesis Data Firehose), where we can easily transform them using a Lambda function or tools provided by AWS. The result is finally delivered to the destination, in our case an S3 bucket.
How to create and configure a Kinesis Data Firehose stream
Now, let's move on to the creation of our infrastructure with CDK, and I'll try to explain the different fields required when creating our stream.
First, you need to create an IAM role with certain permissions. To avoid writing too much code, I will only create the role and add an IAM policy to it to show you how to do it:
const deliveryStreamRole = new Role(construct, props.eventTypeUppercased + 'DeliveryStreamRole', {
  assumedBy: new ServicePrincipal('firehose.amazonaws.com'),
})

deliveryStreamRole.addToPolicy(new PolicyStatement({
  resources: ['*'],
  actions: [
    'logs:CreateLogGroup',
    'logs:PutLogEvents',
    'logs:CreateLogStream',
  ],
}))
Now, you need to add a few IAM policies, depending on your needs (an example follows the list below):
- For the S3 service: AbortMultipartUpload, GetBucketLocation, GetObject, ListBucket, ListBucketMultipartUploads, PutObject.
- For the Lambda service (if you want to transform your input objects into a format different from that of EventBridge): InvokeFunction, GetFunctionConfiguration.
- For the Glue service (we will see the usefulness of this service later): GetDatabase, GetTable, GetPartition*, GetTableVersions.
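For example, the S3 statement could look like the sketch below (the bucket name is the one we create just after; the Lambda and Glue statements follow the same pattern with the actions listed above):

// Sketch: allow the Firehose role to write to the destination bucket.
// The Lambda and Glue statements are built the same way with their respective actions.
deliveryStreamRole.addToPolicy(new PolicyStatement({
  resources: [
    'arn:aws:s3:::blog-ippon-destination-repository',
    'arn:aws:s3:::blog-ippon-destination-repository/*',
  ],
  actions: [
    's3:AbortMultipartUpload',
    's3:GetBucketLocation',
    's3:GetObject',
    's3:ListBucket',
    's3:ListBucketMultipartUploads',
    's3:PutObject',
  ],
}))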
Permissions are now created. It's time to create our S3 bucket to store our data:
const destinationBucket = new Bucket(construct, 'BlogIpponDemoBucket', {
  bucketName: 'blog-ippon-destination-repository',
})
Let's move on to the big part: the creation of our Kinesis Data Firehose (KDF) stream. I will break the creation down into several steps and explain what each of them corresponds to.
First, in the CDK documentation for the KDF service, we need to choose a "destination configuration" to pass as an object to the constructor. Here is a link to the documentation: https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_kinesisfirehose.CfnDeliveryStream.html#construct-props
Several destinations are available, but the one we are interested in is S3, more specifically the extendedS3DestinationConfiguration object. The following code shows the configuration of this object.
For readability, we will add intermediate variables for certain parts. Two variables are needed: the first concerns the processingConfiguration field and the second the dataFormatConversionConfiguration field.
const processingConfiguration = {
  enabled: true,
  processors: [
    {
      type: 'Lambda',
      parameters: [
        {
          parameterName: 'LambdaArn',
          parameterValue: 'BlogIpponDemoLambdaTransformationArn',
        },
      ],
    },
    {
      type: 'AppendDelimiterToRecord',
      parameters: [
        {
          parameterName: 'Delimiter',
          parameterValue: '\\n',
        },
      ],
    },
  ],
}
We are now in the processingConfiguration section. This section comes into play once the data has passed through the buffer (which I will explain later). Either we do nothing, in which case the data goes directly to our destination, or we decide to transform it before storing it. In our case, our source data is an EventBridge event.
We would like to be able to transform the source event into something more meaningful, something that will make sense when we come to analyze it. In this case, we will use a small lambda that we will have built. It's a very basic lambda that takes JSON as input and transforms it into another JSON as output (our business event).
As a result, we have streamlined the source data by removing unnecessary fields. The second processor is just there to add a delimiter between each of our records. The data is now ready to go to our S3 bucket.
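To give you an idea of that first processor, here is a minimal sketch of what the transformation Lambda could look like (the exact fields and the choice of partition key are assumptions specific to this example). It follows the record format Firehose expects and also returns the partition key that we will use for dynamic partitioning later on:

// Minimal sketch of a Firehose transformation Lambda (Node.js runtime).
// Each record contains the base64-encoded EventBridge event; we keep only
// the business fields and attach a partition key for dynamic partitioning.
export const handler = async (event: { records: { recordId: string, data: string }[] }) => {
  const records = event.records.map((record) => {
    const eventBridgeEvent = JSON.parse(Buffer.from(record.data, 'base64').toString('utf-8'))
    const businessEvent = {
      name: eventBridgeEvent.detail.name,
      price: eventBridgeEvent.detail.price,
      quantity: eventBridgeEvent.detail.quantity,
    }
    return {
      recordId: record.recordId,
      result: 'Ok',
      data: Buffer.from(JSON.stringify(businessEvent)).toString('base64'),
      // Arbitrary choice here: the event source becomes our custom partition.
      metadata: { partitionKeys: { custom_partition: eventBridgeEvent.source } },
    }
  })
  return { records }
}

Let's now move on to the second intermediate variable.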
const dataFormatConversionConfiguration = {
  enabled: true,
  inputFormatConfiguration: { deserializer: { openXJsonSerDe: {} } },
  outputFormatConfiguration: { serializer: { parquetSerDe: {} } },
  schemaConfiguration: {
    catalogId: props.glueDatabase.catalogId,
    roleArn: deliveryStreamRole.roleArn,
    databaseName: props.glueDatabaseName,
    tableName: props.glueTableName,
  },
}
Our second variable is dataFormatConversionConfiguration. It allows us to configure the format part of data conversion (input and output) and to define a schema for our data (this is where Glue comes in).
Here, our input events are read with the openXJsonSerDe deserializer. For the output, in order not to store the data as-is, we store it in Apache Parquet, a columnar format (which will reduce costs), using the parquetSerDe serializer.
Now it's time to define a schema for our data. This schema is contained in a Glue table, which itself lives in a Glue database. Here we only specify where our schema is stored (the schemaConfiguration part).
It is now time to assemble everything in our configuration object by adding the other fields.
const deliveryStream = new CfnDeliveryStream(construct, props.eventTypeUppercased + 'DeliveryStream', {
  extendedS3DestinationConfiguration: {
    bucketArn: props.destinationBucket.bucketArn,
    roleArn: deliveryStreamRole.roleArn,
    bufferingHints: {
      intervalInSeconds: 60,
    },
    cloudWatchLoggingOptions: {
      enabled: true,
      logGroupName: 'kinesis-log-group',
      logStreamName: 'kinesis-logs',
    },
    prefix: 'year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/custom_partition=!{partitionKeyFromLambda:custom_partition}/',
    errorOutputPrefix: 'errors/!{firehose:error-output-type}/',
    processingConfiguration: processingConfiguration,
    dataFormatConversionConfiguration: dataFormatConversionConfiguration,
    dynamicPartitioningConfiguration: { enabled: true, retryOptions: { durationInSeconds: 10 } },
  },
})
The first two fields correspond to the ARN of the destination S3 bucket and the ARN of the IAM role to use.
The bufferingHints field is interesting, as it lets us configure our stream's buffer. The buffer defines how long to wait and/or how much data to accumulate before delivering a batch.
Two fields are available: intervalInSeconds, the time part (how long to wait), which ranges from 60s to 900s and defaults to 300s (we wait at least 1 minute between sends and at most 15 minutes); and sizeInMBs, the amount of data to accumulate before sending, which ranges from 1 to 128 with a default of 5.
We now move on to the part concerning the logs. Here we have a cloudWatchLoggingOptions object that allows us to activate logs in CloudWatch by setting the name of the CloudWatch Group and Stream.
We now reach one of the crucial parts of the stream definition: the prefix that determines where our data lands in the S3 bucket (and, above all, with which partitions). Here, I wanted a separation by date, with one compartment per year/month/day, followed by a custom partition that we will see later. We use Hive-style partitioning to take advantage of dynamic partitioning; you just have to define the partition keys with the formalism "partition_key=!{the key is retrieved here}". We do the same for processing or delivery errors with the errorOutputPrefix keyword. Having errors sent directly to the S3 bucket is very convenient: we can easily see whether there are any, and we can even automate their detection with a Lambda triggered whenever a file lands under this error prefix.
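For example, with CDK, such a detection Lambda could be wired up like this (the alerting function itself is an assumption and would be defined elsewhere):

// Sketch: trigger a Lambda whenever Firehose writes a file under the error prefix.
import { EventType } from 'aws-cdk-lib/aws-s3'
import { LambdaDestination } from 'aws-cdk-lib/aws-s3-notifications'

destinationBucket.addEventNotification(
  EventType.OBJECT_CREATED,
  new LambdaDestination(errorAlertingLambda), // hypothetical function defined elsewhere
  { prefix: 'errors/' },
)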
To finish creating our stream, we activate and configure dynamic partitioning with the dynamicPartitioningConfiguration keyword. Dynamic partitioning is recent and offers many advantages. The first is being able to query our data according to keys we define ourselves: these keys let us write SQL queries with WHERE clauses, so that Athena (the service we will use later for querying) only reads the data we actually care about instead of scanning everything. We therefore gain in performance, cost, and the amount of data scanned. The downside is that this option is recent and not always well documented yet.
https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
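Before moving on to Glue, one piece of glue worth mentioning is the EventBridge rule that routes events from our bus into the Firehose stream. A minimal sketch with CDK could look like this (the rule id and the event pattern are assumptions based on the event produced earlier):

// Sketch: route "item.sold" events from our bus into the Firehose stream.
import { Rule } from 'aws-cdk-lib/aws-events'
import { KinesisFirehoseStream } from 'aws-cdk-lib/aws-events-targets'

new Rule(construct, 'ItemSoldToFirehoseRule', {
  eventBus: businessEventsBus,
  eventPattern: { source: ['sales'], detailType: ['item.sold'] },
  targets: [new KinesisFirehoseStream(deliveryStream)],
})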
How to create and configure a Glue table to use with KDF
Our use of Glue here boils down to defining a data schema in a database. Note that Glue is a serverless data integration service that lets analytics users easily discover, prepare, move, and integrate data from multiple sources, so it is far more complete and complex than the use we make of it here.
We are almost there, just two more steps and we can finally query our business events from EventBridge. Let's move on to the creation of a Glue database and table:
const glueDatabase = new CfnDatabase(scope, 'glue-database', {
  catalogId: 'IpponBlogDemoAwsAccountId',
  databaseInput: {
    name: 'IpponBlogDemoGlueDatabase',
  },
})
This step needs no further explanation; the code is pretty basic given what you have seen in the article so far.
For the creation of the table, the interesting configuration will be located in the tableInput object.
Since this object is quite large, we will have to use a few intermediate variables. These variables will be partitionKeys and columns.
const partitionKeys = [
  {
    name: 'year',
    type: 'string',
  },
  {
    name: 'month',
    type: 'string',
  },
  {
    name: 'day',
    type: 'string',
  },
  {
    name: 'custom_partition',
    type: 'string',
  },
]
This first object allows us to define partition keys. These keys correspond to what we put in the prefix section when configuring our stream earlier. The "custom_partition" field here is just to remind us of the partition that we added thanks to the transformation lambda.
const columns = [
  {
    name: 'name',
    comment: 'Name of the item',
    type: 'string',
  },
  {
    name: 'price',
    comment: 'Price of the item',
    type: 'string',
  },
  {
    name: 'quantity',
    comment: 'Quantity of item',
    type: 'string',
  },
]
This field will allow us to define the schema that our data will have in our S3 bucket. Here we will have a name, a price, and a quantity for each item.
We can now assemble everything in our CDK object.
const glueTable = new CfnTable(scope, 'glue-table-for-athena', {
  databaseName: 'IpponBlogDemoGlueDatabase',
  catalogId: 'IpponBlogDemoAwsAccountId',
  tableInput: {
    name: 'IpponBlogDemoGlueTable',
    tableType: 'EXTERNAL_TABLE',
    partitionKeys: partitionKeys,
    storageDescriptor: {
      compressed: true,
      inputFormat: 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
      outputFormat: 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
      serdeInfo: {
        serializationLibrary: 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe',
      },
      columns: columns,
      location: 'IpponBlogDemoS3BucketUrl',
    },
  },
})
The storageDescriptor field will allow us to describe how the data will be stored. Here, we will specify that our data will be compressed, our input format will be Apache Parquet, and our output format will also be Apache Parquet. As I am not an expert in this area, I will not go into the details of this configuration, but it has the merit of working. We will simply need to define the location of our data (where it is stored) using the location keyword (we will pass in the URL of our S3 bucket).
We still need to describe to Glue what these partition keys correspond to (i.e. the associated type, possible values, and the number of digits possible, for example). To do this, we will use the addPropertyOverride() function on the table we created earlier.
glueTable.addPropertyOverride('TableInput.Parameters.projection\\.enabled', true)
glueTable.addPropertyOverride('TableInput.Parameters.projection\\.year\\.type', 'integer')
glueTable.addPropertyOverride('TableInput.Parameters.projection\\.year\\.range', '2022,2050')
glueTable.addPropertyOverride('TableInput.Parameters.projection\\.year\\.digits', '4')
glueTable.addPropertyOverride('TableInput.Parameters.projection\\.month\\.type', 'integer')
glueTable.addPropertyOverride('TableInput.Parameters.projection\\.month\\.range', '1,12')
glueTable.addPropertyOverride('TableInput.Parameters.projection\\.month\\.digits', '2')
glueTable.addPropertyOverride('TableInput.Parameters.projection\\.day\\.type', 'integer')
glueTable.addPropertyOverride('TableInput.Parameters.projection\\.day\\.range', '1,31')
glueTable.addPropertyOverride('TableInput.Parameters.projection\\.day\\.digits', '2')
glueTable.addPropertyOverride('TableInput.Parameters.projection\\.custom_partition\\.type', 'injected')
glueTable.addPropertyOverride('TableInput.Parameters.storage\\.location\\.template', 's3://ippon-blog-demo-analytics-repository/year=${year}/month=${month}/day=${day}/custom_partition=${custom_partition}')
In our case, we specify that the year partition is a 4-digit integer between 2022 and 2050. We repeat the process for months and days and let Glue define the type of our custom partition itself using the injected keyword. It's very convenient but has a big disadvantage: we can only perform equalities in SQL queries on this field afterwards (which can be annoying when we want to retrieve information for a specific period and not just a fixed date, for example).
And there you have it, the job is done.
How to query data in S3?
We now have a functional stack that lets us receive events sent to EventBridge, transform them into a business-oriented format, and deliver them to an S3 bucket with nice partitions. The only thing left to do is query all this data. When it comes to analytics and S3, we often think of AWS Athena. Athena is a service that lets you easily query an S3 bucket (and many other data sources) using standard SQL (it can also run Apache Spark workloads). On top of that, Athena is fully managed by AWS: no infrastructure to manage, and that's awesome.
The best part of the story? You don't need to do anything more: once in the Athena console, simply choose your Glue database, select the table in question, configure a destination for the query results, and you'll be ready to query your data.
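For example, a query counting the items sold on a given day could be launched like this with the SDK (the database and table names are the ones defined earlier; the results bucket is an assumption):

// Sketch: run an Athena query on our Glue table with the AWS SDK.
import { AthenaClient, StartQueryExecutionCommand } from '@aws-sdk/client-athena'

const athena = new AthenaClient({})
await athena.send(new StartQueryExecutionCommand({
  QueryString: `
    SELECT name, COUNT(*) AS sold
    FROM IpponBlogDemoGlueTable
    WHERE year = '2023' AND month = '05' AND day = '10' AND custom_partition = 'sales'
    GROUP BY name
  `,
  QueryExecutionContext: { Database: 'IpponBlogDemoGlueDatabase' },
  ResultConfiguration: { OutputLocation: 's3://my-athena-query-results/' }, // hypothetical bucket
}))

Thanks to the WHERE clauses on the partition keys, Athena only reads the files under the matching prefixes.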
What's magical is that, thanks to storing our data in Parquet format combined with dynamic partitioning, this entire stack will cost you very little money, unless of course you produce a huge number of events (because yes, Athena can get expensive). And when I say huge, I really mean a lot. On a personal project, I'm at around 100,000 events per day and the stack doesn't cost me a penny with the AWS Free Tier. A query over 10 days, i.e. roughly one million events, scans about 80 MB of data. Given that Athena costs €5 per terabyte of data scanned, this query costs a little less than €0.0005 (80 MB ≈ 0.00008 TB, and 0.00008 × 5 ≈ €0.0004). Of course, it all depends on the size of your events.
Finally, it is possible to optimize query performance further: the fewer files there are in the destination S3 bucket, the faster Athena executes a query. It is therefore possible, with the help of an EMR (Elastic MapReduce) job, to compact the files into fewer, larger ones.
In terms of costs, you also need to keep an eye on the Athena query results bucket, and don't hesitate to set lifecycle configurations on your buckets to expire objects you no longer need (query results can quickly add up to gigabytes). You should also think carefully about what you want to store: storing just for the sake of storing helps neither the business nor the wallet. Only store what adds value to the queries you are going to run; the rest is certainly not useful for analytics.
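As an illustration, a lifecycle rule on the Athena query results bucket could look like this with CDK (the bucket name and the 30-day expiration are arbitrary choices):

// Sketch: expire Athena query results after 30 days.
import { Duration } from 'aws-cdk-lib'
import { Bucket } from 'aws-cdk-lib/aws-s3'

new Bucket(construct, 'AthenaQueryResultsBucket', {
  bucketName: 'my-athena-query-results', // hypothetical, must match the Athena output location
  lifecycleRules: [{ expiration: Duration.days(30) }],
})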
Top comments (8)
Extraordinary introduction. Thanks!
For some reason, when I try to deploy, I get the error "Error: DemoBlogIppon/EventAnalytics/glue-database-for-athena [AWS::Glue::Database] is missing required property: catalogId at Object.requireProperty "
Guessing I need to pre-create some Athena resources on the CLI?
I'm fairly new to Athena and higher level constructs. Appreciate any trailheads or tips.
Ah, going off the gitlab repo, I realize now, have to explicitly add env variable ACCOUNT_ID=1234567 in front of synth and deploy commands on CLI.
Hello Vincent, thanks a lot for your comment.
I might have missed a step since I didn't start it from scratch on a new computer. I will do that today and provide information about what to do and/or fix the errors directly in the repo. Will come back to you asap :)
I have just updated the readme and part of the lambda code. It should work now!
Terrific! All the best.
Great write-up. We also recently published an article on how to bridge Backend and Data Engineering teams using Event Driven Architecture - packagemain.tech/p/bridging-backen...
Great write-up with lots of details. Thanks for sharing
Thanks mate. It's a pleasure