DEV Community

ProgramCrafter

Rust async is not very scary!

If you know the architecture of your code in advance, making it asynchronous is quite manageable!

Let's examine an example: you want a function that downloads data from a given URL, caches it for future use, and has a timeout for loading (so that the program can keep running if the remote server hangs). There are basically two solutions:

  1. Write a single async caching-and-downloading function, then race it against a timeout. This causes problems when the function is not cancellation-safe: it can be dropped at any .await point, which may be after the data has loaded but before it has been put into the cache.
  2. Apply the timeout to the downloading part only! We can do this even without factoring out a separate function, that is, without the overhead of creating additional Future objects.
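
The hazard in option 1 can be reproduced without any runtime. The sketch below is only an illustration: `NoopWaker`, `YieldOnce`, `download_and_cache` and `cache_filled` are hypothetical names, not part of the code in this post. Losing a race against a timeout amounts to dropping the future at whatever .await it is currently parked on, so the sketch polls the future once by hand and then drops it.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; enough for polling by hand.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Future that is Pending on the first poll and Ready on the second.
struct YieldOnce {
    yielded: bool,
}
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            Poll::Pending
        }
    }
}

// Hypothetical "download, then cache" function with an .await in between.
async fn download_and_cache(cache: &mut HashMap<u32, String>, key: u32) {
    let data = String::from("payload"); // pretend the download has finished
    YieldOnce { yielded: false }.await; // some other await, e.g. sending data out
    cache.insert(key, data); // skipped if the future is dropped at the await above
}

// Returns whether the cache got filled; `cancel` drops the future mid-way.
fn cache_filled(cancel: bool) -> bool {
    let mut cache = HashMap::new();
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    {
        let mut fut = Box::pin(download_and_cache(&mut cache, 1));
        // First poll runs up to the .await and parks there.
        assert!(fut.as_mut().poll(&mut cx).is_pending());
        if !cancel {
            // Second poll resumes after the .await and finishes the function.
            assert!(fut.as_mut().poll(&mut cx).is_ready());
        }
    } // `fut` is dropped here, exactly what a lost race does to it
    cache.contains_key(&1)
}
```

`cache_filled(true)` returns false: the data was "downloaded" but never reached the cache. `cache_filled(false)` returns true, because the future was allowed to run to completion.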

Coding

Let's suppose we have a separate cache for each user. If so, the downloader struct will look like this:

use tokio::time::{sleep, Duration};
use std::collections::HashMap;

mod mock;
use mock::{AsyncInChannel, AsyncOutChannel, Url};

struct CacheForUser {
    cache: HashMap<Url, String>,
    channel: AsyncOutChannel,
}

Then we write the function we actually need. Since downloading can time out, it returns a Result.

impl CacheForUser {
    async fn get_response(&mut self, url: Url) -> Result<(), &'static str> {
        if let Some(cached_response) = self.cache.get(&url) {
            self.channel.send(cached_response).await;
            return Ok(());
        }
        ...

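Here comes the part where the deadline applies! Let's create the appropriate timeout-tracking Future.

tokio::select! awaits the Futures it is given (Promises in JS terminology) and, like a match block, runs the handler of the first one to become ready; the remaining futures are dropped. The deadline is pinned and passed as &mut response_deadline, so the timer itself survives the first select! and the same 600 ms budget covers both connecting and reading.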

        let response = {
            tokio::pin! {let response_deadline = sleep(Duration::from_millis(600));}

            let in_channel;
            tokio::select! {
                c = AsyncInChannel::connect(url) => {in_channel = c;}
                _ = &mut response_deadline       => {return Err("timeout on connection");}
            }

            let data;
            tokio::select! {
                d = in_channel.read()       => {data = d;}
                _ = &mut response_deadline  => {return Err("timeout on reading data");}
            }
            data
        };        // timeout no longer applies

After the end of the block, we can be sure that the timeout will not interrupt our work. So we don't need to think about the relative order of writing to the cache and awaiting the outgoing send.

        ...
        self.channel.send(&response).await;
        self.cache.insert(url, response);
        Ok(())
    }
}

Mocking

The easiest way to mock this is to have the outbound channel write to Stdout and the inbound channel return a constant value (with delays configured by the Url). We don't even need to make Url a string in tests; we can just pass a pair of millisecond delays (connection delay, read delay).

// mock.rs
use tokio::io::{stdout, Stdout, AsyncWriteExt};
use tokio::time::{sleep, Duration};


pub type Url = (u64, u64);

pub struct AsyncOutChannel {
    stdout: Stdout
}
impl AsyncOutChannel {
    pub fn new() -> AsyncOutChannel {
        AsyncOutChannel {stdout: stdout()}
    }
    pub async fn send(&mut self, s: &str) {
        self.stdout.write_all(s.as_bytes()).await
            .expect("IO error on stdout::write_all");
        self.stdout.flush().await
            .expect("IO error on stdout::flush");
    }
}


pub struct AsyncInChannel {
    read_time: u64
}
impl AsyncInChannel {
    pub async fn connect(url: Url) -> AsyncInChannel {
        sleep(Duration::from_millis(url.0)).await;
        AsyncInChannel {read_time: url.1}
    }
    pub async fn read(&self) -> String {
        sleep(Duration::from_millis(self.read_time)).await;
        "read complete\n".to_owned()
    }
}

Testing

Everything is quite straightforward here!

// main.rs
...

#[tokio::main]
async fn main() {
    let mut cfu = CacheForUser {cache: HashMap::new(),
        channel: AsyncOutChannel::new()};

    println!("100,700ms: {:?}", cfu.get_response( (100,700) ).await);
    println!("100,550ms: {:?}", cfu.get_response( (100,550) ).await);
    println!("500,150ms: {:?}", cfu.get_response( (500,150) ).await);
    println!("800,150ms: {:?}", cfu.get_response( (800,150) ).await);
    println!("100,150ms: {:?}", cfu.get_response( (100,150) ).await);
}

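We'll see the following results. The first three calls connect in time but exceed the shared 600 ms budget while reading, the fourth times out while still connecting, and only the last one (250 ms in total) succeeds: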

100,700ms: Err("timeout on reading data")
100,550ms: Err("timeout on reading data")
500,150ms: Err("timeout on reading data")
800,150ms: Err("timeout on connection")
read complete
100,150ms: Ok(())

I really wonder whether it's possible to write such code in languages like Python or JS and be sure that the timeout works correctly!
