DEV Community

Cover image for That's so Rusty! Fearless concurrency
Imaculate
Imaculate

Posted on • Updated on

That's so Rusty! Fearless concurrency

A concurrent program is one which multitasks (or appears to be), that is two or more tasks run in overlapping time spans. These tasks are executed by threads, the smallest executable unit of a process. Under the hood, is not exactly multitasking but context switching between threads very rapidly that our mortal senses can't detect. Many modern applications rely on this illusion; for instance a server can wait for other requests while processing other requests. A lot could go wrong when threads share data, the most common are two kinds of bugs: race conditions and deadlock.

  1. Race conditions happens when shared data is accessed and/or mutated by threads in an inconsistent order. This has serious repercussions for transactions that must be atomic. It is essential that only one thread accesses shared data at a time, and that the program works irrespective of the order in which threads will run.

  2. Deadlock happens when two threads or more are waiting for each other to take action, due to a lock on resource that neither will attain, resulting in infinite hang. For instance it can happen with two threads when thread1 needs two resources, it has acquired one lock on resource1 and is waiting for thread2 to release resource2 but then thread2 is also waiting on thread1 to release resource1. This situation is known as circular wait. If the threads can hold the resources while waiting to acquire locks and pre-emption is not supported, deadlock becomes irrecoverable.

Avoiding these bugs in any language requires a great deal of discipline. When they do trickle in, they can be cumbersome to detect. In the best case these bugs manifest in crashes and hangs, in the worst case, programs runs unpredictably. That said, guarantee of fearless concurrency is not to be taken lightly. As it so happens, Rust claims to provide just that, or does it?

Rust Concurrency

The building blocks of Rust Concurrency are threads and closures.

1. Closures

Closures are anonymous functions that can access variables from the scope they were defined in. They are one of the functional paradigm features of Rust; they can be assigned to variables, passed as arguments and returned from functions. Their scope is limited to local variables, therefore not exposed beyond the crate. Syntactically, they are very similar to functions except they are unnamed, parameters are passed in vertical bar brackets (||) and type annotations are optional. Unlike functions, closures can access variables from the scope in which they are defined. The variables captured can be borrowed or moved into the closure depending on variable type and how it is used in the closure. Below is an example of a closure (assigned to print_greeting) with one parameter and captures variable generic_greeting.

fn main() {
    let generic_greeting = String::from("Good day,");
    let print_greeting = |name| println!("{} {}!", generic_greeting, name);
    let person = String::from("Crab");
    print_greeting(person);
    // println!("Can I use generic greeting? {}", generic_greeting);
    // println!("Can I use person {}", person);
}
Enter fullscreen mode Exit fullscreen mode

In this example, both variables person and generic_greeting are moved into the closure, therefore they can't be used after calling the closure. The program won't compile on uncommenting the last two print statements.

2. Threads

Rust concurrency is achieved by spawning multiple threads to run different tasks wrapped in parameter-less closures. When a thread is spawned, the return type is JoinHandle type, the main thread doesn't wait for JoinHandle to complete unless it is joined. For this reason, closures passed to threads must take ownership of captured variables to ensure they are still valid when the thread eventually executes. This is demonstrated in the following example.

use std::thread;
use std::time::Duration;

fn main() {
    let t1 = thread::spawn(|| {
        for i in 1..10 {
            println!("Greeting {} from other thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("Greeting {} from main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
    t1.join().unwrap();
}
Enter fullscreen mode Exit fullscreen mode

We get full output from both threads, 9 greetings from spawned thread and 4 greetings from main thread.
Interleave

When t1.join().unwrap() is commented out, the program exits when the main thread completes. You can try it on the Rust playground.

Alt Text

It is also worth noting that the output may vary on multiple runs of the program. Such non-determinism is characteristic of concurrency and as we'll eventually see, a source of many bugs.

As far as multitasking goes, it is crucial that threads share information. Through the standard library, Rust supports communication through two methods: Message Passing and Shared-State.

1. Message passing

Rust supports channels, through which threads can send and receive messages. An example is the Multiple Producers Single Consumers(shortened as mpsc) channel. This channel allows multiple transmitters to communicate with a single receiver, where these components may be in different threads. The channel takes ownership of variables at transmitter end and gives it up at the receiver end. The example below shows how messages are passed in a channel with two transmitters and one receiver.

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx1, rx) = mpsc::channel();
    let tx2 = mpsc::Sender::clone(&tx1);

    thread::spawn(move || {
        println!("Sending message from thread 1");
        tx1.send(String::from("Greeting from thread 1")).unwrap();
    });

    thread::spawn(move || {
        println!("Sending message from thread 2");
        tx2.send(String::from("Greeting from thread 2")).unwrap();
    });

    for recvd in rx {
        println!("Received: {}", recvd);
    }
}
Enter fullscreen mode Exit fullscreen mode

A second transmitter is made by cloning the original one. There is no need to join the threads since the receiver blocks until all the channel messages are received, there by waits for thread execution to complete.

2. Shared state

Another means of communication is by sharing memory. Message passing is great but it has the limitation of single ownership; objects have to be moved/cloned to be transmitted to another thread, if the object is moved it is rendered unusable, if it is cloned any updates to it will have to be communicated by message passing. The workaround to this is multiple ownership, which you may recall from smartpointers post is achievable with reference counted smartpointers Rc<T>. We saw how combined with RefCell<T>, we created mutable shared pointers. Why not use them in a multithreaded program? One attempt is shown below:

fn main() {
    let counter = Rc::new(RefCell::new(0));
    let mut threads = vec![];

    for _ in 0..5 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = (*counter).borrow_mut();
            *num += 1;
        });
        threads.push(handle);
    }

    for thread in threads {
        thread.join().unwrap();
    }

    println!("Result: {}", *counter.borrow());
}
Enter fullscreen mode Exit fullscreen mode

