
OpenSearch with AWS Lambda Rust (part 2) - business logic

In the previous part, we prepared a local environment for testing lambda function integration with OpenSearch. The code for the first part is here

The code for this part is on branch 2-implement-logic


I assume that the OpenSearch cluster runs locally. Please check Part 1 if you want to follow the same steps as I did.


With the lambda function and OpenSearch running locally, I can now iterate over the code and add business logic.

I would like my lambda to receive an event with a list of fields to use in the OpenSearch query. Later, I could connect the function to the REST API Gateway or use it as a data source for AppSync.

My next steps are the following:

  • prepare OpenSearch query
  • update lambda input type
  • test lambda locally

OpenSearch query

In my case, a query to OpenSearch looks more or less like this:

```json
{
  "size": 2,
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "AvgTicketPrice": {
              "gte": 500,
              "lte": 900
            }
          }
        },
        {
          "match": {
            "DestWeather": "Sunny"
          }
        }
      ]
    }
  }
}
```

I have some match clauses and a range clause. For the sake of my example, this is enough.

My idea is to create a struct that reflects the structure of the query and to implement a builder pattern, so I can construct the query with ease.

Let's go to functions/opensearch_service/src/ and start creating types. Going from the top, I create the query -> bool -> must structure.

```rust
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenSearchQuery {
    query: BoolQuery,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct BoolQuery {
    bool: MustQuery,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct MustQuery {
    must: Vec<QueryStatement>,
}
```

Thanks to serde and serde_json it's enough to annotate structs with Serialize and Deserialize to have both operations covered.

In my case, QueryStatement might have two variants: match or range. Rust provides a native way to define discriminated unions ("OR types"):

```rust
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
enum QueryStatement {
    MatchStatement(MatchStatement),
    RangeStatement(RangeStatement),
}
```

Enums can be represented in JSON in different ways. I let serde know that I want to keep them untagged. This way they "disappear" in the JSON leaving only properties of enumerated types.

```rust
#[derive(Debug, Serialize, Deserialize)]
pub struct MatchStatement {
    #[serde(rename = "match")]
    match_statement: Value,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct RangeStatement {
    range: Value,
}
```

The word "match" is a reserved keyword in Rust, so I use match_statement as the struct field and rename it to match only during serialization.

Value is a type provided by serde_json. It might represent various types available in JSON. I probably could spend more time on making my domain a bit more precise; however, I decided to leave it as is. The main reason is that I will use a builder to construct the query, so I don't expect any ambiguity in the final object.

```rust
pub struct OpenSearchQueryBuilder {
    query: OpenSearchQuery,
}

impl OpenSearchQueryBuilder {
    pub fn new() -> Self {
        Self {
            query: OpenSearchQuery {
                query: BoolQuery {
                    bool: MustQuery { must: vec![] },
                },
            },
        }
    }

    pub fn with_must_match(mut self, field: &str, value: String) -> Self {
        if value.is_empty() {
            return self;
        }
        self.query
            .query
            .bool
            .must
            .push(QueryStatement::MatchStatement(MatchStatement {
                match_statement: json!({
                    field: value
                }),
            }));
        self
    }

    pub fn with_must_range(mut self, field: &str, from: Option<f64>, to: Option<f64>) -> Self {
        let range = json!({
            field: {
                "gte": from,
                "lte": to
            }
        });
        self.query
            .query
            .bool
            .must
            .push(QueryStatement::RangeStatement(RangeStatement { range }));
        self
    }

    pub fn build(self) -> OpenSearchQuery {
        self.query
    }
}
```

We are almost done here. I updated my first iteration of the query_all_docs function so it can be used with the new OpenSearchQuery type.

```rust
pub async fn query<T>(
    &self,
    index: &str,
    limit: i64,
    offset: i64,
    query: OpenSearchQuery,
) -> anyhow::Result<Vec<T>>
where
    T: DeserializeOwned,
{
    let query_json = json!(query);

    println!("query: {}", query_json);

    let response = self
        .client
        .search(SearchParts::Index(&[index]))
        .from(offset)
        .size(limit)
        .body(query_json)
        .send()
        .await?;

    let response_body = response.json::<Value>().await?;

    let result = response_body["hits"]["hits"]
        .as_array()
        .unwrap()
        .iter()
        .map(|raw_value| serde_json::from_value::<T>(raw_value["_source"].clone()).unwrap())
        .collect();

    Ok(result)
}
```

Looks good. Now let's update the lambda function code.

Lambda function

Let's move to functions/query/src/.

First of all, I update the Request type.

```rust
#[derive(Deserialize, Clone, Copy)]
struct Pagination {
    limit: Option<i64>,
    offset: Option<i64>,
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct Request {
    destination_city_name: Option<String>,
    origin_city_name: Option<String>,
    destination_weather: Option<String>,
    origin_weather: Option<String>,
    max_avg_ticket_price: Option<f64>,
    min_avg_ticket_price: Option<f64>,
    pagination: Option<Pagination>,
}
```

I can query by destination city and weather, and the same for the origin. There is also a way to define average ticket price limits. Finally, we have some basic pagination.

The main point is that all properties are optional, so the caller has a lot of flexibility in querying data.

The function handler stays straightforward - the only new action is building the query:

```rust
async fn function_handler(
    os_client: &OpenSearchService,
    event: LambdaEvent<Request>,
) -> Result<Response, Error> {
    let request_body = event.payload;

    let index = "opensearch_dashboards_sample_data_flights";

    let limit = request_body.pagination.and_then(|p| p.limit).unwrap_or(10);
    let offset = request_body.pagination.and_then(|p| p.offset).unwrap_or(0);

    let query = OpenSearchQueryBuilder::new()
        .with_must_match("OriginWeather", request_body.origin_weather.unwrap_or("".to_string()))
        .with_must_match("DestWeather", request_body.destination_weather.unwrap_or("".to_string()))
        .with_must_match("DestCityName", request_body.destination_city_name.unwrap_or("".to_string()))
        .with_must_match("OriginCityName", request_body.origin_city_name.unwrap_or("".to_string()))
        .with_must_range("AvgTicketPrice", request_body.min_avg_ticket_price, request_body.max_avg_ticket_price)
        .build();

    let query_result = os_client.query::<FlightData>(index, limit, offset, query).await?;

    // Prepare the response
    let resp = Response {
        flights: query_result,
    };

    Ok(resp)
}
```

Testing locally

Using cargo lambda, I run the function:

```shell
cd functions/query
cargo lambda watch
```

Now I can invoke it with the provided input. I update my events/flights.json

```json
{
    "destinationCityName": "London",
    "maxAvgTicketPrice": 600,
    "minAvgTicketPrice": 400,
    "pagination": {
        "limit": 1,
        "offset": 10
    }
}
```

And, in the second terminal window, I run

```shell
cargo lambda invoke -F events/flights.json
```


🎉 🎉 🎉

It works.

Now I can update test events, or create more events, to test my query.


In this part, we implemented an OpenSearchQuery type with a builder, to be used by the lambda function. We also updated the lambda function itself, so it now prepares queries based on the incoming Request.

For now, we didn't need to touch the AWS cloud. The whole integration was built locally thanks to the OpenSearch Docker Compose setup and cargo lambda.

I believe that the ability to work locally with a short feedback loop helps boost the developer's performance and feels really nice.

Next steps

In the next part, I plan to define IaC and deploy the solution to AWS.

Stay tuned!
