szymon-szym
OpenSearch with AWS Lambda Rust (part 1) - set up the local environment

Introduction

Rust is well suited for creating low-latency Lambda functions. It works well with OpenSearch and lets you build extremely performant integrations.

I plan to create a serverless application that uses OpenSearch and to document my learnings along the way. I will split the write-up into a few blog posts to keep things short and sweet.

Project structure

The code for this blog post is here (branch 1-local-setup)

Dependencies

Infrastructure will be handled with AWS CDK.
For lambda-related tasks, I am going to use the excellent cargo lambda.

Architecture

To be honest, I haven't decided yet how I want to use the created lambdas. It is not crucial for the local setup, though.

Run OpenSearch locally and connect with the Rust client

Folders structure

I would like to start development locally because there is a lot I can build and test before going to the cloud. I love having a short feedback loop.

With that in mind, I start by creating the root project with AWS CDK, because eventually I want to use it as my IaC.

cdk init --language typescript

Now I define a new folder functions with subfolders query and opensearch_service. functions is the root folder for my lambda functions.
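That layout can be created in one go from the project root (folder names follow this post's setup):

```shell
# create the lambda root folder and one subfolder per crate
mkdir -p functions/query functions/opensearch_service
```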

Run OpenSearch locally

A local instance of OpenSearch is pretty handy. There are different ways to achieve this, but for me the easiest way to run OpenSearch locally was to use the docker-compose.yml from the official site.

I just copied the docker-compose file to the root folder of my project and ran docker compose up. I also needed to apply some changes to the system configuration, but it wasn't hard at all.
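For reference, on a Linux host the change the OpenSearch docs call out is raising the vm.max_map_count limit for the containers; other systems may need different tweaks. This is host configuration, so it needs root:

```shell
# raise the mmap count limit required by the OpenSearch containers
sudo sysctl -w vm.max_map_count=262144

# to make it permanent, add this line to /etc/sysctl.conf:
# vm.max_map_count=262144
```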

Once all containers are running, I can go to http://localhost:5601/app/home#/ and start playing with the OpenSearch dashboard (user: admin, password: admin).


That's nice!

I need data to test my functions, so I go to Add sample data and pick Sample Flights Data to be loaded


Now I go to Dev Tools and confirm that the data was loaded.


My local OpenSearch instance is ready.

Rust OpenSearchService

I plan to put functionality related to OpenSearch in a separate library, so I can reuse it in different functions later on.
I use the cargo workspace feature: a workspace lets me keep several crates together (e.g. different lambda functions, common libraries, etc.).
In the functions/ folder I put a Cargo.toml file with the following content:

[workspace]

members = [
    "query",
    "opensearch_service",
]

I go to the functions/opensearch_service folder and run cargo init --lib. Here I will place the functionality I want to share between functions. For now, I need a client to connect to OpenSearch and perform a dummy query.

I will use the following dependencies: opensearch, serde, serde_json, and anyhow. For me, cargo add is the most convenient way to add them.

cargo add serde -F derive
cargo add opensearch -F aws-auth
cargo add anyhow serde_json

The OpenSearchService at this point is simple:

pub struct OpenSearchService {
    client: OpenSearch,    
}

I implement a function for creating a client with a local connection:

use opensearch::http::transport::{SingleNodeConnectionPool, TransportBuilder};
use opensearch::http::Url;
use opensearch::OpenSearch;

impl OpenSearchService {

    pub fn local_client() -> Self {
        let url = Url::parse("https://localhost:9200").unwrap();
        let conn_pool = SingleNodeConnectionPool::new(url);
        let credentials =
            opensearch::auth::Credentials::Basic("admin".to_string(), "admin".to_string());
        let cert_validation = opensearch::cert::CertificateValidation::None;
        let transport = TransportBuilder::new(conn_pool)
            .cert_validation(cert_validation)
            .auth(credentials)
            .build().unwrap();
        let client = OpenSearch::new(transport);

        Self { client }
    }
}

To be honest, I struggled a lot with setting up this connection. It turned out that I needed to use an https:// URL (the client defaults to http://) and disable certificate validation, because the local cluster uses a self-signed certificate.

Once the client is ready, I build the function that runs an empty must query and parses the results:

// ...
// impl OpenSearchService {
// ... 
pub async fn query_all_docs<T>(&self, index: &str, limit: i64) -> anyhow::Result<Vec<T>> 
    where T: DeserializeOwned
    {
        let response = self.client
            .search(SearchParts::Index(&[index]))
            .size(limit)
            .from(0)
            .body(json!({
                "query": {
                    "bool": {
                        "must": []
                    }
                }
            }))
            .send()
            .await?;

        let response_body = response.json::<Value>().await?;

        let result = response_body["hits"]["hits"]
            .as_array()
            .unwrap()
            .iter()
            .map(|raw_value| {
                serde_json::from_value::<T>(raw_value["_source"].clone()).unwrap()
            })
            .collect::<Vec<_>>();

        Ok(result)
    }

The query function is generic, so it can be used with different types in a type-safe manner. A thing to remember is that we need a bound on the generic type to make sure it can be deserialized: where T: DeserializeOwned. If it is missing, the compiler, in cooperation with Stack Overflow, will help us close the gap.

Ok, the initial implementation of the OpenSearchService is in place, let's create a lambda function.

Rust query lambda function

I go to functions/query and run cargo lambda init. When prompted, I select no for the question about creating an HTTP handler, and I leave the event type empty.

In Cargo.toml, the just-created library can be added using a path dependency:

# ...
[dependencies]

opensearch_service = { path = "../opensearch_service"}
#...

I will also use serde and serde_json.

To be able to query my sample dataset, I need to define a type for it. I create a FlightData struct and copy the fields from the OpenSearch dashboard.
In cases like this I usually paste the JSON as a comment and start creating the struct - Code Whisperer is clever enough to figure out what I need.


Finally, the structure looks like this:

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "PascalCase")]
pub struct FlightData {
    flight_num: String,
    dest_country: String,
    origin_weather: String,
    origin_city_name: String,
    avg_ticket_price: f32,
    distance_miles: f32,
    flight_delay: bool,
    dest_weather: String,
    dest: String,
    flight_delay_type: String,
    origin_country: String,
    #[serde(rename = "dayOfWeek")]
    day_of_week: u8,
    distance_kilometers: f32,
    #[serde(rename = "timestamp")]
    timestamp: String,
    dest_location: Location,
    #[serde(rename = "DestAirportID")]
    dest_airport_id: String,
    carrier: String,
    cancelled: bool,
    flight_time_min: f32,
    origin: String,
    origin_location: Location,
    dest_region: String,
    #[serde(rename = "OriginAirportID")]
    origin_airport_id: String,
    origin_region: String,
    dest_city_name: String,
    flight_time_hour: f32,
    flight_delay_min: i64,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct Location {
    lat: String,
    lon: String,
}

For initial testing, the request and response can be simple:

#[derive(Deserialize)]
struct Request {
    limit: i64,
}

#[derive(Serialize)]
struct Response {
    flights: Vec<FlightData>,
}

async fn function_handler(event: LambdaEvent<Request>) -> Result<Response, Error> {
// ...

Once the types are in place, the lambda itself is pretty straightforward:

// ...
async fn function_handler(os_client: &OpenSearchService, event: LambdaEvent<Request>) -> Result<Response, Error> {

    let limit = event.payload.limit;

    let index = "opensearch_dashboards_sample_data_flights";

    let result = os_client.query_all_docs::<FlightData>(index, limit).await?;
    // Prepare the response
    let resp = Response {
        flights: result,
    };

    Ok(resp)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing::init_default_subscriber();

    let os_client = opensearch_service::OpenSearchService::local_client();


    run(service_fn(|event: LambdaEvent<Request>| {
        function_handler(&os_client, event)
    })).await
}
//...

Testing lambda locally

Once everything is prepared, I run the function to see if it works. I create a dummy test event in the functions/query/events/flights.json file:

{
    "limit": 5
}

Testing a lambda locally with cargo lambda is a pleasant experience. I open two terminal windows and place them next to each other. I run cargo lambda watch in the right one, and cargo lambda invoke -F events/flights.json in the left.


As you might imagine, I needed a few iterations before I got the FlightData deserialization defined properly. With cargo lambda, iterating is super smooth - in watch mode the lambda is rebuilt automatically on save and can be invoked right away.

Summary

In this post, I prepared a local environment for building a lambda function that consumes data from OpenSearch. This is a nice starting point.

As next steps, I will implement more real-life queries, write an indexer function to populate data, and finally deploy the whole solution to AWS. Bear with me if you are interested in this topic.