A shared pointer to a counter is sent to 5 threads, each of which increments it by 1. The number is then printed after the threads complete. Will this work?

ErrorSend

It doesn't compile but the error message is on to something. The type Rc<RefCell<T>> can not be sent between threads safely. Further down is the reason; it doesn't implement the Send trait.
Looks like the intent has been correctly expressed but we need thread safe versions of the pointer(s). Do safe alternatives exist? Indeed, the alternative to Rc<T> is the atomic reference counted type Arc<T>. In addition to properties of Rc<T> (shared ownership) it can be safely shared between threads by implementing the Send trait. Making a substitution in the program should get us closer to working state. Does the below snippet compile?

use std::cell::RefCell;
use std::thread;
use std::sync::Arc;

fn main() {
    let counter = Arc::new(RefCell::new(0));
    let mut threads = vec![];

    for _ in 0..5 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = (*counter).borrow_mut();
            *num += 1;
        });
        threads.push(handle);
    }

    for thread in threads {
        thread.join().unwrap();
    }

    println!("Result: {}", *counter.borrow());
}
Enter fullscreen mode Exit fullscreen mode

No, We have a similar but slightly different error now. RefCell<T> can't be shared among threads, doesn't implement Send and Sync traits.
ErrorSync

If RefCell<T> is not suitable for concurrency, there has to be a thread safe alternative. In fact, the Mutex (Mutex<T>) is just that; in addition to providing interior mutability, mutexes can be shared between threads. A thread must acquire a lock before accessing mutex object, ensuring single thread access at a time. Locking a mutex returns LockResult<MutexGuard<T>> type of smartpointer. A LockResult is an enum that can be Ok<T> or Error. For simplicity, we extract it by calling unwrap() which returns the inner object (MutexGuard in this case) if its Ok and panics on error. A MutexGuard is another smartpointer that can be dereferenced to access the inner object, the lock is released when thr LockResult goes out of scope. For our program, we substitute Mutex<T> for RefCell<T> and update the way it is manipulated as shown below:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut threads = vec![];

    for _ in 0..5 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num_guard = (*counter).lock().unwrap();
            *num_guard += 1;
        });
        threads.push(handle);
    }

    for thread in threads {
        thread.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}
Enter fullscreen mode Exit fullscreen mode

Much to our delight, it works!
Success

It is worth noting that the atomicity of Arc<T> and Mutex<T> comes with performance penalty. It is possible but not wise to use them in single threaded programs.

Knowing what we know, is Rust concurrency as fearless as claimed? The answer lies in the way it handles the most common concurrency pitfalls some of which were introduced above.

1. Race conditions

We established in Mutables that data races/inconsistency happens when two or more pointers access the same data at the same time, at least one of the pointers is being used to write to the data and access to the data is not synchronized. By ensuring a mutex is always associated with an object, access to the object is always synchronized. This is unlike C++ where a mutex is a separate entity and the programmer has to manually ensure a lock is acquired before accessing a resource. Below is an example of how incorrect use of mutex can cause inconsistencies.

#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
using namespace std;

mutex mtx;
int counter = 0;

void increment_to_5(int id)
{
    cout << "Executing from thread: " << id << endl;
    if (counter < 5)
    {
        lock_guard<mutex> lck(mtx);
        counter++;
    }
}

int main()
{
    thread threads[10];
    for (int i = 0; i < 10; ++i)
        threads[i] = thread(increment_to_5, i);

    for (auto &thread : threads)
        thread.join();

    cout << "Final result: " << counter << endl;
    return 0;
}
Enter fullscreen mode Exit fullscreen mode

