Alexandre Plt

C++ and parallelism

For a project, I needed to find a way to implement parallelism without concurrent writes. The goal was to add parallel functions like diffuse, apply, reduce and prefix to said project.

What are those functions?

Those functions are at the root of parallel computing, allowing a parallelism-aware developer to write time- and space-efficient programs (i.e. programs that make the best use of the available CPU time).

diffuse

This efficiently fills a (usually very large) vector with copies of a value, so that each worker can later operate on its own copies of the value and avoid race conditions.
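A sequential sketch of what such a function could look like (the name and signature come from this article, not from a standard API; a parallel version would have each worker fill its own slice of the vector):

```cpp
#include <cstddef>
#include <vector>

// Sequential sketch of diffuse: build a vector of `count` copies of `value`.
// A parallel version would split [0, count) between workers, each filling
// its own slice, so no two threads write to the same element.
template <typename T>
std::vector<T> diffuse(const T& value, std::size_t count)
{
    return std::vector<T>(count, value);
}
```

Here the fill-constructor of std::vector already does the sequential work for us; the point of a parallel diffuse is only to split that loop across threads.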

apply

This applies a given function on the values of a vector. Example:

std::vector<int> data {0, 1, 2, ... 123456};
std::vector<int> new_data = apply([](int i) { return i * i; }, data);
// expected output
{0, 1, 4, 9, 16, ... x²}
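A sequential sketch of apply, under the same caveat that the signature is this article's own (a parallel version would give each worker a slice of the input and output):

```cpp
#include <functional>
#include <vector>

// Sequential sketch of apply: map a function over every element.
// Each output element depends only on one input element, so the loop
// can be split freely between workers.
std::vector<int> apply(const std::function<int(int)>& fn,
                       const std::vector<int>& vec)
{
    std::vector<int> out;
    out.reserve(vec.size());
    for (int v : vec)
        out.push_back(fn(v));
    return out;
}
```

Because every element is independent, apply is the easiest of the four to parallelize: no two workers ever touch the same output slot.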

reduce

It takes a source vector, and applies a given operation on each pair of elements to return a single value. Typical usage of this function is reduce([](int a, int b) { return a + b; }, vector_of_numbers); to sum all the elements in a given vector.
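A sequential sketch of reduce (again, an illustrative signature, assuming a non-empty vector):

```cpp
#include <functional>
#include <vector>

// Sequential sketch of reduce: fold a binary operation over the vector.
// Assumes a non-empty vector. A parallel version would reduce chunks in
// separate threads, then combine the per-chunk results; that only gives
// the same answer when the operation is associative.
int reduce(const std::function<int(int, int)>& op, const std::vector<int>& vec)
{
    int acc = vec.front();
    for (std::size_t i = 1; i < vec.size(); ++i)
        acc = op(acc, vec[i]);
    return acc;
}
```

The associativity requirement is the important part: addition and multiplication parallelize cleanly, subtraction does not.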

prefix

This one is a bit special: it performs a given operation on each element of the vector, taking into account all of the previous elements.

std::vector<int> data = diffuse(1, 1000);  // generates a vector of 1000 ones
std::vector<int> new_data = prefix([](int a, int b) { return a + b; }, data);
// expected output
{1, 2, 3, 4, 5, ...}

Each element is the result of combining itself with the previous element, all the way back to the first one. It even works with any binary operation, not just addition!
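Sequentially, prefix is what is usually called an inclusive scan; a sketch (illustrative signature, as above):

```cpp
#include <functional>
#include <vector>

// Sequential sketch of prefix (an inclusive scan): each output element is
// the running combination of all inputs up to and including that index.
std::vector<int> prefix(const std::function<int(int, int)>& op,
                        const std::vector<int>& vec)
{
    if (vec.empty())
        return {};
    std::vector<int> out(vec.size());
    out[0] = vec[0];
    for (std::size_t i = 1; i < vec.size(); ++i)
        out[i] = op(out[i - 1], vec[i]);
    return out;
}
```

Parallelizing a scan is harder than the other three, because each element depends on all the previous ones; the classic trick is to scan chunks independently, then offset each chunk by the reduced totals of the chunks before it.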

std::future

C++11 introduced std::future, a class that makes it easy to manipulate asynchronous operations.

#include <iostream>
#include <future>
#include <functional>

int main()
{
    std::function<int(int)> task = [](int a){
        return a + 8;
    };
    std::future<int> f1 = std::async(std::launch::async, task, 12);
    std::cout << f1.get();

    return 0;
}

That's it. It launched our lambda, with a=12, in a separate thread, and f1.get() waited until execution was done and collected the result!

Don't forget to compile with -pthread or whatever threading library your compiler uses.

std::async creates a future for us and launches the task asynchronously because we passed the std::launch::async policy; without it, the implementation could have decided to run the task in deferred mode instead. Deferred mode runs the function on the calling thread the first time the value is requested (a kind of lazy evaluation), which isn't what we need here.

Producers and harvester

In this kind of problem, we have multiple threads producing data, which the main thread then collects and reassembles. Our goal is to have multiple threads working on smaller portions of the data, so that the work is dispatched across the CPU cores and we use most of the available processing power, instead of loading a single core while the others sit idle.

Our task will be this function, calculating the sum of the numbers in a given vector:

int task(const std::vector<int>& vec)
{
    int sum = 0;
    for (int i : vec)
        sum += i;
    return sum;
}

Easy and straightforward, but remember that we want our task to be able to work on any portion of our input data! Here we would need to create smaller vectors and pass them to the task, wasting time copying data around.

In C++20 we could use std::span to create a view over our vector, but in C++17 we have to be a bit more clever since this abstraction isn't available yet. Basically, a span is just a pointer to the first element of the view plus a length. Could it work with iterators? Yes!

// an alias for our vector's iterator type
using VecIt = std::vector<int>::const_iterator;

// sum a slice of `count` elements starting at `begin`
int task(VecIt begin, std::size_t count)
{
    int sum = 0;
    for (VecIt it = begin, end = begin + count; it != end; ++it)
        sum += *it;
    return sum;
}

And we have our task, which can work on any portion of our data, even the whole thing: task(input_data.cbegin(), input_data.size()).

We will initialize our input vector with a series of numbers (sequential for the sake of simplicity), using std::iota, which takes two iterators (begin and end) and a starting value.

#include <numeric>
#include <vector>

int main()
{
    std::vector<int> input_data(135);
    std::iota(input_data.begin(), input_data.end(), 0);
    // input_data now looks like this {0, 1, 2, 3, 4, ... 134}

    return 0;
}

With the data created, we can now spawn our workers (producers) using std::future. Our futures start as soon as they are created, so we just have to call future.get() on each of them. Internally this calls wait(), which blocks the main thread (without busy waiting) until that task is done. One may wonder whether calling future.get() on each of our futures in turn results in sequential execution: it doesn't. Each task runs in its own thread, so it keeps working even while the main thread is blocked in a call to wait().

Here is the global idea:

constexpr std::size_t WorkerCount = 4;
// keeping our futures to harvest them later
std::array<std::future<int>, WorkerCount> futures;

const std::size_t chunk = input_data.size() / WorkerCount;

for (std::size_t i = 0; i < WorkerCount; ++i)
{
    // calculate the range of data on which this worker shall work;
    // the last worker also takes the remainder
    VecIt begin = input_data.cbegin() + i * chunk;
    std::size_t count = (i == WorkerCount - 1)
        ? input_data.size() - i * chunk
        : chunk;

    futures[i] = std::async(
        std::launch::async,
        [begin, count]() {
            return task(begin, count);
        }
    );
}

int total = 0;
// wait and harvest the results
for (std::size_t i = 0; i < WorkerCount; ++i)
    total += futures[i].get();
std::cout << "total: " << total << "\n";

Conclusion

You now know how to use std::future, and I hope you learned a few things about parallel computing as well!

You can find the full code for this article here:
