I recently needed to get CloudWatch Logs to an AWS hosted Elasticsearch cluster via Firehose, and I came across a few sticking points that were not as well documented as I would have hoped. There are quite a few AWS resources involved in getting all of this done. Logs were originally generated by a Lambda function, which wrote to a log group in CloudWatch Logs. I put a subscription filter on that log group with a CloudWatch destination in a different account. That destination forwarded the logs to a firehose delivery stream which used a Lambda function acting as a transformer before the logs reached their final destination of Elasticsearch. Like I said, quite a few resources. The "Lambda function acting as a transformer" is called a "processor" in CloudFormation but it is referred to in documentation about data transformation. I will be referring to this Lambda processor as a transformer since that is its main purpose for the intent of this post.
Getting the CloudWatch Subscription Filter and Destination set up were not too difficult, and there is decent documentation which you can find here. The one caveat that I will note is that in this tutorial, AWS is building the destination for a Kinesis Data Stream, not a Kinesis Data Firehose (sometimes referred to as a Delivery Stream), so in step 5 where they create the permissions policy with "Action": "kinesis:PutRecord"
change kinesis
to firehose
like this "Action": "firehose:PutRecord"
. (After looking back through that tutorial it appears that they have added an example for Firehose on the same page. The goal for right now is just to get CloudWatch Logs to Firehose however you would like.)
By the time I got to Firehose, all of the infrastructure had already been set up by someone else. There were still a few remaining problems with the Lambda transformer that I had to stumble through though. Firstly, was the part where I missed how the payload from the transformer should be returned. I originally converted out of Base64, transformed the logs, converted back to Base64, and returned. I thought that Firehose would want the records to be formatted the same way they went in. After scouring Elasticsearch for my logs, I determined that I was wrong. I learned that there is a specific format that all transformed records need to be in. The returned payload needs to send back a flat JSON object that looks like this:
{
"recordId": "<recordId from original record>",
"result": "<Ok | Dropped | ProcessingFailed>",
"data": "<base64 transformed record to send>"
}
I looked a little deeper into my mistake and how I should format the data
field. CloudWatch Logs sent from a Subscription Filter come in batches but there was only one data
field and one recordId
for each batch. After a record from a CloudWatch Subscription Filter comes in and is Base64 decoded and unzipped it looks like this:
{
"owner": "123456789012",
"logGroup": "CloudTrail",
"logStream": "123456789012_CloudTrail_us-east-1",
"subscriptionFilters": [
"Destination"
],
"messageType": "DATA_MESSAGE",
"logEvents": [
{
"id": "31953106606966983378809025079804211143289615424298221568",
"timestamp": 1432826855000,
"message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
},
{
"id": "31953106606966983378809025079804211143289615424298221569",
"timestamp": 1432826855000,
"message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
},
{
"id": "31953106606966983378809025079804211143289615424298221570",
"timestamp": 1432826855000,
"message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
}
]
}
That's when I found a page mentioning combining multiple JSON documents into the same record. This gave me an answer to how I would convert multiple log events into a single Base64 string for the returned data
field. After I successfully rewrote the transformer to output the transformed logEvents
in single-line JSON, I tested it out. This worked whenever logEvents
was only one object long, but it did not work whenever larger batches were passed in. I went back to the drawing board and read something about Elasticseach bulk inserts needing to be newline (\n
) delimited. I added a newline in after every JSON object instead of leaving them as single-line JSON just to test it out. This time I got errors back from Elasticsearch saying Malformed content, found extra data after parsing: START_OBJECT
. At least this was something concrete to go off of.
I started Googling and found a Reddit thread and AWS support ticket that were tied together. It turns out you need to reingest the records individually so ElasticSearch only receives one log at a time, and I found an example in a deep dark rabbit hole. A solution is mentioned that can be found through a lot of digging on AWS's site, and I found the GitHub repo for that code. There is an example of reingestion near the bottom of that application. I ended up using putRecordBatch
from the AWS SDK to put the logEvents
back into Firehose as individual records.
When I converted over to reingesting logEvents
as individual records, I finally saw logs in Elasticsearch. But now I saw too many logs. Strange. The logs were being duplicated. When I changed result
to Dropped
in the response for the original CloudWatch Logs batched record, I changed data
to a hardcoded string. What a previously mentioned link (https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html#data-transformation-status-model) fails to mention is that you can’t have data
if result
is Dropped
or Firehose will reprocess the record. Once I got rid of the hardcoded string in data
in the transformation Lambda’s response, everything clicked and I started seeing my logs in Elasticsearch without being duplicated.
The code in the aforementioned GitHub repo does something that I wanted to highlight. After Base64 decoding and uncompressing a record, check for record.messageType === 'DATA_MESSAGE'
(Javascript). This condition signifies that the record is coming straight from CloudWatch Logs but a record without messageType
means that the record is being reingested. Making this check also allows Firehose log producers to directly PUT to Firehose as we do in the transformer for reingestion. I would suggest making all of your transformations on the reingested logs only and do not make any logic along the lines of checking how many logEvents
are coming from CloudWatch Logs. Just automatically reingest and handle the transformation on the reingested logs, it will make the logic much easier to debug.
There were numerous errors that I had to debug while getting this setup, and the answers were vague and difficult to find. I hope this can be your last stop if you are running into the same issues I was.
Top comments (0)