Introduction
Rust is well suited for creating low-latency lambda functions. It pairs well with OpenSearch and lets you build highly performant integrations.
I plan to create a serverless application that uses OpenSearch and document my learnings along the way. I will split the write-up into a few blog posts to keep things short and sweet.
Project structure
The code for this blog post is here (branch 1-local-setup).
Dependencies
Infrastructure will be handled with AWS CDK.
For lambda-related tasks, I am going to use the great cargo lambda.
Architecture
To be honest, I haven't decided yet how I want to use the created lambdas. It is not crucial for the local setup, though.
Run OpenSearch locally and connect with the Rust client
Folder structure
I would like to start development locally because there is a lot of stuff I can build and test before going to the cloud. I love to have a short feedback loop.
With that in mind, I start by creating the root project with AWS CDK, because eventually I want to use it as my IaC.
cdk init --language typescript
Now I define a new folder functions with subfolders query and opensearch_service. functions is my root folder for lambda functions.
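A sketch of the layout I am aiming for (my own illustration, not from the repository):

functions/
  Cargo.toml            (workspace definition, shown below)
  opensearch_service/   (shared library crate)
  query/                (lambda function crate)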
Run OpenSearch locally
A local instance of OpenSearch is pretty handy. There are different ways to achieve it, but for me the easiest way to run OpenSearch locally was to use the docker-compose.yml from the official site.
I just copied the docker-compose file to the root folder of my project and ran docker compose up. I also needed to apply some changes in the system configuration, but it wasn't hard at all.
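On Linux, the change called out in the OpenSearch documentation is raising the kernel's mmap count limit (this snippet is my addition and assumes a Linux host; Docker Desktop on macOS usually handles this for you):

# raise the mmap count limit required by OpenSearch's default settings
sudo sysctl -w vm.max_map_count=262144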
Once all containers are running, I can go to http://localhost:5601/app/home#/ and start playing with the OpenSearch dashboard (user: admin, password: admin).
That's nice!
I need data to test my functions, so I go to Add sample data and pick Sample Flights Data to be loaded.
Now I go to Dev Tools and confirm that the data was loaded.
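For example, a quick count check in the Dev Tools console (the query is my addition; the index name is the one the sample data set creates and is used later in the handler):

GET opensearch_dashboards_sample_data_flights/_count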
My local OpenSearch instance is ready.
Rust OpenSearchService
I plan to put the functionality related to OpenSearch in a separate library, so I can reuse it in different functions later on.
I use the cargo workspace feature. It lets me keep a few crates in a single workspace (e.g. different lambda functions, common libraries, etc.).
In the functions/ folder I put a Cargo.toml file with the following content:
[workspace]
members = [
"query",
"opensearch_service",
]
I go to the functions/opensearch_service folder and run cargo init --lib. Here I will place the functionality I want to be shared by different functions. For now, I need a client that connects to OpenSearch and performs a dummy query.
I will use the following dependencies: opensearch, serde, serde_json, anyhow. For me, using cargo add is the most convenient way.
cargo add serde -F derive
cargo add opensearch -F aws-auth
cargo add anyhow serde_json
The OpenSearchService at this point is simple:

use opensearch::OpenSearch;

pub struct OpenSearchService {
    client: OpenSearch,
}
I implement a function for creating a client with a local connection:
// the imports were elided in the original snippet; roughly:
use opensearch::http::transport::{SingleNodeConnectionPool, TransportBuilder};
use opensearch::OpenSearch;
use url::Url; // also: cargo add url

impl OpenSearchService {
    pub fn local_client() -> Self {
        // The local container serves https with a self-signed certificate,
        // so the URL must be https:// and certificate validation is disabled.
        let url = Url::parse("https://localhost:9200").unwrap();
        let conn_pool = SingleNodeConnectionPool::new(url);
        let credentials =
            opensearch::auth::Credentials::Basic("admin".to_string(), "admin".to_string());
        let cert_validation = opensearch::cert::CertificateValidation::None;
        let transport = TransportBuilder::new(conn_pool)
            .cert_validation(cert_validation)
            .auth(credentials)
            .build()
            .unwrap();
        let client = OpenSearch::new(transport);
        Self { client }
    }
}
To be honest, I struggled a lot with setting up this connection. It turned out that I needed to use an https:// URL (the client defaults to http://) and relax the certificate validation flow, since the local container uses a self-signed certificate.
Once the client is ready, I build the function that runs an empty must query and parses the results:
use opensearch::SearchParts;
use serde::de::DeserializeOwned;
use serde_json::{json, Value};

// impl OpenSearchService {
//     ...
pub async fn query_all_docs<T>(&self, index: &str, limit: i64) -> anyhow::Result<Vec<T>>
where
    T: DeserializeOwned,
{
    let response = self
        .client
        .search(SearchParts::Index(&[index]))
        .size(limit)
        .from(0)
        .body(json!({
            "query": {
                "bool": {
                    "must": []
                }
            }
        }))
        .send()
        .await?;

    let response_body = response.json::<Value>().await?;
    // Pull the documents out of hits.hits[]._source and deserialize each one;
    // any unexpected response shape becomes an error instead of a panic.
    let result = response_body["hits"]["hits"]
        .as_array()
        .ok_or_else(|| anyhow::anyhow!("unexpected response shape: hits.hits is not an array"))?
        .iter()
        .map(|raw_value| serde_json::from_value::<T>(raw_value["_source"].clone()))
        .collect::<Result<Vec<_>, _>>()?;
    Ok(result)
}
The query function is generic, so it can be used with different types in a type-safe manner. A thing to remember is that we need to add a bound to the generic type to make sure it can be deserialized: where T: DeserializeOwned. If it is missing, the compiler, in cooperation with Stack Overflow, will help us close the gap.
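For illustration, a call then looks like this (the variable names are mine; FlightData is the struct defined in the next section):

let client = OpenSearchService::local_client();
let flights: Vec<FlightData> = client
    .query_all_docs("opensearch_dashboards_sample_data_flights", 10)
    .await?;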
OK, the initial implementation of the OpenSearchService is in place; let's create a lambda function.
Rust query lambda function
I go to the functions/query folder and run cargo lambda init. When prompted, I select no for the question about creating an HTTP handler, and I leave the event type empty.
In the Cargo.toml, the just-created library can be added using a path dependency:
# ...
[dependencies]
opensearch_service = { path = "../opensearch_service"}
#...
I will also use serde and serde_json.
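In practice that is two more cargo add calls (my restatement, mirroring the earlier commands):

cargo add serde -F derive
cargo add serde_json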
To be able to query my sample dataset, I need to define a type for it. I create a FlightData struct and copy the fields from the OpenSearch dashboard.
In cases like this I usually just paste the JSON as a comment and start writing the struct; CodeWhisperer is clever enough to figure out what I need.
Finally, the structure looks like this:
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "PascalCase")]
pub struct FlightData {
flight_num: String,
dest_country: String,
origin_weather: String,
origin_city_name: String,
avg_ticket_price: f32,
distance_miles: f32,
flight_delay: bool,
dest_weather: String,
dest: String,
flight_delay_type: String,
origin_country: String,
#[serde(rename = "dayOfWeek")]
day_of_week: u8,
distance_kilometers: f32,
#[serde(rename = "timestamp")]
timestamp: String,
dest_location: Location,
#[serde(rename = "DestAirportID")]
dest_airport_id: String,
carrier: String,
cancelled: bool,
flight_time_min: f32,
origin: String,
origin_location: Location,
dest_region: String,
#[serde(rename = "OriginAirportID")]
origin_airport_id: String,
origin_region: String,
dest_city_name: String,
flight_time_hour: f32,
flight_delay_min: i64,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Location {
lat: String,
lon: String,
}
For initial testing, the request and response can be simple:
#[derive(Deserialize)]
struct Request {
limit: i64,
}
#[derive(Serialize)]
struct Response {
flights: Vec<FlightData>,
}
async fn function_handler(event: LambdaEvent<Request>) -> Result<Response, Error> {
// ...
Once the types are in place, the lambda itself is pretty straightforward:
use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent};
use opensearch_service::OpenSearchService;

// ...
async fn function_handler(
    os_client: &OpenSearchService,
    event: LambdaEvent<Request>,
) -> Result<Response, Error> {
    let limit = event.payload.limit;
    let index = "opensearch_dashboards_sample_data_flights";
    let result = os_client.query_all_docs::<FlightData>(index, limit).await?;

    // Prepare the response
    let resp = Response { flights: result };
    Ok(resp)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing::init_default_subscriber();

    // Build the client once; each invocation borrows it.
    let os_client = OpenSearchService::local_client();
    run(service_fn(|event: LambdaEvent<Request>| {
        function_handler(&os_client, event)
    }))
    .await
}
Testing lambda locally
Once all is prepared, I run the function and see if it works. I create a dummy test event in the functions/query/events/flights.json file:
{
"limit": 5
}
Testing the lambda locally with cargo lambda is a pleasant experience. I open two terminal windows and place them next to each other. I run cargo lambda watch in the right one, and cargo lambda invoke -F events/flights.json in the left.
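Side by side, the whole loop is just (a restatement of the two commands above as a terminal session):

# terminal 1: start the local runtime emulator
cargo lambda watch

# terminal 2: invoke the function with the test event
cargo lambda invoke -F events/flights.json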
As you might imagine, I needed a few iterations before I got the FlightData deserialization properly defined. With cargo lambda, iterating is super smooth: in watch mode, the lambda is rebuilt automatically on save and can be invoked right away.
Summary
In the current post, I prepared a local environment for building a lambda function that consumes data from OpenSearch. This is a nice starting point.
As next steps, I will implement more real-life queries, write an indexer function to populate data, and finally deploy the whole solution to AWS. Bear with me if you are interested in this topic.