Anthony Accomazzo

Originally published at blog.sequin.io

Storing Salesforce embeddings with pgvector and OpenAI

We use Salesforce as the hub for all customer data. We pipe notes, call transcripts, and email conversations into Salesforce.

We thought it would be cool to build tooling on top of Salesforce that helped us with product roadmap and direction. We receive feedback and great ideas from our customers all the time. How could we make it easy to see suggested features? To follow up with the right customers after shipping a request? And to spot recurring themes in our customer conversations?

Just building a simple search tool isn’t enough. A standard search query matches on literal words in a string. So, if we wanted to pull up all the notes where a customer requested that Sequin support an integration, we’d have to brute force with a number of search strings, like “request,” “support,” “add integration.” Not only is this tedious, but it’s unlikely to work.

This is where embeddings come in.

Embeddings

An embedding is a vector representation of data. A vector representation is a series of floats, like this:

[-0.016741209, 0.019078454, 0.017176045, -0.028046958, ...]

Embeddings help capture the relatedness of text, images, video, or other data. With that relatedness, you can search, cluster, and classify.

Embeddings enable a more advanced way of searching our customers' notes. First, we generate embeddings for all notes. Then, we perform a query using another embedding. The user types in a query, such as “want sequin to support ecommerce service.” We take that query, turn it into an embedding, and compare its relatedness to the embeddings of all the notes.
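To make "relatedness" concrete, here is a minimal sketch of cosine similarity, one common way to compare two embeddings. The module and function names are made up for illustration, and in the rest of this post the comparison happens inside Postgres rather than in application code:

```elixir
defmodule EmbeddingMath do
  # Illustrative helper; not part of pgvector or OpenAI's API.
  # Cosine similarity: dot(a, b) / (|a| * |b|).
  # Ranges from -1.0 (opposite) to 1.0 (same direction).
  # Raises ArithmeticError if either vector is all zeros.
  def cosine_similarity(a, b) when length(a) == length(b) do
    dot =
      a
      |> Enum.zip(b)
      |> Enum.reduce(0.0, fn {x, y}, acc -> acc + x * y end)

    dot / (norm(a) * norm(b))
  end

  # Euclidean length of a vector.
  defp norm(v) do
    v
    |> Enum.reduce(0.0, fn x, acc -> acc + x * x end)
    |> :math.sqrt()
  end
end
```

Two notes that talk about the same thing should score close to 1, while unrelated notes should score lower.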

To generate embeddings, you’ll want to rely on a third-party vendor. You can use APIs like OpenAI’s to generate them.

Embeddings search tool

In this post, I’ll show you how to build a tool that will let you do an embeddings search across your Salesforce data. This will let you search semantically instead of literally. You can use a tool like this to filter and find feedback or product suggestions.

You can create embeddings on any Salesforce object, like Case, Task, Note or a custom object that your team uses. In the examples below, I’ll use popular Salesforce objects interchangeably.

This post assumes you already have Salesforce set up with Sequin to sync Salesforce objects to your Postgres database. You should also have an OpenAI account set up.

Prepare your database

To prepare your database, first add the pgvector extension [1]:

create extension vector;

Create a separate schema, salesforce_embedding, for your embedding data [2]. In your queries, you’ll join your embedding tables to your Salesforce tables.

Here’s an example of creating an embedding table for Salesforce tasks:

create table salesforce_embedding.task (
  -- primary key gives the upserts below a unique constraint to conflict on
  id text primary key references salesforce.task(id) on delete cascade,
  embedding vector(1536) not null
);

In this post, I’ll show you how to use OpenAI’s text-embedding-ada-002 model. That model generates embeddings with 1536 dimensions, hence the 1536 parameter above.

Generate embeddings on insert or update

You'll first set up your app to generate embeddings for Salesforce records as they're inserted or updated. Then, I'll show you how to backfill embeddings for your existing records.

You have two options for finding out about new or updated Salesforce objects in your database.

You can use Postgres' listen/notify protocol. It's fast to get started with and works great. But, notify events are ephemeral, so delivery is at-most-once. That means there's a risk of missing notifications, and therefore of there being holes in your data.

Along with a sync to Postgres, Sequin provisions an event stream for you. Sequin will publish events to a NATS stream associated with your sync, sequin-[sync_id] (e.g. sequin-sync_1a107d79). You can write a function in your app that listens for these events and updates the embedding column for the Salesforce object that was updated or inserted. Notably, unlike listen/notify, the NATS stream is durable so you get at-least-once delivery guarantees.

The NATS team maintains client libraries in over 40 languages. Here's the skeleton for a listener for Salesforce upsert events in Elixir:

defmodule MyApp.Sequin.SalesforceStreamConsumer do
  use Jetstream.PullConsumer

  def start_link([]) do
    Jetstream.PullConsumer.start_link(__MODULE__, [])
  end

  @impl true
  def init([]) do
    # connection_name must match the name of your running Gnat connection
    {:ok, nil,
     connection_name: :gnat,
     stream_name: "sequin-sync_1a107d79",
     consumer_name: "my_app_sf_upserts"}
  end

  @impl true
  def handle_message(_message, state) do
    # TODO: event handling code goes here
    {:ack, state}
  end
end

In init/1, you specify the stream name as well as a name for your consumer. handle_message/2 is the function that handles each event on the stream. In this case, that means handle_message/2 will be invoked every time a Salesforce object is inserted or updated.

The consumer_name for this module is my_app_sf_upserts. I’ll show you in a moment how to register this consumer with NATS.

In handle_message/2, you make an API request to OpenAI. The body specifies the input for the embedding and the model to use. For the input, you’ll want to generate the embedding from a different field or combination of fields for each collection. So, you can implement a get_embedding_input/2 clause for each collection you care about. The following example handles one table, task:

@impl true
def handle_message(message, state) do
  event = Jason.decode!(message.body)

  # Jason decodes JSON object keys as strings
  %{"id" => id, "collection" => collection} = event

  body = %{
    input: get_embedding_input(collection, id),
    model: "text-embedding-ada-002"
  }

  req =
    Req.new(
      url: "https://api.openai.com/v1/embeddings",
      headers: [
        {"Content-Type", "application/json"},
        {"Authorization", "Bearer <<secret>>"}
      ],
      json: body
    )

  # Crashes on a transport error; Req decodes the JSON response body into a map
  {:ok, resp} = Req.post(req)
  %{"data" => [%{"embedding" => embedding}]} = resp.body

  upsert_embedding(collection, id, embedding)

  {:ack, state}
end

# Match on the collection name (a string) to pick the field to embed
defp get_embedding_input("task", id) do
  Salesforce.Task.get!(id, select: :description)
  |> Map.fetch!(:description)
end

defp upsert_embedding("task", id, embedding) do
  %Salesforce.TaskEmbedding{id: id}
  |> Salesforce.TaskEmbedding.changeset(%{embedding: embedding})
  |> MyApp.Repo.insert!(on_conflict: :replace_all, conflict_target: [:id])
end

# handle other collection types here

At the end of handle_message/2 is a call to upsert_embedding/3, which upserts the record to the database. The example shows handler functions for the Task collection. You can add whatever handler functions you need for the collections you want to have embeddings for.

This example does not handle issues with the OpenAI API gracefully. A more robust solution would have some error handling around that call.
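As one possible starting point for that error handling, here is a minimal sketch of a retry wrapper with exponential backoff. The Retry module, its function name, and the default attempt count and delay are all assumptions made for illustration, not something Sequin or Req provides:

```elixir
defmodule Retry do
  # Hypothetical helper. Calls `fun` until it returns {:ok, value}
  # or the attempts run out, doubling the delay after each failure.
  # `fun` is any zero-arity function returning {:ok, _} or {:error, _}.
  def with_backoff(fun, attempts \\ 3, delay_ms \\ 200)

  def with_backoff(fun, attempts, delay_ms) when attempts > 1 do
    case fun.() do
      {:ok, value} ->
        {:ok, value}

      {:error, _reason} ->
        Process.sleep(delay_ms)
        with_backoff(fun, attempts - 1, delay_ms * 2)
    end
  end

  # Last attempt: return whatever the function returns.
  def with_backoff(fun, _attempts, _delay_ms), do: fun.()
end
```

You could wrap the request as Retry.with_backoff(fn -> Req.post(req) end). Keep in mind that this sketch only retries on {:error, _}. A rate-limit or server error from OpenAI typically arrives as {:ok, resp} with a non-2xx status, so you would also want to check resp.status before pattern matching on the body.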

Now, register this consumer you just wrote with your NATS event stream. You can filter on only upserted events (you don’t want your handler to be invoked for deleted events):

nats consumer add --pull --deliver=all --creds /path/to/your.creds sequin-sync_1a107d79 my_app_sf_upserts --filter 'sequin.sync_1a107d79.salesforce.*.upserted'

This example uses the NATS CLI, which is nice for one-off commands like this one.

With this listener deployed, when a record is inserted, your consumer will populate its embedding. And when a record is updated, your consumer will regenerate its embedding.

The next step is to backfill embeddings for the records that already exist in the database.

Backfill the embedding column for existing records

You have two primary options for backfilling the embedding column:

Create a batch job

You can write a one-off batch job that paginates through your table and kicks off API calls to fetch the embeddings for each record.

You can paginate through each table like this [3]:

select id, description from salesforce.task order by id asc limit 1000 offset 0;

You can send multiple strings at once to OpenAI’s embedding API. After grabbing a set of rows, here’s how you might fetch the embeddings for those records:

defp fetch_and_upsert_rows(rows) do
  inputs = Enum.map(rows, &get_embedding_input/1)

  body = %{
    input: inputs,
    model: "text-embedding-ada-002"
  }

  req =
    Req.new(
      url: "https://api.openai.com/v1/embeddings",
      headers: [
        {"Content-Type", "application/json"},
        {"Authorization", "Bearer <<secret>>"}
      ],
      json: body
    )

  {:ok, resp} = Req.post(req)

  # Sort by "index" so the embeddings line up with the input rows
  embeddings =
    resp.body["data"]
    |> Enum.sort_by(& &1["index"])
    |> Enum.map(& &1["embedding"])

  upsert_embeddings(rows, embeddings)
end

defp get_embedding_input(%Salesforce.Task{} = task) do
  task.description
end

# … write other `get_embedding_input/1` clauses

defp upsert_embeddings(rows, embeddings) do
  # Pair each row with its embedding
  records =
    Enum.zip_with(rows, embeddings, fn row, embedding ->
      %{id: row.id, embedding: embedding}
    end)

  MyApp.Repo.insert_all(
    Salesforce.TaskEmbedding,
    records,
    on_conflict: :replace_all,
    conflict_target: [:id]
  )
end
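To drive a function like fetch_and_upsert_rows/1 across a whole table, the batch job needs a loop that requests page after page until one comes back empty. Here is a minimal sketch of that loop. The Backfill module and the fetch_page argument are hypothetical: fetch_page stands in for whatever function runs the limit/offset query above and returns a list of rows.

```elixir
defmodule Backfill do
  # Hypothetical driver for a one-off backfill.
  # `fetch_page` is a function (limit, offset) -> list of rows.
  # `handle_rows` is called once per non-empty page.
  def run(fetch_page, handle_rows, limit \\ 1000) do
    0
    # Offsets: 0, limit, 2 * limit, ...
    |> Stream.iterate(&(&1 + limit))
    # Streams are lazy, so a page is only fetched when it is needed
    |> Stream.map(&fetch_page.(limit, &1))
    # Stop at the first empty page
    |> Stream.take_while(&(&1 != []))
    |> Enum.each(handle_rows)
  end
end
```

You might call this as Backfill.run(&fetch_tasks/2, &fetch_and_upsert_rows/1), where fetch_tasks/2 is your own query function. OpenAI limits how much input a single embeddings request can carry, so you may need a smaller page size than 1000 depending on how long your text fields are.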

Use a Sequin job

Alternatively, you can have Sequin do the record pagination and collection part for you. This will let you use your existing event handling code to backfill your table.

You can kick off a backfill of your events stream via the Sequin console. Sequin will paginate your Postgres tables and fill your stream with events that have the same shape as the update and insert events:

{ "id": "note-8hUjsk2p", "table_name": "note", "data": [] }

Assuming you don’t have any other consumers listening to the sequin.sync_1a107d79.salesforce.*.upserted topic, you can reuse this topic for the backfill [4]. You can backfill each of your collections, like task and account.

Create a Postgres query for finding matches

With your embeddings set up in Postgres, you’re ready to create a mechanism for querying them.

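Supabase has a great post on embeddings in Postgres. I’ve adapted their similarity query below. You can use the cosine distance operator (<=>) provided by pgvector to determine similarity: since <=> returns a distance, 1 minus that distance gives the cosine similarity. Here’s a query that grabs the tasks whose similarity exceeds match_threshold, ordered from most to least similar and capped at match_count rows. The names query_embedding, match_threshold, and match_count are parameters you supply: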

select
  task.id,
  task.description,
  1 - (embedding_task.embedding <=> query_embedding) as similarity
from salesforce.task as task
join salesforce_embedding.task as embedding_task on task.id = embedding_task.id
where 1 - (embedding_task.embedding <=> query_embedding) > match_threshold
order by similarity desc
limit match_count;

Build the tool

With your data model and search function squared away, you can build your tool.

When the user enters a query, you’ll first convert their search query into an embedding using OpenAI. Then, you’ll use the SQL query above to find the Salesforce objects that are the closest match.
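One practical detail when wiring these two steps together: the embedding comes back from OpenAI as a list of floats, and it has to reach Postgres as a vector. pgvector accepts a bracketed, comma-separated text form such as [0.1,0.2,0.3], so one option is to encode the list as a string and cast the parameter in SQL (for example, $1::vector). The helper below is a minimal sketch of that encoding; the module name is made up for illustration:

```elixir
defmodule VectorLiteral do
  # Hypothetical helper. Encodes a list of numbers in pgvector's
  # text input format, e.g. [0.5, -1.0] becomes "[0.5,-1.0]".
  def encode(embedding) when is_list(embedding) do
    "[" <> Enum.map_join(embedding, ",", &to_string/1) <> "]"
  end
end
```

If you use Ecto, a dedicated vector type from a pgvector client library may make this step unnecessary.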

Below is a simple example of this tool. Here’s a demonstration of a search for Notes that mention a SaaS platform that a customer or prospect is hoping we add to Sequin:

Note that the word “integrated” didn’t appear at all in our filter query, yet we still found a match for “interest in seeing ServiceNow integrated into Sequin…”

This strategy works great for shorter text fields. But it will break down with longer call notes, Intercom conversations, and extended email threads. In those situations, it’s often not enough to find the matching record. You also want to know where in that record the match occurred.

To address this, we sliced the text fields of our Salesforce objects into smaller, overlapping “windows” and generated an embedding for each one. This meant we could compare each of these smaller embeddings to our query embedding to identify regions of high similarity.

To achieve this, you can split your objects across multiple embedding records. Your table could look something like this, with an added idx column:

create table salesforce_embedding.task (
  id text references salesforce.task(id) on delete cascade,
  idx integer not null,
  embedding vector(1536) not null,
  primary key (id, idx)
);

The idx (or index) is the window index. One Salesforce object could be split over an arbitrary number of embedding records, according to whatever window size seems to work best for your application.
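Here is a minimal sketch of how the windowing could work. It splits on whitespace and counts words, which is an assumption made to keep the example simple; you might prefer to count tokens or split on sentences. The module name and the default size and step are also made up for illustration:

```elixir
defmodule TextWindows do
  # Hypothetical helper. Splits text into overlapping windows of `size`
  # words, advancing `step` words each time (step < size gives overlap).
  # Returns {idx, window_text} tuples, one per embedding record.
  def split(text, size \\ 200, step \\ 100) when step > 0 and size >= step do
    text
    |> String.split()
    |> windows(size, step)
    |> Enum.with_index()
    |> Enum.map(fn {words, idx} -> {idx, Enum.join(words, " ")} end)
  end

  defp windows([], _size, _step), do: []

  defp windows(words, size, step) do
    window = Enum.take(words, size)

    if length(words) <= size do
      # The remaining words fit in one window, so this is the last one
      [window]
    else
      [window | windows(Enum.drop(words, step), size, step)]
    end
  end
end
```

Each tuple maps to one row in the table above: the idx goes in the idx column, and the window text is what you send to OpenAI to generate that row's embedding.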

In the application, you’ll display the relevant windows that scored highest in similarity. That will let the user easily see the sentences or paragraphs that are a match. Clicking on the window can bring them to the whole Note, but at the specific location that was a high match.

Writing back to Salesforce

As we were filtering and reading through Salesforce Tasks and Notes, we realized that, in addition to search, we wanted two pieces of functionality:

The ability to rate objects on a scale of 1-5, according to how deep or insightful the conversation was.
The ability to tag notes based on product themes, recurring problems, etc.

With Sequin’s write support, this update is trivial. You can add custom fields to your objects (like Rating__c and Tags__c). Then, you can make write requests back to Salesforce like this:

def update_tags(%Salesforce.Task{} = task, tags) do
  task
  |> Salesforce.Task.update_changeset(%{tags__c: tags})
  |> Repo.update!()
end

Changes are applied synchronously to Salesforce’s API, so if there’s a validation error it will be raised in your code.

Conclusion

Consolidating customer feedback and call notes into one location is only half the battle. The next piece is creating tools and workflows that let you use this information to guide your product and keep customers in the loop while doing so.

Embeddings are a powerful tool for achieving this. You can use a machine to help you find similar notes and cluster ideas. With a little effort, you can build your own tool, which gives you far more power and flexibility than you’d find using Salesforce AI.

Your team will need to centralize their notes to make this work great, however! In a future post, I’ll detail the strategies we use for making data capture easy (e.g. drop a call note into Slack). Subscribe to get notified when we write that post.


  1. pgvector is included in most of the latest distributions of Postgres.
    If you're on AWS RDS, be sure you upgrade to Postgres 15.2+ to get access to the vector extension. 

  2. You can mix and match fields from different tables to generate embeddings. To start, you can keep it simple and generate embeddings that correspond to a single Salesforce object. For most objects, you’ll probably choose to create an embedding for just one field. For example, you don't need to create an embedding for the whole Note object, just the body field. 

    A few tables might warrant creating a blended embedding with more than one field. For example, Tasks have both a subject and a description. You can concatenate the two fields together into a newline-separated string, and generate the embedding on that.

    In the future, you can blend more fields or objects together to let you build on your data in novel ways.

  3. Normally a pagination strategy like this wouldn't be safe unless IDs were auto-incrementing. But this will work fine in all situations, because we don't care if we miss records that are inserted mid-pagination -- those are being handled by our event handler above! 

  4. If you need to, you can use a different topic name for this populator, e.g. jobs.backfill_sf_embeddings.[collection]. You’ll just need to register a different consumer, as each consumer can only be subscribed to one topic. 
