
Zied Ben Tahar for AWS Community Builders

Originally published at levelup.gitconnected.com

RAG on media content with Bedrock Knowledge Bases and Amazon Transcribe

Photo by [Pawel Czerwinski](https://unsplash.com/@pawel_czerwinski?utm_source=medium&utm_medium=referral) on [Unsplash](https://unsplash.com?utm_source=medium&utm_medium=referral)

In a previous article, I wrote about building an application capable of generating summaries from YouTube videos. It was fun to build. However, the solution was limited: it only handled YouTube videos and relied on YouTube's generated transcripts. I wanted to improve it, make it more versatile, and support other media formats.

In this article, I take a different approach: Transcribing media files with Amazon Transcribe and using the generated transcripts as a knowledge base, allowing for retrieval-augmented generation with Amazon Bedrock.

RAG with Amazon Bedrock Knowledge Bases

Retrieval-Augmented Generation (RAG) is a technique that improves model responses by combining information retrieval with prompt construction: When processing a query, the system first retrieves relevant data from custom knowledge bases. It then uses this data in the prompt, enabling the model to generate more accurate and contextually relevant answers.

RAG significantly improves the model’s ability to provide informed, up-to-date answers, bridging the gap between the model’s training data and custom up-to-date information.

Typical retrieve and generate flow
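To make the "augment" step concrete, here is a minimal sketch in plain Rust: the retrieved chunks are placed in the prompt ahead of the user's question. The RetrievedChunk type, the function name and the prompt wording are hypothetical; Bedrock builds its own prompt internally (and lets you customise it), so treat this as an illustration of the idea rather than what the service actually sends to the model.

```rust
/// A retrieved chunk of transcript text and where it came from.
/// Hypothetical type for illustration; Bedrock returns richer structures.
pub struct RetrievedChunk {
    pub text: String,
    pub source: String,
}

/// Builds an augmented prompt: retrieved context first, then the question.
/// The template wording is an assumption, not Bedrock's internal prompt.
pub fn build_augmented_prompt(question: &str, chunks: &[RetrievedChunk]) -> String {
    let mut prompt = String::from("Answer the question using only the context below.\n\n");
    for (i, chunk) in chunks.iter().enumerate() {
        // Number each chunk so the answer can point back to its source.
        prompt.push_str(&format!("[{}] ({})\n{}\n\n", i + 1, chunk.source, chunk.text));
    }
    prompt.push_str(&format!("Question: {}\nAnswer:", question));
    prompt
}
```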

How can Bedrock help?

Amazon Bedrock Knowledge Bases simplify the RAG process by handling much of the heavy lifting: synchronizing content from Amazon S3, chunking it, converting the chunks into embeddings, and storing them in a vector database. They also provide endpoints that let applications query the knowledge base and generate responses based on the retrieved data. By handling these tasks, Bedrock lets you focus on building AI-powered applications rather than managing infrastructure.

In this article, I use RAG over media transcripts to generate responses grounded in audio or video content, such as meeting recordings, podcasts, and conference talks.

Solution overview

The architecture of a typical RAG system consists of two components:

  • Knowledge base indexing and synchronisation

  • Retrieval and generation

Architecture overview

Users first request an upload link by invoking the Request Media upload link function, which generates an S3 presigned URL. The request includes media metadata such as the topic, the link to the media, and the date. This metadata is stored and later used by Bedrock to filter results during the retrieval phase. When the upload to the media bucket completes, a media bucket event notification triggers the Start transcription job function.

When a transcription job's state changes, EventBridge publishes a job status event (success or failure). The Handle transcription and sync knowledge base function handles only successful events: it extracts the transcript text, stores it in the knowledge base bucket, and triggers a knowledge base sync.

The vector database is an important part of a RAG system. It stores text representations as vectors (also known as embeddings), allowing similarity searches when given a query. Bedrock supports various vector databases, including Amazon OpenSearch, PostgreSQL with the pgvector extension, and Pinecone. Each option has its advantages. In this solution, I chose Pinecone because it is a serverless service that is quick and easy to set up.
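Since the Pinecone index created in the next section uses the cosine metric, the following sketch shows what a similarity search computes: a cosine score between the query embedding and each stored embedding, keeping the k best matches. It is a brute-force illustration with hypothetical function names; managed vector databases such as Pinecone rely on approximate nearest-neighbour indexes rather than scanning every vector.

```rust
/// Cosine similarity between two vectors of equal length.
/// Returns 0.0 if either vector has zero magnitude.
pub fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    assert_eq!(a.len(), b.len(), "vectors must have the same dimension");
    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let norm_a = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let norm_b = b.iter().map(|x| x * x).sum::<f32>().sqrt();
    if norm_a == 0.0 || norm_b == 0.0 {
        return 0.0;
    }
    dot / (norm_a * norm_b)
}

/// Brute-force top-k search: scores every stored vector against the query
/// and returns (index, score) pairs for the k most similar, best first.
pub fn top_k(query: &[f32], stored: &[Vec<f32>], k: usize) -> Vec<(usize, f32)> {
    let mut scored: Vec<(usize, f32)> = stored
        .iter()
        .enumerate()
        .map(|(i, v)| (i, cosine_similarity(query, v)))
        .collect();
    // Sort by descending similarity; total_cmp gives a total order on f32.
    scored.sort_by(|a, b| b.1.total_cmp(&a.1));
    scored.truncate(k);
    scored
}
```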

With Knowledge Bases, the model used to generate embeddings can differ from the one used to generate responses. For example, in this sample, I use Amazon Titan Text Embeddings V2 for embeddings and Claude 3 Sonnet for response generation.
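Because the two models are configured independently, it can help to build both ARNs in one place. The sketch below uses the foundation-model ARN format that appears in the knowledge base Terraform definition; the Claude 3 Sonnet model ID (anthropic.claude-3-sonnet-20240229-v1:0) and the helper names are assumptions to check against the Bedrock console for your region.

```rust
/// Builds a foundation-model ARN:
/// arn:aws:bedrock:<region>::foundation-model/<model-id>
pub fn foundation_model_arn(region: &str, model_id: &str) -> String {
    format!("arn:aws:bedrock:{region}::foundation-model/{model_id}")
}

/// Embedding and generation models are configured independently.
pub struct KbModels {
    pub embedding_model_arn: String,
    pub generation_model_arn: String,
}

/// Hypothetical helper returning the two ARNs used in this sample.
pub fn kb_models(region: &str) -> KbModels {
    KbModels {
        // Titan Text Embeddings V2, as referenced in the knowledge base definition.
        embedding_model_arn: foundation_model_arn(region, "amazon.titan-embed-text-v2:0"),
        // Claude 3 Sonnet model ID (assumed version; verify in the console).
        generation_model_arn: foundation_model_arn(region, "anthropic.claude-3-sonnet-20240229-v1:0"),
    }
}
```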

Note: In this article, I did not include the parts that handle transcription job monitoring and asynchronous progress notifications to the requester. For insights on building asynchronous REST APIs, you can refer to this previous article.

Alright, let's dive into the implementation.

Solution Details

This time, I am taking a different approach compared to my previous articles: I use Rust for the Lambda code and Terraform for IaC.

1- Creating the Pinecone vector database

I try to do IaC whenever possible. The good news is that Pinecone offers a Terraform provider, which makes it easy to manage Pinecone indexes and collections as code. First, we'll need an API key:

Creating API Key from pinecone console

Here, I am using the serverless version of Pinecone. We need to set the PINECONE_API_KEY environment variable to the API key we just created so that it can be used by the provider.

```hcl
terraform {
  required_providers {
    pinecone = {
      source = "pinecone-io/pinecone"
    }
  }
}

# Reads the API key from the PINECONE_API_KEY environment variable
provider "pinecone" {}

resource "pinecone_index" "media_transcriptions" {
  name      = "${var.application}${var.environment}-media-transcriptions"
  dimension = 1024 # must match the embedding model's output dimension
  metric    = "cosine"
  spec = {
    serverless = {
      cloud  = "aws"
      region = "us-east-1"
    }
  }
}
```
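The dimension of 1024 is not arbitrary: it has to equal the size of the vectors produced by the embedding model. Amazon Titan Text Embeddings V2 outputs 1024 dimensions by default (256 and 512 are also supported), and an index with a different dimension would not accept those vectors at ingestion time. A small, hypothetical check like the one below could guard against configuration drift, for instance in a deployment test:

```rust
/// Output dimensions supported by Amazon Titan Text Embeddings V2 (1024 is the default).
const TITAN_V2_DIMENSIONS: [usize; 3] = [256, 512, 1024];

/// Hypothetical check: returns an error message when the index dimension
/// cannot hold the vectors produced by the embedding model configuration.
pub fn check_index_dimension(index_dimension: usize, embedding_dimension: usize) -> Result<(), String> {
    if !TITAN_V2_DIMENSIONS.contains(&embedding_dimension) {
        return Err(format!("{embedding_dimension} is not a Titan V2 output dimension"));
    }
    if index_dimension != embedding_dimension {
        return Err(format!(
            "index dimension {index_dimension} does not match embedding dimension {embedding_dimension}"
        ));
    }
    Ok(())
}
```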

2- Creating the knowledge base

Creating the knowledge base involves defining two key components: the vector store configuration, which points to Pinecone, and the data source.

The data source dictates how the content will be ingested, including the storage configuration and the content chunking strategy.

```hcl
resource "aws_bedrockagent_knowledge_base" "this" {
  name     = "${var.application}-${var.environment}-media-kb"
  role_arn = aws_iam_role.kb_role.arn

  knowledge_base_configuration {
    vector_knowledge_base_configuration {
      embedding_model_arn = "arn:aws:bedrock:${data.aws_region.current.name}::foundation-model/amazon.titan-embed-text-v2:0"
    }
    type = "VECTOR"
  }

  storage_configuration {
    type = "PINECONE"
    pinecone_configuration {
      # Points to the index declared in pinecone.tf
      connection_string      = "https://${pinecone_index.media_transcriptions.host}"
      credentials_secret_arn = aws_secretsmanager_secret.pinecone_api_key.arn
      field_mapping {
        metadata_field = "metadata"
        text_field     = "text"
      }
    }
  }
}
```

For the data source, I am setting the chunking strategy to FIXED_SIZE: each chunk holds at most 300 tokens, and adjacent chunks overlap by about 20%.

```hcl
resource "aws_bedrockagent_data_source" "this" {
  knowledge_base_id = aws_bedrockagent_knowledge_base.this.id
  name              = "kb_datasource"

  vector_ingestion_configuration {
    chunking_configuration {
      chunking_strategy = "FIXED_SIZE"
      fixed_size_chunking_configuration {
        max_tokens         = 300
        overlap_percentage = 20
      }
    }
  }

  data_source_configuration {
    type = "S3"
    s3_configuration {
      bucket_arn = aws_s3_bucket.kb_bucket.arn
      # Only objects under this prefix are ingested
      inclusion_prefixes = ["transcripts"]
    }
  }
}
```
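To get an intuition for what max_tokens = 300 and overlap_percentage = 20 mean, here is a rough sketch of fixed-size chunking with overlap: each window holds up to 300 tokens and starts 240 tokens after the previous one, so adjacent chunks share about 60 tokens. Tokens are approximated by whitespace-separated words, which is an assumption; Bedrock counts tokens with its own tokenizer, so real chunk boundaries will differ.

```rust
/// Approximates FIXED_SIZE chunking: windows of at most `max_tokens` tokens,
/// where consecutive windows share `overlap_percentage`% of `max_tokens`.
/// Tokens are approximated by whitespace-separated words (an assumption).
pub fn fixed_size_chunks(text: &str, max_tokens: usize, overlap_percentage: usize) -> Vec<String> {
    assert!(max_tokens > 0 && overlap_percentage < 100);
    let words: Vec<&str> = text.split_whitespace().collect();
    let overlap = max_tokens * overlap_percentage / 100;
    let step = max_tokens - overlap; // at least 1, given the assertion above
    let mut chunks = Vec::new();
    let mut start = 0;
    while start < words.len() {
        let end = (start + max_tokens).min(words.len());
        chunks.push(words[start..end].join(" "));
        if end == words.len() {
            break; // the last window reached the end of the text
        }
        start += step;
    }
    chunks
}
```

With 600 words, this yields three chunks covering words 0 to 299, 240 to 539, and 480 to 599.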

After deployment, you can view the data source configuration in the console:

Data source configuration on the console

3- Requesting Media upload link

This function is invoked through API Gateway to generate a presigned URL for media file uploads. A unique identifier is assigned to the object; it also serves as the transcription job name and as the reference for the knowledge base document.


The request is validated to ensure that the media metadata properties are properly defined. I am using the serde_valid crate to validate the request payload. This crate is very convenient for defining schema validations using attributes.
```rust
use chrono::NaiveDate;
use serde::Deserialize;
use serde_valid::Validate;

#[derive(Default, Debug, Clone, PartialEq, Deserialize, Validate)]
#[serde(rename_all = "camelCase")]
pub struct GenerateMediaUploadLinkRequest {
    #[validate(min_length = 5)]
    pub topic: String,
    #[validate(min_items = 1)]
    pub presenters: Vec<String>,
    #[validate(custom = |v| validate_date_format(v))]
    pub date: String,
}

// Accepts only dates formatted as yyyy-MM-dd (e.g. 2024-05-01)
fn validate_date_format(date_str: &str) -> Result<(), serde_valid::validation::Error> {
    match NaiveDate::parse_from_str(date_str, "%Y-%m-%d") {
        Ok(_) => Ok(()),
        Err(_) => Err(serde_valid::validation::Error::Custom(format!(
            "Invalid date format {}. Expected format is yyyy-MM-dd.",
            date_str
        ))),
    }
}
```

And here are the details of the generate_presigned_request_uri function:

```rust
use aws_sdk_s3::{presigning::PresigningConfig, Client};
use lambda_http::Error;
use std::time::Duration;

// Returns a presigned PUT URL for media-uploads/<task_id>, valid for 15 minutes
async fn generate_presigned_request_uri(
    s3_client: &Client,
    media_bucket_name: &str,
    task_id: &str,
) -> Result<String, Error> {
    let key = format!("media-uploads/{}", task_id);
    let expires_in = Duration::from_secs(15 * 60);
    let presigned_request = s3_client
        .put_object()
        .bucket(media_bucket_name)
        .key(key)
        // Stored as user-defined object metadata (x-amz-meta-task_id)
        .metadata("task_id", task_id)
        .presigned(PresigningConfig::expires_in(expires_in)?)
        .await?;
    Ok(presigned_request.uri().to_string())
}
```

4- Handling media upload and starting transcription job

This function is triggered by an S3 event whenever a new file is successfully uploaded. As a convention, I use the last segment of the media object key, which is the task's unique identifier, as the transcription job name.

```rust
use aws_lambda_events::event::s3::S3Event;
use aws_sdk_transcribe::types::{Media, Settings, Tag};
use lambda_runtime::{Error, LambdaEvent};

async fn start_transcription_job(
    event: LambdaEvent<S3Event>,
    transcribe_client: &aws_sdk_transcribe::Client,
) -> Result<(), Error> {
    for record in event.payload.records {
        let object_key = record.s3.object.key.ok_or("S3 record without object key")?;
        let bucket_name = record.s3.bucket.name.ok_or("S3 record without bucket name")?;
        // The last key segment is the task ID, reused as the transcription job name
        let task_id = object_key.split('/').last().unwrap_or(&object_key);

        transcribe_client
            .start_transcription_job()
            .transcription_job_name(task_id)
            // Speaker diarization: label up to 5 distinct speakers
            .settings(
                Settings::builder()
                    .show_speaker_labels(true)
                    .max_speaker_labels(5)
                    .build(),
            )
            .identify_language(true)
            .media(
                Media::builder()
                    .media_file_uri(format!("s3://{}/{}", bucket_name, object_key))
                    .build(),
            )
            .tags(Tag::builder().key("task_id").value(task_id).build()?)
            .send()
            .await?;
    }
    Ok(())
}
```
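One detail worth keeping in mind: S3 event notifications URL-encode the object key (a space, for example, arrives as '+'). The task IDs generated by the upload function likely contain no such characters, so the code above works as-is, but if keys could ever contain spaces or non-ASCII characters, decoding them first would be safer. A minimal, std-only sketch with hypothetical helper names:

```rust
/// Decodes an object key taken from an S3 event notification, where keys
/// are URL-encoded ('+' for spaces, %XX for other bytes).
/// Returns None on malformed escapes or invalid UTF-8.
pub fn decode_s3_event_key(raw: &str) -> Option<String> {
    let bytes = raw.as_bytes();
    let mut out = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'+' => {
                out.push(b' ');
                i += 1;
            }
            b'%' => {
                // Expect exactly two hex digits after '%'.
                let hex = raw.get(i + 1..i + 3)?;
                if !hex.bytes().all(|c| c.is_ascii_hexdigit()) {
                    return None;
                }
                out.push(u8::from_str_radix(hex, 16).ok()?);
                i += 3;
            }
            b => {
                out.push(b);
                i += 1;
            }
        }
    }
    String::from_utf8(out).ok()
}

/// Extracts the task ID (the last path segment) used as the job name.
pub fn task_id_from_key(key: &str) -> &str {
    // rsplit always yields at least one item, so the fallback is never used.
    key.rsplit('/').next().unwrap_or(key)
}
```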

This function needs the transcribe:StartTranscriptionJob permission to start a transcription job.

Once the job has started, we can monitor its progress in the console:

Transcription job details

5- Subscribing to transcription success events and syncing knowledge base

Let's first have a look at the EventBridge target definition in Terraform:

```hcl
resource "aws_cloudwatch_event_target" "transcription_success" {
  rule      = aws_cloudwatch_event_rule.transcription_success.name
  target_id = "handleTranscriptionSuccess"
  arn       = aws_lambda_function.handle_successful_transcription.arn

  dead_letter_config {
    arn = aws_sqs_queue.transcription_dlq.arn
  }

  retry_policy {
    maximum_event_age_in_seconds = 60 * 60
    maximum_retry_attempts       = 10
  }

  # Forward only the transcription job name to the Lambda function
  input_transformer {
    input_paths = {
      transcriptionJob = "$.detail.TranscriptionJobName"
    }
    input_template = <<TEMPLATE
{
  "transcriptionJob":"<transcriptionJob>"
}
TEMPLATE
  }
}
```
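The rule referenced by this target is not shown here. Assuming it matches Amazon Transcribe's "Transcribe Job State Change" events (source aws.transcribe) whose TranscriptionJobStatus is COMPLETED, the sketch below mirrors, in plain Rust, what the rule and the input transformer do together: filter the events, then forward only the job name. The TranscribeEvent type and the function names are hypothetical; EventBridge performs this matching itself.

```rust
/// Simplified, hypothetical view of a "Transcribe Job State Change" event.
pub struct TranscribeEvent<'a> {
    pub source: &'a str,
    pub detail_type: &'a str,
    pub job_name: &'a str,
    pub job_status: &'a str,
}

/// Mirrors the (assumed) rule pattern: only completed Transcribe jobs match.
pub fn matches_success_rule(e: &TranscribeEvent) -> bool {
    e.source == "aws.transcribe"
        && e.detail_type == "Transcribe Job State Change"
        && e.job_status == "COMPLETED"
}

/// Mirrors the input transformer: only the job name reaches the function.
pub fn transform_input(e: &TranscribeEvent) -> String {
    format!("{{\"transcriptionJob\":\"{}\"}}", e.job_name)
}
```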

Which translates to the following configuration in the AWS console:

Transcription success event bridge rule

Here, the Handle successful transcription function is invoked each time a transcription job completes successfully. I only need the transcription job name, as I use it as the data source object key:

```rust
use lambda_runtime::{Error, LambdaEvent};
use serde_json::Value;
use tracing::error;

// TranscriptionSuccessEvent, TranscriptionResult, get_staging_media_metadata and
// store_metadata_content are defined elsewhere in the project (not shown here).
async fn handle_transcription_job(
    event: LambdaEvent<Value>,
    transcribe_client: &aws_sdk_transcribe::Client,
    s3_client: &aws_sdk_s3::Client,
    bedrock_agent_client: &aws_sdk_bedrockagent::Client,
    kb_bucket_name: &str,
    kb_id: &str,
    data_source_id: &str,
    media_bucket_name: &str,
) -> Result<(), Error> {
    let e: TranscriptionSuccessEvent = serde_json::from_value(event.payload)?;
    let job_name = e.transcription_job;

    // Look up where Transcribe wrote the transcript JSON
    let file_url = transcribe_client
        .get_transcription_job()
        .transcription_job_name(&job_name)
        .send()
        .await?
        .transcription_job
        .ok_or_else(|| Error::from("Transcription Job error"))?
        .transcript
        .ok_or_else(|| Error::from("Transcript error"))?
        .transcript_file_uri
        .ok_or_else(|| Error::from("Transcript file uri error"))?;

    match reqwest::get(file_url).await {
        Ok(resp) => {
            // Keep only the full transcript text(s) from the result JSON
            let transcription_result = resp.json::<TranscriptionResult>().await?;
            let transcription_content: Vec<String> = transcription_result
                .results
                .transcripts
                .iter()
                .map(|t| t.transcript.clone())
                .collect();
            let result = transcription_content.join(" ");

            // Store the transcript and its metadata in the knowledge base bucket
            let metadata =
                get_staging_media_metadata(s3_client, media_bucket_name, &job_name).await?;
            store_metadata_content(
                s3_client,
                kb_bucket_name,
                &job_name,
                &result,
                &metadata.to_string(),
            )
            .await?;

            // Trigger a knowledge base sync
            bedrock_agent_client
                .start_ingestion_job()
                .knowledge_base_id(kb_id)
                .data_source_id(data_source_id)
                .send()
                .await?;
        }
        Err(err) => {
            error!({ %err }, "downloading transcription");
            return Err(Box::new(err));
        }
    };
    Ok(())
}
```

This function first downloads the transcription result from transcript_file_uri and extracts the transcript text. It then stores that text, along with its metadata, in the knowledge base bucket and calls start_ingestion_job to sync the knowledge base. If the operation fails, it will be retried by EventBridge and eventually put into a dead-letter queue.
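store_metadata_content is not shown in the article. Bedrock Knowledge Bases read document metadata from a sidecar object stored next to the document and named <document key>.metadata.json, whose body contains a metadataAttributes object; attributes like these are what a retrieval filter on topic relies on. The sketch below builds such a body with the standard library only. The attribute names follow the upload request (topic, presenters, date), while the helper names and the choice to store presenters as a list are assumptions.

```rust
/// Escapes a string for inclusion in a JSON string literal.
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters use the \uXXXX form.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Key of the sidecar metadata object for a given document key.
pub fn metadata_key(document_key: &str) -> String {
    format!("{document_key}.metadata.json")
}

/// Builds the sidecar body: {"metadataAttributes": {...}}.
pub fn metadata_body(topic: &str, presenters: &[&str], date: &str) -> String {
    let presenters_json: Vec<String> = presenters
        .iter()
        .map(|p| format!("\"{}\"", json_escape(p)))
        .collect();
    format!(
        "{{\"metadataAttributes\":{{\"topic\":\"{}\",\"presenters\":[{}],\"date\":\"{}\"}}}}",
        json_escape(topic),
        presenters_json.join(","),
        json_escape(date)
    )
}
```

In a real Lambda, serde_json (already used in the project) would be the more natural choice; the manual escaping here only keeps the sketch dependency-free.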

☝️ **Note:** I opted against using a Step Functions workflow for this part because the Transcribe output could exceed the 256 KB payload limit of Step Functions.

6- Chatting with the knowledge base in the console

Before building the function that queries the knowledge base, we can already test it from the console. I used the first episode of the awesome Believe in Serverless podcast as a data source:

Testing the knowledge base from the console

The console also lets you test and adjust the generation configuration, including choosing the model, using a custom prompt, and tuning parameters such as temperature and top-p. This allows you to tailor the configuration to your use case.
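For readers less familiar with these two parameters, here is a minimal sketch of their textbook definitions: temperature rescales the model's logits before the softmax, and top-p (nucleus) sampling restricts the choice to the smallest set of most likely tokens whose cumulative probability reaches p. The function names are hypothetical, and the sketch only illustrates the concepts; it is not the sampling code used by Bedrock or Claude.

```rust
/// Softmax with temperature: lower temperature sharpens the distribution,
/// higher temperature flattens it.
pub fn softmax_with_temperature(logits: &[f64], temperature: f64) -> Vec<f64> {
    assert!(temperature > 0.0);
    let scaled: Vec<f64> = logits.iter().map(|l| l / temperature).collect();
    // Subtract the max before exponentiating for numerical stability.
    let max = scaled.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
    let exps: Vec<f64> = scaled.iter().map(|s| (s - max).exp()).collect();
    let sum: f64 = exps.iter().sum();
    exps.iter().map(|e| e / sum).collect()
}

/// Top-p (nucleus) filtering: keeps the smallest set of most likely tokens
/// whose cumulative probability reaches `p`; returns their indices, best first.
pub fn top_p_indices(probs: &[f64], p: f64) -> Vec<usize> {
    let mut order: Vec<usize> = (0..probs.len()).collect();
    order.sort_by(|&a, &b| probs[b].total_cmp(&probs[a]));
    let mut kept = Vec::new();
    let mut cumulative = 0.0;
    for i in order {
        kept.push(i);
        cumulative += probs[i];
        if cumulative >= p {
            break;
        }
    }
    kept
}
```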

Alright, let's now create the endpoint and see how to query this knowledge base.

7- Querying the knowledge base

This function requires the bedrock:RetrieveAndGenerate permission to access the knowledge base and the bedrock:InvokeModel permission on the Claude 3 Sonnet model ARN used during the generation phase. It returns the generated output along with the source URLs of the retrieved chunks that contributed to it:

```rust
use aws_sdk_bedrockagentruntime::types::RetrieveAndGenerateInput;
use lambda_http::{Body, Error, Request, Response};
use serde_json::json;

async fn query_knowledge_base(
    event: Request,
    bedrock_agent_runtime_client: &aws_sdk_bedrockagentruntime::Client,
    knowledge_base_id: &str,
    model_arn: &str,
) -> Result<Response<Body>, Error> {
    // Request validation redacted; it yields `query` (input text and topic)
    let configuration =
        build_retrieve_and_generate_configuration(knowledge_base_id, model_arn, &query)?;
    let input = RetrieveAndGenerateInput::builder()
        .text(query.input)
        .build()?;

    // Retrieval and generation happen in a single Bedrock call
    let result = bedrock_agent_runtime_client
        .retrieve_and_generate()
        .retrieve_and_generate_configuration(configuration)
        .input(input)
        .send()
        .await?;

    if result.output.is_none() {
        return Ok(Response::builder()
            .status(404)
            .header("content-type", "application/json")
            .body(json!({ "message": "Not found" }).to_string().into())?);
    }

    // unwrap_result (not shown) extracts the generated text and the source URLs
    let (output_text, sources) = unwrap_result(result);
    let resp = Response::builder()
        .status(200)
        .header("content-type", "application/json")
        .body(
            json!({
                "output": output_text,
                "sources": sources
            })
            .to_string()
            .into(),
        )?;
    Ok(resp)
}
```
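unwrap_result is not shown in the article. One plausible approach, sketched below with simplified, hypothetical types, is to walk the citations returned by RetrieveAndGenerate, collect the S3 URI of each retrieved reference, and deduplicate them while keeping their order. In the actual aws_sdk_bedrockagentruntime types, the URI is nested more deeply (citations, then retrieved references, then their S3 location), so real code would need a few more Option unwraps.

```rust
use std::collections::HashSet;

/// Simplified stand-ins for the SDK's citation types (hypothetical).
pub struct Reference {
    pub s3_uri: Option<String>,
}

pub struct Citation {
    pub references: Vec<Reference>,
}

/// Collects the distinct source URIs across all citations, in first-seen order.
pub fn collect_sources(citations: &[Citation]) -> Vec<String> {
    let mut seen = HashSet::new();
    let mut sources = Vec::new();
    for uri in citations
        .iter()
        .flat_map(|c| c.references.iter())
        .filter_map(|r| r.s3_uri.as_ref())
    {
        // insert returns false when the URI was already recorded.
        if seen.insert(uri.as_str()) {
            sources.push(uri.clone());
        }
    }
    sources
}
```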

The build_retrieve_and_generate_configuration function prepares the necessary parameters for calling the retrieveAndGenerate endpoint.

As an example, I am applying a retrieval filter to the topic attribute.

```rust
use aws_sdk_bedrockagentruntime::types::{
    FilterAttribute, KnowledgeBaseRetrievalConfiguration,
    KnowledgeBaseRetrieveAndGenerateConfiguration, KnowledgeBaseVectorSearchConfiguration,
    RetrievalFilter, RetrieveAndGenerateConfiguration, RetrieveAndGenerateType,
};
use lambda_http::Error;

fn build_retrieve_and_generate_configuration(
    knowledge_base_id: &str,
    model_arn: &str,
    query: &Query,
) -> Result<RetrieveAndGenerateConfiguration, Error> {
    // Only retrieve chunks whose "topic" metadata equals the requested topic
    let filter = RetrievalFilter::Equals(
        FilterAttribute::builder()
            .key("topic")
            .value(query.topic.clone().into())
            .build()?,
    );
    let vector_search_config = KnowledgeBaseVectorSearchConfiguration::builder()
        .filter(filter)
        .build();
    let retrieval_config = KnowledgeBaseRetrievalConfiguration::builder()
        .vector_search_configuration(vector_search_config)
        .build();
    // Knowledge base used for retrieval, model used for generation
    let rng_config = KnowledgeBaseRetrieveAndGenerateConfiguration::builder()
        .retrieval_configuration(retrieval_config)
        .knowledge_base_id(knowledge_base_id)
        .model_arn(model_arn)
        .build()?;
    let configuration = RetrieveAndGenerateConfiguration::builder()
        .r#type(RetrieveAndGenerateType::KnowledgeBase)
        .knowledge_base_configuration(rng_config)
        .build()?;
    Ok(configuration)
}
```
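Conceptually, an Equals filter narrows the candidate chunks to those whose topic metadata matches exactly, and only those are ranked by similarity. The in-memory sketch below illustrates those semantics with hypothetical types and assumes unit-normalised embeddings (so the dot product equals cosine similarity); in practice, the filtering is handled by Bedrock and the vector store, not by application code.

```rust
/// A stored chunk with its embedding and a "topic" metadata attribute (hypothetical).
pub struct Chunk {
    pub text: &'static str,
    pub topic: &'static str,
    pub embedding: Vec<f32>,
}

fn dot(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).map(|(x, y)| x * y).sum()
}

/// Keeps only chunks whose topic equals `topic`, then returns the texts of
/// the k most similar ones, best first.
pub fn filtered_search<'a>(chunks: &'a [Chunk], topic: &str, query: &[f32], k: usize) -> Vec<&'a str> {
    let mut candidates: Vec<(&Chunk, f32)> = chunks
        .iter()
        .filter(|c| c.topic == topic)
        .map(|c| (c, dot(&c.embedding, query)))
        .collect();
    candidates.sort_by(|a, b| b.1.total_cmp(&a.1));
    candidates.into_iter().take(k).map(|(c, _)| c.text).collect()
}
```

Because the match is exact, the topic sent at query time must be spelled exactly as it was at upload time.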

Et voilà! Let's call our API straight from Postman:

Testing the RAG endpoint from postman

Wrapping up

I've only scratched the surface when it comes to building RAG systems, but Bedrock simplifies the process considerably. There's still plenty of room for improvement, such as optimising retrieval methods and refining prompts. I also had fun building this in Rust: Cargo Lambda makes creating Lambda functions in Rust a breeze, so check it out!

As always, you can find the full source code, ready to be adapted and deployed, here:
ziedbentahar/rag-on-media-content-with-bedrock-and-transcribe

Thanks for reading! I hope you enjoyed it.

Resources

Amazon Web Services (AWS) - Pinecone Docs
Rust functions on AWS Lambda made simple
Create a knowledge base
