DEV Community

Cover image for Harnessing AWS's Invoke Model State for Efficient Embedding Creation in Workflows
Salam Shaik for AWS Community Builders

Posted on

Harnessing AWS's Invoke Model State for Efficient Embedding Creation in Workflows

Hi everyone,

AWS recently released bedrock integration to step functions. They are providing InvokeModel **and **CreateModelCustomizationJob **states in the step functions. If you can search in the search bar of the step function with the **Bedrock name it will provide a list of the actions available.

Now we can create the workflows by invoking the models available in the Amazon Bedrock from the step functions.

If you are not aware of the embeddings and bedrock service, refer to my previous article using this link
Building a vector-based search engine using Amazon Bedrock and Amazon Open Search Service

Let’s dive into the article

In this article, we are going to create a state machine workflow to generate embeddings from a CSV file stored in an S3 bucket and store the generated embeddings in the OpenSearch index for further processing.

AWS services we are going to use in this experiment

  • S3 bucket for storing the data

  • Step function to create a workflow for generating embeddings and storing them OpenSearch index

  • BedRock contains the models for generating the embeddings

  • Lambda for storing the generating embedding in the OpenSearch

  • OpenSearch for storing the embeddings for implementing semantic search

Step 1: Collecting and storing the data:

Image description

Step 2: Create an open Search Service cluster

  • For storing the embeddings we will create an open search cluster

  • Visit the open search service from the AWS search bar

  • Click on the Create Domain button

  • Provide the Name for the domain, choose standard create, and select dev/test template

Image description

  • Deployment will be without standby as we are not doing this for production purposes.

Image description

  • From the general purpose instances select t3.small.search instances, as we are experimenting and nodes will only have 1

Image description

  • Instead of VPC deploy it publicly and provide a master username and password

Image description

  • Configure the access policy to allow all to access OpenSearch dashboards and endpoints. But for production make it according to your security requirements

Image description

  • Click on Create Domain and wait for a few minutes for the cluster to come online

  • Once the cluster is ready copy the open search endpoints from the dashboard to use in the Lambda function

  • Visit the OpenSearch dashboard and create an index for storing the data

  • Visit dev tools ****from the dashboard and use the following code to create an index

PUT contents
    {
      "settings": {
        "index.knn": true
      },
      "mappings": {
        "properties": {
          "Summary":{
            "type": "text"
          },  
          "Title": {
            "type": "text"
          },
          "Embedding": {
            "type": "knn_vector",
            "dimension": 1536
          }
        }
      }
    }
Enter fullscreen mode Exit fullscreen mode

Step 3: Create a Lambda function for storing the embeddings

  • Visit the Lambda service and create a Lambda function with the Python 3.9 environment

  • Here is the lambda code

    import boto3
    import requests
    from requests_aws4auth import AWS4Auth

    def lambda_handler(event, context):
        # Extract the relevant data
        summary = event['Summary']#summary column of csv file
        title = event['Title']#Title column of the csv file
        embedding = event['output']['Body']['embedding'] #contains embedding generataed for Summary columns

        # Define the document to be indexed
        document = {
            'Summary': summary,
            'Title': title,
            'Embedding': embedding
        }

        #Username and password of the opensearch endpoint
        auth = ('username',"password")

        # OpenSearch domain endpoint
        opensearch_domain_endpoint = "https://search-contents-oflzhkvsjgukdwvszyd5erztza.us-east-1.es.amazonaws.com"  # e.g., https://search-mydomain.us-west-1.es.amazonaws.com
        index_name = 'contents'
        url = f"{opensearch_domain_endpoint}/{index_name}/_doc"

        headers = { "Content-Type": "application/json" }

        # Make the signed HTTP request
        response = requests.post(url, auth=auth, json=document, headers=headers)

        return {
            'statusCode': 200,
            'body': response.text
        }
Enter fullscreen mode Exit fullscreen mode
  • This is how we are going to send the data to Lambda from the state machine
 {
      "Summary": "value1",
      "Title": "value2",
      "output": {
        "Body": {
          "embedding": "[0,1,2,3]"
        }
      }
    }
