DEV Community

Ariston
Ariston

Posted on

手搓c++线程池

菜鸟线程池系列

线程池就是一个如果有任务了就不断用当前剩余的线程去处理任务的一个工具,常用于高并发高并行,负责分配任务并管理他们的生命周期。
但是下面的线程池只适用于一般的并发和并行队列,菜鸟级别的线程池。

#include <vector>
#include <thread>
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>

class ThreadPool {
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;

    std::mutex queueMutex;
    std::condition_variable condition;
    bool stop;

public:
    ThreadPool(size_t threads) : stop(false) {
        for (size_t i = 0; i < threads; ++i)
            workers.emplace_back([this] {
                while (true) {
                    std::function<void()> task;

                    {
                        std::unique_lock<std::mutex> lock(this->queueMutex);
                        this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });
                        if (this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }

                    task();
                }
            });
    }

    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;

        auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queueMutex);

            if (stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");

            tasks.emplace([task](){ (*task)(); });
        }
        condition.notify_one();
        return res;
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread &worker: workers)
            worker.join();
    }
};
Enter fullscreen mode Exit fullscreen mode

设计思路:

1.首先先把五件套安排上:首先自然是有着一堆thread的线程池workers,然后是任务队列(至于为啥现在是空参空返回值的函数对象看后面),然后自然是这个互斥锁和条件变量,
2.首先是线程池的构造函数,对于该线程池,我们可以有参的构造出来(参数是线程数),首先进行一个for循环,内容就是原地构造thread并插入workers中。一直执行一个循环,直到内部的任务队列为空了,并且这个线程池stop标志位为true了再返回。然后我们进入到workers.emplace_back这里,原地构造线程们,首先,我们创建一个task,等会用于接收真正的tasks的移动构造,然后用于真正的执行,这里面有一个编程技巧,通过作用域来操控锁的释放,这样能让task在被从tasks.front移动构造出来之后,立马就有其他的线程能够拿到锁并且开始从队列取出,缩短锁的持有时间,提高并发性能;现在让我们进入真正的线程池的构造函数中,首先拿锁,然后等一手条件变量(stop为true或是任务队列不为空)(条件变量必须在线程有锁的情况下才能等待),如果stop为true并且任务队列为空了,那么就直接返回线程的创建,就不再创建新的线程了,然后我们从任务队列中转移拷贝出一个任务,并且把任务队列弹出,然后在作用域边缘把锁释放,然后来一个执行任务。
3.这里是任务队列的添加的一个函数模板,要说是真几把难呢
首先定义一手模板,先来个F还有若干Args;然后是使用了一首lambda表达式,并且来了一招尾随返回类型,因为我们也不知道返回的到底是什么类型,那么为什么这这个尾随返回类型 is so long?let me see it。首先是result_of部分,其中表示传进来的函数和参数的一个返回类型,但是它只是一个模板结构体,所以需要再把type引出来,然后我们还要用typename表示出后面的type就是一个类型,咱甭管什么类型,是函数还是类;然后,因为这个类型不能马上推导出具体的类型,因为这里只是在尾随返回类型中声明了么,所以我们要使用一个future,把这个给包起来。
然后我们正式进入这个函数内部,首先先把我们的任务给绑定并且封装起来,绑定的话就是用bind,bind返回的是一个函数对象,然后packaged_task的构造参也是函数对象,里面的模板参数是这个函数对象的返回类型,然后这个packaged_task实例可能会在多个地方被引用尤其是在多线程环境中(我也不知道是为啥),所以说我们需要用shared_ptr把它包起来,至此我们成功的封装了一个异步函数对象。

Top comments (0)