The function increment_to_5() is called in different threads and it is supposed to increment counter variable until it gets to 5.
Although it is reasonable to only lock the mutex before the critical section (incrementing it), the lock should have been acquired before accessing comparing counter to 5. Otherwise, multiple threads may read a stale value, and increase it beyond the desired 5. This consequence is made clear on adding a small delay before lock acquisition as shown below.

cout << "Executing from thread: " << id << endl;
    if (counter < 5)
    {
        this_thread::sleep_for(chrono::milliseconds(1));
        lock_guard<mutex> lck(mtx);
        counter++;
    }
}
Enter fullscreen mode Exit fullscreen mode

The final value of counter will vary on different runs. The Rust translation of this program won't encounter this problem because counter is a mutex type and has lock has to be acquired before any access. Different runs of below program will produce the same result.

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut threads = vec![];

    for i in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            println!("Executing from thread: {}", i);
            let mut num_guard = (*counter).lock().unwrap();
            if *num_guard < 5 {
                thread::sleep(Duration::from_millis(1));
                *num_guard += 1;
            }
        });
        threads.push(handle);
    }

    for thread in threads {
        thread.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}
Enter fullscreen mode Exit fullscreen mode

2. Deadlock

Although Rust mitigates some concurrency risks, it is not capable of detecting deadlock at compile time. The example below shows how two threads can end up in a deadlock on two mutexes.

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
fn main() {
    let v1 = Arc::new(Mutex::new(0));
    let v2 = Arc::new(Mutex::new(0));

    let v11 = Arc::clone(&v1);
    let v21 = Arc::clone(&v2);
    let t1 = thread::spawn(move ||
        {
            println!("t1 attempting to lock v1");
            let v1_guard = (*v11).lock().unwrap();
            println!("t1 acquired v1");

            println!("t1 waiting ...");
            thread::sleep(Duration::from_secs(5));

            println!("t1 attempting to lock v2");
            let v2_guard = (*v21).lock().unwrap();

            println!("t1 acquired both locks");
        }
    );

    let v12 = Arc::clone(&v1);
    let v22 = Arc::clone(&v2);
    let t2 = thread::spawn(move ||
        {
            println!("t2 attempting to lock v2");
            let v1_guard = (*v22).lock().unwrap();
            println!("t2 acquired v2");

            println!("t2 waiting ...");
            thread::sleep(Duration::from_secs(5));

            println!("t2 attempting to lock v1");
            let v2_guard = (*v12).lock().unwrap();

            println!("t2 acquired both locks");
        }
    );

    t1.join().unwrap();
    t2.join().unwrap();
}

Enter fullscreen mode Exit fullscreen mode

From the output, the program hangs after each thread has acquired its first lock.
Deadlock

In a similar fashion, deadlock can also happen with message passing as in below example.

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx1, rx1) = mpsc::channel();
    let (tx2, rx2) = mpsc::channel();

    thread::spawn(move || {
        println!("Waiting for item 1");
        rx1.recv().unwrap();

        println!("Sending item 2");
        tx2.send(()).unwrap();
    });

    println!("Waiting for item 2");
    rx2.recv().unwrap();

    println!("Sending item 1");
    tx1.send(()).unwrap();
}
Enter fullscreen mode Exit fullscreen mode

Since each channel waits for an item before sending, both channels are left waiting forever.

Message Passing Deadlock

3. Reference cycles

From smart pointers post, we saw how Rc pointers can cause reference cycles. Arc pointers are also not immune to them but they can be similarly mitigated with Atomic Weak pointers.

From above observation, its safe to say that Rust concurrency is not 100% flawless. It prevents race conditions at compile time but can't guarantee the same for deadlocks or reference cycles. In retrospect, these problems are not unique to Rust, and compared to most languages Rust performs much better. Concurrency in Rust is not necessarily fearless but it is less fearful.

Top comments (2)

Collapse
 
sxci profile image
sxci
fn main() {
    let generic_greeting = String::from("Good day,");
    let print_greeting = |name| println!("{} {}!", generic_greeting, name);
    let person = String::from("Crab");
    print_greeting(&person);
    println!("Can I use generic greeting? {}", generic_greeting);
    println!("Can I use person {}", person);
}
Enter fullscreen mode Exit fullscreen mode

this is ok

Collapse
 
richardpringle profile image
Richard Pringle

Thanks! Well written article. Just remember that Rust doesn't actually prevent race conditions, but only data-races (which is one of many types of race conditions). Data-races are particularly bad because they could lead to undefined behaviour whereas many other types of race conditions could simply lead to unexpected results.