Hi everyone!
While learning Rust I stumbled upon a problem not covered by popular online tutorials. When they talk about tokio and MPSC (multi-producer, single-consumer) channels, they usually connect spawned tasks in some fixed way. However, in my project I have to dynamically match asynchronous producers and consumers in various configurations. So let me share a useful pattern I've discovered in my Rust journey.
Let's say we have a restaurant:
$ cargo init restaurant
Creating binary (application) package
$ cd restaurant
$ cargo add tokio --features=full
Updating crates.io index
Adding tokio v1.38.0 to dependencies
...
As a manager we can assign different cooking stands to asynchronously prepare different types of food (don't worry about undefined values for now):
async fn cooking_stand(food: char) {
    loop {
        somewhere.send(food.clone()).await;
    }
}
Food should be delivered to tables awaiting it.
async fn table(number: u8) {
    loop {
        let food = somehow.recv().await;
        println!("Got {} at table {}", food, number);
    }
}
Now we can organize our restaurant:
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // cooking stands
    tokio::spawn(cooking_stand('🥗')); // salad
    tokio::spawn(cooking_stand('🍔')); // burger
    // ...
    // tables for guests
    tokio::spawn(table(1));
    tokio::spawn(table(2));
    // ...
    // keep our restaurant open for 1s
    sleep(Duration::from_millis(1000)).await;
}
Problem
For simplicity, let's assume we accept orders through an application. So the restaurant manager (the main task) knows, for example, that table 1 is waiting for 🥗 and table 3 is waiting for 🍔. But how do we actually fulfill those orders?
Naive approach
cooking_stand -> 🥗🥗🥗🥗🥗 ->            -> table 1
cooking_stand -> 🍕🍕🍕🍕🍕 ->  manager   -> table 2
cooking_stand -> 🍔🍔🍔🍔🍔 ->            -> table 3
If we force the manager to do the job, he can wait for the 🥗 cooking stand to prepare a salad and then pass it to table 1. Then he waits for the 🍔 cooking stand to prepare a burger and carries it to table 3. This is an obviously flawed design:
- Cooking stands produce food whether it is needed or not.
- If a cooking stand is slow, the manager is stuck waiting for the food to be prepared.
- The manager should not do the heavy lifting because it hurts his responsiveness.
We need waiters
Fortunately, tokio gives us the perfect tool for the job: oneshot channels. These channels are designed and optimized to pass a single value exactly once.
let (waiter_tx, waiter_rx) = oneshot::channel::<char>();
To make a waiter deliver 🥗 to table 1, we first need to modify our cooking stands:
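async fn cooking_stand(
    product: char,
    mut waiters: tokio::sync::mpsc::Receiver<oneshot::Sender<char>>,
) {
    // Each item is the sending half of a waiter who came to pick up food.
    while let Some(waiter) = waiters.recv().await {
        // send() fails if the table already dropped its half; ignore that here.
        let _ = waiter.send(product);
    }
}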
Here tokio::sync::mpsc::Receiver<oneshot::Sender<char>> is a queue of waiters. Yes, you read that right: you can send oneshot channels through other channels. When a waiter arrives at a cooking stand, the stand prepares food and hands it to the waiter for delivery to a table. Let's do the same for tables, except that each table gets the receiving half of the specific waiter who will bring food to it:
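async fn table(
    number: u8,
    mut waiters: tokio::sync::mpsc::Receiver<oneshot::Receiver<char>>,
) {
    // Each item is the receiving half of a waiter assigned to this table.
    while let Some(waiter) = waiters.recv().await {
        // Awaiting the receiver yields Err if the stand dropped the sender.
        match waiter.await {
            Ok(food) => println!("Got {} at table {}", food, number),
            Err(_) => eprintln!("Table {} never got its food", number),
        }
    }
}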
When a waiter is assigned to a table, the customer waits for this waiter to deliver food produced by the cooking stand. To complete the puzzle, let's modify our main function. The manager, instead of doing the heavy lifting himself, can hire waiters and assign them to pairs of cooking stands and tables to fulfill food orders.
use tokio::sync::{mpsc, oneshot};
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // used by manager to send waiters to cooking stands
    let (stand_salad_tx, stand_salad_rx) = mpsc::channel::<oneshot::Sender<char>>(100);
    let (stand_pizza_tx, stand_pizza_rx) = mpsc::channel::<oneshot::Sender<char>>(100);
    let (stand_burger_tx, stand_burger_rx) = mpsc::channel::<oneshot::Sender<char>>(100);
    // set up cooking stands
    tokio::spawn(cooking_stand('🥗', stand_salad_rx));
    tokio::spawn(cooking_stand('🍕', stand_pizza_rx));
    tokio::spawn(cooking_stand('🍔', stand_burger_rx));
    // used by manager to send waiters to tables
    let mut tables: Vec<tokio::sync::mpsc::Sender<oneshot::Receiver<char>>> = Vec::new();
    // set up tables
    for number in 1..=4 {
        let (table_tx, table_rx) = mpsc::channel::<oneshot::Receiver<char>>(100);
        tables.push(table_tx);
        tokio::spawn(table(number, table_rx));
    }
}
Let's check if it works by adding the following code at the end of our main:
    // create waiter
    let (waiter_tx, waiter_rx) = oneshot::channel::<char>();
    // send him for food to salad stand
    stand_salad_tx.send(waiter_tx).await.expect("salad stand is closed");
    // send him to deliver food to table `1`
    tables[0].send(waiter_rx).await.expect("table 1 is closed");
    // manager can go back to doing his stuff
    // keep our restaurant open for 1s
    sleep(Duration::from_millis(1000)).await;
When run, it produces the following output:
Got 🥗 at table 1
Yay!
Conclusions
This pattern of sending the two halves of a oneshot channel through regular channels to spawned tokio tasks can be used to implement all kinds of traffic control: passing messages in a given ratio, throttling, and so on.
Is it efficient? Very! I was surprised how well oneshot channels are optimized. A single core of my Ryzen 6800U processor was able to create over 5_000_000 oneshot channels per second and send them to the corresponding tasks. That's crazy fast.
How to scale it? There will be situations when the manager encounters a full channel (the bounded mpsc channels used here have a fixed capacity) and cannot send a oneshot channel immediately. In that case you may, for example, increase the number of producers/consumers by spawning more tasks: add another burger stand and send oneshots to the stands in round-robin order. Everything depends on what your tasks are actually doing.
What about error handling? You must keep the oneshot channel behavior in mind: "If the Receiver is closed before receiving a message which has already been sent, the message will remain in the channel until the receiver is dropped, at which point the message will be dropped immediately." So even if both halves of a oneshot channel were sent to their corresponding tasks, it still does not mean its purpose will be fulfilled. Error handling in this case depends on which scenario you implement and how you need to react to delivery issues.
Thanks for reading
This is my first Rust post and I'm still discovering the language's features. If you think something could or should be implemented better, let me know in the comments.