DEV Community

jeikabu

Originally published at rendered-obsolete.github.io

Async Rust Beta- Quick Peek

As announced on the Rust Blog a few weeks ago, the long-awaited async-await syntax hit beta and is slated for release with 1.39 in early November. Take a look at the Async Book for an in-depth introduction.

In eager anticipation of async-await, we’ve been using the futures 0.1 crate for some time. But we’re long overdue to take a look at where things are headed. The following are some loosely structured notes on the upcoming “stable” state of futures and async/await.

Foot in the Door

Run cargo new async, and in Cargo.toml add:

[dependencies]
futures = { version = "0.3.0-alpha", package = "futures-preview" }

Here we’re using the “mostly final” Futures 0.3 crate.

As explained in The Book there are three release channels: nightly, beta, and stable. You may have used “nightly” before to access the latest and greatest features- this is where async/await previously lived. “beta” is where a version goes before becoming the next “stable” release.

# Update all installed toolchains
rustup update
# List installed toolchains
rustup toolchain list
# Install beta toolchain
rustup install beta

There are a few ways to use the beta/nightly channels:

# List toolchain overrides
rustup override list
# Set default toolchain for directory
rustup override set beta # or "nightly"
# Now defaults to beta toolchain
cargo build

# Explicitly build using beta/nightly toolchain
cargo +beta build # or "+nightly"

In src/main.rs:

async fn hello_world() {
    println!("Hello world");
}

async fn start() {
    hello_world().await
}

fn main() {
    let future = start();
    futures::executor::block_on(future);
}

cargo run to greet the world.

Notice how await is postfix instead of await hello_world() as found in many other languages. The syntax was heavily debated, but the rationale boils down to improving method chaining, coexistence with the ? operator, and precedence rules.

A contrived example with a series of calls (some of which can fail):

let answer = can_fail().await?
    .some_func().await
    .member_can_fail().await?
    .get_answer()?;

You can’t understand async without understanding Rust’s Future trait. Perhaps the first thing to learn about futures is that they’re lazy; nothing happens unless something “pumps” them- as executor::block_on does here. Contrast this with std::thread::spawn, which creates a running thread. If futures are polled, does that mean Rust async programming isn’t event-driven à la epoll/kqueue? Don’t fret, a Waker can be used to signal that the future is ready to be polled again.
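
To make the laziness concrete, here is a minimal sketch that uses only the standard library. The block_on below is a hand-rolled toy standing in for futures::executor::block_on (it is not that crate's implementation), and the RAN flag, ThreadWaker, and lazy_demo are hypothetical names introduced for illustration. It polls the future and parks the thread until the Waker fires.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical flag recording whether the async body has run.
static RAN: AtomicBool = AtomicBool::new(false);

async fn hello_world() {
    RAN.store(true, Ordering::SeqCst);
}

// Waking means unparking the thread that is driving the future.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Toy executor (an assumption, not the futures crate's code):
// poll once, then sleep until the Waker says to poll again.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Returns whether the body had run (before polling, after polling).
fn lazy_demo() -> (bool, bool) {
    RAN.store(false, Ordering::SeqCst);
    let future = hello_world(); // creating the future runs nothing
    let before = RAN.load(Ordering::SeqCst);
    block_on(future); // polling is what drives the body
    let after = RAN.load(Ordering::SeqCst);
    (before, after)
}

fn main() {
    let (before, after) = lazy_demo();
    println!("ran before polling: {}, after polling: {}", before, after);
}
```

Calling hello_world() only builds the state machine; the store to RAN happens inside the first poll.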

Error-Handling

We have test code like:

while !done.load(Ordering::Relaxed) {
    match block_on(ctx.receive()) {
        Ok(msg) => {
            let pipe = msg.get_pipe()?;
            let mut response = msg.dup()?;
            response.set_pipe(&pipe);
            block_on(ctx.send(response))?;
        }
        _ => panic!(),
    }
}

How we might re-write it:

let future = async {
    while !done.load(Ordering::Relaxed) {
        match ctx.receive().await {
            Ok(msg) => {
                let pipe = msg.get_pipe()?;
                let mut response = msg.dup()?;
                response.set_pipe(&pipe);
                ctx.send(response).await?;
            }
            _ => panic!(),
        }
    }
};

block_on(future);

Unfortunately, an async block (i.e. async {}) has no place to declare its return type, so ? fails here: the block above evaluates to () rather than a Result. Async closures (i.e. async || {}) are unstable.

If we replace ? with .unwrap() it compiles and runs.
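
Another option, sketched here with the standard library only, is to end the async block with an Ok carrying a turbofish so the block's Result type is explicit and ? has something to return into. The parse and sum functions are hypothetical stand-ins for ctx.receive() and friends, and block_on is a toy replacement for futures::executor::block_on.

```rust
use std::future::Future;
use std::num::ParseIntError;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Toy executor (an assumption, not the futures crate's code).
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Hypothetical fallible async function.
async fn parse(input: &str) -> Result<i32, ParseIntError> {
    input.parse::<i32>()
}

fn sum(a: &str, b: &str) -> Result<i32, ParseIntError> {
    let future = async {
        let x = parse(a).await?;
        let y = parse(b).await?;
        // The turbofish spells out the block's Result type explicitly.
        Ok::<i32, ParseIntError>(x + y)
    };
    block_on(future)
}

fn main() {
    println!("{:?}", sum("40", "2"));
    println!("{:?}", sum("40", "two"));
}
```

Applied to the loop above, this would mean finishing the block with an Ok of the appropriate error type after the while loop.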

Heterogeneous Returns

Given:

broker_pull_ctx.receive().for_each(|msg| {
    if let Ok(msg) = msg {
        broker_push_ctx.send(msg).then(|msg| {
            // Do something with the message
            future::ready(())
        })
    } else {
        future::ready(())
    }
});

Sadness:

|
144 | / if let Ok(msg) = msg {
145 | | broker_push_ctx.send(msg).then(|res| {
    | | _____________________ -
146 | || res.unwrap().unwrap();
147 | || future::ready(())
148 | || })
    | || ______________________ - expected because of this
149 | | } else {
150 | | future::ready(())
    | | ^^^^^^^^^^^^^^^^^ expected struct `futures_util::future::then::Then`, found struct `futures_util::future::ready::Ready`
151 | | }
    | | _________________ - if and else have incompatible types
    |
    = note: expected type `futures_util::future::then::Then<futures_channel::oneshot::Receiver<std::result::Result<(), runng::result::Error>>, futures_util::future::ready::Ready<_>, [closure@runng/tests/test_main.rs:145:52: 148:22]>`
               found type `futures_util::future::ready::Ready<_>`

Basically, then()- like many Rust combinators- returns a distinct type (Then in this case), while the else branch returns Ready, and both arms of an if/else must have the same type.

If you reach for a trait object for type erasure via -> Box<dyn Future<Output = ()>> and wrap the returns in Box::new() you’ll run into:

error[E0277]: the trait bound `dyn core::future::future::Future<Output = ()>: std::marker::Unpin` is not satisfied
   --> runng/tests/test_main.rs:155:58
    |
155 | let fut = broker_pull_ctx.receive().unwrap().for_each(|msg| -> Box<dyn Future<Output = ()>> {
    | ^^^^^^^^ the trait `std::marker::Unpin` is not implemented for `dyn core::future::future::Future<Output = ()>`
    |
    = note: required because of the requirements on the impl of `core::future::future::Future` for `std::boxed::Box<dyn core::future::future::Future<Output = ()>>`

Lo, “pinning”, stabilized in Rust 1.33. Thankfully, the type-insanity that is Pin<Box<dyn Future<Output = T> + Send + 'a>> is common enough that a future::BoxFuture<'a, T> alias is provided:

let fut = broker_pull_ctx...for_each(|msg| -> future::BoxFuture<()> {
    if let Ok(msg) = msg {
        // then() expects its closure to return a future
        Box::pin(broker_push_ctx.send(msg).then(|_| future::ready(())))
    } else {
        Box::pin(future::ready(()))
    }
});
block_on(fut);
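
The same idea in a self-contained form, as a rough sketch using only the standard library: both branches are boxed and pinned so they share one return type. The double and handle functions are hypothetical, the return type omits the Send bound that BoxFuture carries, and block_on is a toy replacement for futures::executor::block_on.

```rust
use std::future::{self, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Toy executor (an assumption, not the futures crate's code).
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Hypothetical async work for the "message is Ok" branch.
async fn double(n: u32) -> u32 {
    n * 2
}

// The two branches produce different concrete future types;
// Box::pin erases both into the same pinned trait object.
fn handle(msg: Result<u32, ()>) -> Pin<Box<dyn Future<Output = u32>>> {
    if let Ok(n) = msg {
        Box::pin(double(n))
    } else {
        Box::pin(future::ready(0))
    }
}

fn main() {
    println!("{}", block_on(handle(Ok(21))));
    println!("{}", block_on(handle(Err(()))));
}
```

Each call to handle costs one heap allocation, which is the price of the type erasure.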

Alternatively, you can multiplex the return with something like future::Either:

let fut = broker_pull_ctx...for_each(|msg| {
    use futures::future::Either;
    if let Ok(msg) = msg {
        Either::Left(
            // then() expects its closure to return a future
            broker_push_ctx.send(msg).then(|_| future::ready(()))
        )
    } else {
        Either::Right(future::ready(()))
    }
});
block_on(fut);

This avoids the boxing allocation, but it might become a bit gnarly if there’s a large number of return types.
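
To show why this works without allocating, here is a simplified, hand-rolled Either built on the standard library alone. It is a sketch of the idea, not the futures crate's implementation: this version requires both futures to be Unpin to stay free of unsafe code, whereas the real futures::future::Either has no such restriction. Double, handle, and the toy block_on are hypothetical names for illustration.

```rust
use std::future::{self, Future, Ready};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Toy executor (an assumption, not the futures crate's code).
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Simplified Either: one enum type wrapping two future types.
enum Either<A, B> {
    Left(A),
    Right(B),
}

impl<A, B> Future for Either<A, B>
where
    A: Future + Unpin,
    B: Future<Output = A::Output> + Unpin,
{
    type Output = A::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Forward the poll to whichever variant we hold.
        match self.get_mut() {
            Either::Left(a) => Pin::new(a).poll(cx),
            Either::Right(b) => Pin::new(b).poll(cx),
        }
    }
}

// Hypothetical hand-written future that is ready immediately.
struct Double(u32);

impl Future for Double {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        Poll::Ready(self.0 * 2)
    }
}

// Both branches now have the single type Either<Double, Ready<u32>>.
fn handle(msg: Result<u32, ()>) -> Either<Double, Ready<u32>> {
    if let Ok(n) = msg {
        Either::Left(Double(n))
    } else {
        Either::Right(future::ready(0))
    }
}

fn main() {
    println!("{}", block_on(handle(Ok(21))));
    println!("{}", block_on(handle(Err(()))));
}
```

With three or more branches the types nest (an Either inside an Either), which is where the gnarliness comes from.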

block_on() != .await

Our initial, exploratory implementation made heavy use of wait() found in futures 0.1. To transition to async/await it’s tempting to replace wait() with block_on():

#[test]
fn block_on_panic() -> runng::Result<()> {
    let url = get_url();

    let mut rep_socket = protocol::Rep0::open()?;
    let mut rep_ctx = rep_socket.listen(&url)?.create_async()?;

    let fut = async {
        block_on(rep_ctx.receive()).unwrap();
    };
    block_on(fut);

    Ok(())
}

cargo test block_on_panic yields:

---- tests::reqrep_tests::block_on_panic stdout ----
thread 'tests::reqrep_tests::block_on_panic' panicked at 'cannot execute `LocalPool` executor from within another executor: EnterError', src/libcore/result.rs:1165:5

Note this isn’t a compiler error, it’s a runtime panic. I haven’t looked into the details of this, but the problem stems from the nested calls to block_on(). It seems that if the inner future finishes immediately everything is fine, but not if it blocks. However, it works as expected with await:

let fut = async {
    rep_ctx.receive().await.unwrap();
};
block_on(fut);

Async Traits

Try:

trait AsyncTrait {
    async fn do_stuff();
}

Nope:

error[E0706]: trait fns cannot be declared `async`
 --> src/main.rs:5:5
  |
5 | async fn do_stuff();
  | ^^^^^^^^^^^^^^^^^^^^

How about:

trait AsyncTrait {
    fn do_stuff();
}

struct Type;

impl AsyncTrait for Type {
    async fn do_stuff() { }
}

Nope:

error[E0706]: trait fns cannot be declared `async`
  --> src/main.rs:11:5
   |
11 | async fn do_stuff() { }
   | ^^^^^^^^^^^^^^^^^^^^^^^

One work-around involves being explicit about the return types, and using an async block within the impl:

use futures::future::{self, BoxFuture, Future};

trait AsyncTrait {
    fn boxed_trait() -> Box<dyn Future<Output = ()>>;
    fn pinned_box() -> BoxFuture<'static, ()>;
}

impl<T> AsyncTrait for T {
    fn boxed_trait() -> Box<dyn Future<Output = ()>> {
        Box::new(async {
            // .await to your heart's content
        })
    }
    fn pinned_box() -> BoxFuture<'static, ()> {
        Box::pin(async {
            // .await to your heart's content
        })
    }
}
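
For a version that can actually be driven to completion, here is a small sketch of the pinned-box approach with a method that borrows self, using only the standard library. Greeter, English, and punctuate are hypothetical names, the return type omits the Send bound that BoxFuture carries, and block_on is a toy replacement for futures::executor::block_on.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Toy executor (an assumption, not the futures crate's code).
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// The trait method is an ordinary fn returning a pinned, boxed future.
// The 'a lifetime ties the future to the borrow of self.
trait Greeter {
    fn greet<'a>(&'a self) -> Pin<Box<dyn Future<Output = String> + 'a>>;
}

struct English {
    name: String,
}

// Hypothetical async helper to await inside the impl.
async fn punctuate(s: String) -> String {
    s + "!"
}

impl Greeter for English {
    fn greet<'a>(&'a self) -> Pin<Box<dyn Future<Output = String> + 'a>> {
        // The async block is where .await becomes available.
        Box::pin(async move {
            punctuate(format!("Hello, {}", self.name)).await
        })
    }
}

fn main() {
    let greeter = English { name: "world".to_string() };
    println!("{}", block_on(greeter.greet()));
}
```

Callers can .await the returned value directly, since a pinned box of a future is itself a future.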

Top comments (3)

saint4eva

Nice writeup. I believe the Rust team and community drew a lot of inspiration from C#'s async-await during their planning and design stages? Good to see a lot of other programming languages incorporating this feature into their various ecosystems. Once more, well done, Rust.

jeikabu

From the various threads I've read, they looked at a wide range of "prior art", and that was the main argument against postfix ".await"- it would be foreign to everyone.

Should have mentioned this is just the MVP to unblock the ecosystem and async-await is far from "finished". Should probably sneak that into the opening somewhere.

saint4eva

Cool.