Introduction:
This short article covers one specific AppSync scenario: sending real-time updates from your event-driven serverless applications using GraphQL subscriptions. If you would like to, more on GraphQL subscriptions can be explored here.
Delivering subscription events via AppSync for updates that are not triggered through a direct mutation call (that is, out-of-band updates, where your own system makes the data changes) has been covered before, as a quick Google search will show. In short, AppSync returns a subscription response only when the mutation linked to that subscription is called. So for out-of-band updates, we trigger a mutation call that performs no data change of its own, which lets AppSync return the response data to the subscriber. How that mutation call is made depends on your flow and on what fits your event-driven architecture most efficiently.
In this article, I show how we can go one step further and implement the whole flow with no code (that is, without a Lambda function) using EventBridge Pipes!
EventBridge Pipes, if you’re unfamiliar with it, creates point-to-point integrations between a source and a target, with optional filtering and enrichment steps, without needing Lambda functions in between for those operations. In a distributed event-driven system, Pipes can be a great tool to reduce overhead and streamline flows considerably.
Flow set-up:
Let’s say there is a requirement to send subscription updates for a specific product ID whenever a new order is created by the system. All new orders are persisted into a DynamoDB table, and this table will be our source for triggering the flow using Pipes.
We will configure DynamoDB Streams as the source for our pipe.
Since we’re interested only in records that are newly inserted into the table, we will set up the filter to match the “INSERT” event type only.
There is no specific requirement for enrichment, so we can move on to setting up our target, which is an API destination. Note that the AppSync API and schema need to exist before you configure the API destination.
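As a rough illustration, the filter can be expressed as an event pattern that matches on the `eventName` field of each DynamoDB stream record. The Python sketch below builds that pattern as JSON and, purely for local experimentation, checks sample records against it. `matches_insert` is a hypothetical helper of my own, not part of any AWS SDK; in the real pipe, EventBridge evaluates the pattern for you.

```python
import json

# Event pattern for the pipe's filter step: match newly inserted items only.
INSERT_FILTER = {"eventName": ["INSERT"]}

# The pattern is supplied to the pipe as a JSON string.
filter_pattern_json = json.dumps(INSERT_FILTER)


def matches_insert(record: dict) -> bool:
    """Local stand-in for the filter: exact match on eventName only."""
    return record.get("eventName") in INSERT_FILTER["eventName"]


if __name__ == "__main__":
    print(filter_pattern_json)
    print(matches_insert({"eventName": "INSERT"}))
    print(matches_insert({"eventName": "MODIFY"}))
```

With this pattern, MODIFY and REMOVE stream records never reach the target, so updates to existing orders do not trigger the mutation.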
This is where we provide the AppSync endpoint and set up the payload transformation that builds the mutation request sent to that endpoint.
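To make the transformation concrete, here is a minimal Python sketch of the mapping it performs: a DynamoDB stream record goes in, and a GraphQL-over-HTTP request body for the `createOrder` mutation comes out. It assumes the table stores `userId`, `orderId` and `productId` as string attributes; the function name and the sample values are hypothetical. In the pipe itself, the same mapping would be written in the target input transformer using JSON path placeholders such as `<$.dynamodb.NewImage.productId.S>`, so no code is deployed.

```python
import json

# Mutation document sent to the AppSync endpoint. The selection set includes
# productId because subscribers only receive fields the mutation selected.
CREATE_ORDER_MUTATION = """
mutation CreateOrder($input: CreateOrderInput!) {
  createOrder(createOrderInput: $input) {
    productId
  }
}
""".strip()


def build_mutation_request(record: dict) -> dict:
    """Map a DynamoDB stream record to a GraphQL request body."""
    # Assumption: all three attributes are stored as DynamoDB strings ("S").
    image = record["dynamodb"]["NewImage"]
    return {
        "query": CREATE_ORDER_MUTATION,
        "variables": {
            "input": {
                "userId": image["userId"]["S"],
                "orderId": image["orderId"]["S"],
                "productId": image["productId"]["S"],
            }
        },
    }


if __name__ == "__main__":
    # Hypothetical stream record, trimmed to the fields used above.
    sample = {
        "eventName": "INSERT",
        "dynamodb": {
            "NewImage": {
                "userId": {"S": "user-1"},
                "orderId": {"S": "order-1"},
                "productId": {"S": "product-123"},
            }
        },
    }
    print(json.dumps(build_mutation_request(sample), indent=2))
```

The resulting body is what gets posted to the AppSync GraphQL endpoint, with authorization handled by the connection attached to the API destination.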
Our GraphQL schema will look something like this. (For simplicity, I have set this up to only focus on the subscription flow and nothing else)
schema {
  mutation: Mutation
  query: Query
  subscription: Subscription
}

type Mutation {
  createOrder(createOrderInput: CreateOrderInput!): OrderResponse
}

type Query {
  test(id: String): String
}

type Subscription {
  onOrderCreate(productId: String!): OrderResponse
    @aws_subscribe(mutations: ["createOrder"])
}

input CreateOrderInput {
  userId: String!
  orderId: String!
  productId: String!
}

type OrderResponse {
  productId: String
}
And that’s pretty much it! You can do a quick test by starting a subscription in the AppSync console with a product ID as the input, and then inserting a record containing the same product ID value into the DynamoDB table. The response should show up in the console almost immediately.
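If you prefer to script that quick test, the sketch below shows one way it could look. The product ID, the helper names and the table name passed to `insert_test_order` are all placeholders of mine, and the item shape assumes the same three string attributes as the schema input. `boto3` is imported lazily, so the helpers that only build data work without the SDK installed.

```python
# Subscription to start in the AppSync console (product ID is a placeholder).
SUBSCRIPTION = """
subscription OnOrderCreate {
  onOrderCreate(productId: "product-123") {
    productId
  }
}
""".strip()


def build_order_item(user_id: str, order_id: str, product_id: str) -> dict:
    """Build a DynamoDB item in attribute-value format for a new order."""
    return {
        "userId": {"S": user_id},
        "orderId": {"S": order_id},
        "productId": {"S": product_id},
    }


def insert_test_order(table_name: str, item: dict) -> None:
    """Insert the item; the stream record it produces should start the pipe."""
    import boto3  # needs AWS credentials and a configured region

    boto3.client("dynamodb").put_item(TableName=table_name, Item=item)


if __name__ == "__main__":
    item = build_order_item("user-1", "order-1", "product-123")
    print(SUBSCRIPTION)
    print(item)
    # insert_test_order("your-orders-table", item)  # hypothetical table name
```

The product ID in the inserted item has to match the one used in the subscription argument, otherwise the subscriber will not receive the event.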
Here is the complete repository set-up for reference.
Quick tip: When working with Pipes, first set up the flow manually in the console and test it out. You can then export the CloudFormation template from the console and add it to your serverless.yml file. This way you don’t spend time writing the IaC code for it by hand!
Conclusion:
Pipes is a great tool to work with, especially when you need to connect different AWS services and perform payload transformations in between. The sources Pipes supports are mostly streams, but the list of targets is comparatively extensive. Using Pipes makes it easier to set up event-driven flows that are consistent across your applications, so it is worth exploring whether it fits within your services.