Enter fullscreen mode Exit fullscreen mode

Step 4: Create a state machine in step functions

  • Visit step functions from the AWS search bar

  • From the flow, we are using **Map **for iterating through the CSV file records

Image description

  • From the bedrock section Invoke Model action

Image description

  • **Invoke Lambda **action for sending embeddings to Open Search

Image description

  • Create a workflow like the below picture. You can drag and drop items from the left menu

Image description

  • After creating, configure each state like this

  • Map: configure it to use the S3 bucket CSV file we stored in the first step and make it iterate through the **Summary, Title **Columns of the document

  • Output of the Map function to another s3 bucket for avoiding limit exceed errors from the state machine

Image description

Image description

Image description

Image description

  • Configure **the Invoke Model **state to use **amazon.titan-embed-text-v1 **model and input from the Map function like this

Image description

Image description

Image description

  • Configure the **Lambda Invoke **state with the function name

  • This is the final code of the state machine after configuring all the states
 {
      "Comment": "A description of my state machine",
      "StartAt": "Map",
      "States": {
        "Map": {
          "Type": "Map",
          "ItemProcessor": {
            "ProcessorConfig": {
              "Mode": "DISTRIBUTED",
              "ExecutionType": "STANDARD"
            },
            "StartAt": "Invoke Model",
            "States": {
              "Invoke Model": {
                "Type": "Task",
                "Resource": "arn:aws:states:::bedrock:invokeModel",
                "Parameters": {
                  "ModelId": "arn:aws:bedrock:us-east-1::foundation-model/amazon.titan-embed-text-v1",
                  "Body": {
                    "inputText.$": "$.Summary"
                  }
                },
                "Next": "Lambda Invoke",
                "ResultPath": "$.output"
              },
              "Lambda Invoke": {
                "Type": "Task",
                "Resource": "arn:aws:states:::lambda:invoke",
                "OutputPath": "$.Payload",
                "Parameters": {
                  "Payload.$": "$",
                  "FunctionName": "arn:aws:lambda:us-east-1:556343216872:function:dump-embeddings:$LATEST"
                },
                "Retry": [
                  {
                    "ErrorEquals": [
                      "Lambda.ServiceException",
                      "Lambda.AWSLambdaException",
                      "Lambda.SdkClientException",
                      "Lambda.TooManyRequestsException"
                    ],
                    "IntervalSeconds": 1,
                    "MaxAttempts": 3,
                    "BackoffRate": 2
                  }
                ],
                "End": true
              }
            }
          },
          "Label": "Map",
          "MaxConcurrency": 10,
          "ItemReader": {
            "Resource": "arn:aws:states:::s3:getObject",
            "ReaderConfig": {
              "InputType": "CSV",
              "CSVHeaderLocation": "GIVEN",
              "MaxItems": 10,
              "CSVHeaders": [
                "Summary",
                "Title"
              ]
            },
            "Parameters": {
              "Bucket": "netflix-titles-csv",
              "Key": "Hydra-Movie-Scrape.csv"
            }
          },
          "End": true,
          "ResultWriter": {
            "Resource": "arn:aws:states:::s3:putObject",
            "Parameters": {
              "Bucket": "embeddings-ouput",
              "Prefix": "output"
            }
          }
        }
      }
    }
Enter fullscreen mode Exit fullscreen mode

Step 5: Execute the state machine workflow and check the output

  • From the step function console top right corner click on the **Execute **button

  • Leave the input of the state machine as it is and click on execute

  • After the successful execution of the workflow, you can see the results like this

Image description

  • Visit the OpenSearch dashboard and click on Query Workbench to execute the query and check whether embeddings are stored or not

Image description

  • As embeddings are a very large dataset they won’t be visible on the dashboard

That’s it. We successfully create a workflow to generate embedding using a bedrock model.

If you want to use the generated embeddings for further implementing search engine, you can visit the link provided at the start of the article. It will contain the steps on how to build the vector search engine in OpenSearch

If you are having any doubts or need more clarification or help, Please feel free to comment on this article. Will surely get back to you.

Thanks :)

Top comments (0)