future介绍
在多线程环境下,我们经常需要同时启动多个任务。有些任务是比较耗时,而且我们并不急于获得其结果。对于这些任务,我们可以使用std::future
和std::async
来封装其异步执行流程。通过std::async
来注册异步任务,然后返回一个该异步结果的句柄std::future
。
std::future < double > result_future = std::async(std::power,2, 1000);
当我们需要得到这个异步过程的结果时,我们可以显示的请求。
double result=result_future.get();
如果该异步过程已经执行完,则可以直接获得结果;如果还未执行,则当前线程被阻塞,直到执行完成。
在后文中,我们将分为两个部分:
-
异步过程的执行,即处理
std::async
封装后的任务调度; -
异步过程的生成,即处理
future=async()
中异步过程的注册和结果句柄的生成; -
异步过程的串联,即处理
future=future.then(async())
中异步过程的序列化。
异步过程的执行
基本执行结构
首先我们考虑无返回值、无参数的异步过程的执行。在这种情况下,最直接的处理方案就是利用一个队列来存储提交的异步任务,同时建立一个线程池来消费这个任务队列。为此,我们需要实现两个部分:多线程的任务队列,以及任务的提交和请求。
对于多线程的任务队列,可以参考下面代码
class notification_queue { queue<function<void()>> _q; bool _done{ false }; mutex _mutex; condition_variable _ready; public: void done() { { lock_guard<mutex> lock{ _mutex }; _done = true; } _ready.notify_all(); } bool try_pop(function<void()>& x) { unique_lock<mutex> lock{ _mutex }; _ready.wait(lock, [&]() { return !_q.empty()||_done; }); if (_q.empty()||done) { return false; } x = move(_q.front()); _q.pop(); return true; } template<typename F> bool try_push(F&& f) { { lock_guard<mutex> lock{ _mutex,try_to_lock }; if (!lock||done) { return false; } _q.emplace_back(forward<F>(f)); } _ready.notify_one(); return true; } };
上述代码与一般的多线程队列的不同之处在于增加了done
这个bool
变量,用来停止任务的提交和请求。其实更好的多线程任务队列实现应该以std::shared_ptr
作为返回值,以防止内存分配时异常所导致的数据不一致。当前文档只是为了做概念性说明作用,因此以最简实现作为展示。
在此多线程队列的支持下,初步的线程池系统可以有如下实现:
class task_system { const unsigned _count{ thread::hardware_concurrency() }; vector<thread> _threads; notification_queue _q; bool _done{false}; void run() { while (!done) { function<void()> f; if (!_q.try_pop(f)) { this_thread.yield(); } else { f(); } } } public: task_system() { for (unsigned n = 0; n != _count; ++n) { _threads.emplace_back([&] { run(); }); } } ~task_system() { _done=true; _q.done(); for (auto& e : _threads) { e.join(); } } template <typename F> bool async_(F&& f) { return _q.try_push(forward<F>(f)); } void done() { _done = true; } };
上述代码虽然实现了一个简单的线程池,但是在效率上是有问题的。主要的原因就是所有的工作线程都在争夺任务队列的控制权,产生了contention
。为了缓解contention
现象,可以从以下两个方面来入手:
-
显式的以待头节点的链表来实现队列,从而使得任务的提交和请求所需要的锁分开;
-
为每一个线程分配一个专有的任务队列,同时允许线程向其他任务队列请求任务,即
work_steal
。
良构锁队列
所谓良构锁队列,就是尽可能的减少不必要的锁占用时间。因此,我们采取链表来作为良构锁队列的底层数据结构,其数据结构如下:
template<typename T> class notification_queue { private: struct node { std::shared_ptr<T> data; std::unique_ptr<node> next; }; std::mutex head_mutex; std::unique_ptr<node> head; std::mutex tail_mutex; node* tail; }
这里我们存储了链表的头节点和尾节点,同时为头节点和尾节点添加相应的锁。同时我们将链表初始化为有一个头节点的链表:
notification_queue() :head(new node),tail(head.get()) { }
此外,在此链表中加入一个头节点,以减少pop
操作对于尾节点锁的争用。对于push
操作来说,实现是很直白的,因为这个过程只涉及到了尾节点的锁。唯一需要注意的一点是在获得锁之前就通过make_shared
分配好内存空间,以防止在锁的作用域内抛出异常,从而导致数据不一致。
void push(T new_value) { std::shared_ptr<T> new_data( std::make_shared<T>(std::move(new_value))); std::unique_ptr<node> p(new node); node* const new_tail = p.get(); std::lock_guard<std::mutex> tail_lock(tail_mutex); tail->data = new_data; tail->next = std::move(p); tail = new_tail; }
此时的pop
操作则需要小心:
node* get_tail() { std::lock_guard<std::mutex> tail_lock(tail_mutex); return tail; }; bool try_pop(T& result) { std::lock_guard<std::mutex> head_lock(head_mutex); if (head.get() == get_tail()) { return false; } std::unique_ptr<node> old_head = std::move(head); head = std::move(old_head->next); result = *(old_head->data); return true; }
在try_pop
的过程中,我们首先判断当前链表是否为空,这里的get_tail
必须放在head_lock
之后。否则的话:我们获得head
时,可能其他线程已经多次进行了try_pop
和push
。从而导致head
获得时,队列可能已经为空,而之前获得的尾节点所指向的并非当前尾节点。在这种情况下head
与 tail
的比较失败,返回head
,从而导致错误。
work_steal线程池
work_steal
的实现比较直白。首先我们修改其数据结构为:
class task_system { const unsigned _count{ thread::hardware_concurrency() }; vector<thread> _threads; vector<notification_queue> _q{ _count }; atomic<unsigned> _index{ 0 }; bool _done{ false }; }
对于每一个逻辑执行单元,我们都分配一个专有任务队列。提交新任务时,我们直接采取轮转法来平衡负载,用index
自增来实现:
template <typename F> void async_(F&& f) { auto i = _index++; for (unsigned n = 0; n != _count; ++n) { if (_q[(i + n) % _count].try_push(forward<F>(f))) { return; } } _q[i % _count].push(forward<F>(f)); }
而对于run
函数,我们添加一个参数来指明该线程所对应的专有任务队列:
void run(unsigned i) { while (true) { function<void()> f; for (unsigned n = 0; n != _count; ++n) { if (_q[(i + n) % _count].try_pop(f)) { break; } } if (!f && !_q[i].try_pop(f)) { break; } f(); } }
同时,线程池的构造函数和析构函数也要做相应的修改,以实现RAII:
task_system() { for (unsigned n = 0; n != _count; ++n) { _threads.emplace_back([&, n] { run(n); }); } } ~task_system() { for (auto& e : _q) { e.done(); } for (auto& e : _threads) { e.join(); } }
至此,一个带work_steal
的线程池就完成了
异步过程的生成
返回值保存
现有的代码处理的只是function<void()>
类型的任务。对于普通函数来说,函数签名则是function<R(Args...)>
的。但是异步提交任务时其参数列表就已经确定,需要处理的只是返回值,即function<R()>
。为此,我们可以定义一个shared_base
,来存储返回值相关信息。
template <typename R> struct shared_base { optional<R> _r; mutex _mutex; condition_variable _ready; };
这里的optional<R> _r
用来存储异步过程的执行结果。
下面就是这个类所附带的几个函数:虚析构函数、set
、get
。
virtual ~shared_base() {} void set(R&& r) { { lock_guard<mutex> lock{ _mutex }; _r=forward<R>(r); } _ready.notify_all(); } const R& get() { lock_guard<mutex> lock{ _mutex }; _ready.wait(lock, [&]() { return static_cast<bool>(_r); }); return _r.value(); }
这样,我们就可以在异步调用中调用set
函数来填充返回值,同时在future
中调用get
来等待返回值。为了在等待线程和工作线程中同时操作这个shared_base
对象,我们需要以 shared_ptr
的形式来保存这个对象的指针。自此,future
所需要的元素都已具备,定义future
来保存异步过程的结果句柄,其简单实现如下所示:
template <typename R> class future { shared_ptr<R> _p; explicit future(shared_ptr<R> p) : _p(move(p)) { } future() = default; template <typename F> const R& get() const { return _p->get(); } };
这里的R
就是shared_base<T>
类型。
异步过程封装
由于线程池中只支持function<void()>
类型的调用,为此我们需要在shared_base
的基础上进一步做封装。
首先,我们将返回值封装起来:
template <typename R, typename... Args> struct shared<R(Args...)> : shared_base<R> { function<R(Args...)> _f; template<typename F> shared(F&& f) : _f(forward<F>(f)) {} template <typename... A> void operator()(A&&... args) { this->set(_f(forward<A>(args)...)); _f = nullptr; } };
当前的shared
结构既保存了返回值相关信息,还保留了函数的相关信息,即当前类型是一个可调用对象。在这个对象之上,我们再定义一个packaged_task
。
template<typename R, typename ...Args > class packaged_task<R(Args...)> { weak_ptr<shared<R(Args...)>> _p; explicit packaged_task(weak_ptr<shared<R(Args...)>> p) : _p(move(p)) {} public: packaged_task() = default; template <typename... A> void operator()(A&&... args) const { auto p = _p.lock(); if (p) { (*p)(forward<A>(args)...); } } };
这个packaged_task
中存储了一个weak_ptr<shared<R(Args...)>>
,用来保留所有的调用信息和返回值信息。至于这里为什么用weak_ptr
,可能是为了处理某些特殊情况:例如异步结果直接被抛弃,此时则没有必要去执行该异步过程。即异步过程的所有者是其返回值的所有者,而不是过程本身。
至此,我们把异步结果的存储和异步过程的存储都解决了,剩下的问题就是:我们如何根据一个可调用对象生成future
和packaged_task
。为此,我们定义一个新的中间函数package
来执行次任务:
template <typename S, typename F> auto package(F&& f) -> pair<packaged_task<S>, future<result_of_t_<S>>> { auto p = make_shared<shared<S>>(forward<F>(f)); return make_pair(packaged_task<S>(p), future<result_of_t_<S>>(p)); } template <typename F, typename ...Args> auto async(F&& f, Args&&... args) { using result_type = result_of_t<F(Args...)>; using packaged_type = packaged_task<result_type()>; auto pack = package<result_type()>(bind(forward<F>(f), forward<Args>(args)...)); _system.async_(move(get<0>(pack))); return get<1>(pack); }
这个package
函数做了两件事:
-
根据所提供的函数参数
f
来生成一个保存了返回值和参数f
的shared<S>
,用shared_ptr p
来控制其所有权。这里利用了result_of_t
这个type_traits
,其作用是根据函数签名返回该函数的返回值类型。 -
根据上一步生成的
shared_ptr p
构造packaged_task
和future
对象。这里利用了两个自动类型转换:一个是由shared_ptr<shared<result_type()>>
构造出一个weak_ptr <shared<result_type()>>
,另外一个就是根据shared_ptr<shared<result_type()>>
构造出一个shared_ptr<shared_base<result_type>>
。在自动类型转换之后,再调用这两个对象相应的构造函数。
同时,在async
中,主要包括四个过程。
-
通过
bind
将函数f
与其相应的参数args
绑定起来,生成了一个function<result_type()
类型的可调用对象。 -
将上一步构造出的临时可调用对象作为
package
的参数,生成了包含package_task<result_type()>
和future<result_type>
的pair pack
。 -
这个
pair
中所包含的package_task
在当作一个可调用对象时,其函数签名为void()
,因此可以直接向_system
提交异步任务,这里的_system
的类型就是前一节中所提到的work_steal
线程池任务系统。 -
最后,将
pair
中的future<result_type>
返回,作为结果句柄。
自此,从可调用对象构造出了对应的异步执行对象packaged_task
和future
。这个packaged_task
被提交到了系统的任务处理系统_system
中,而future
则返回到用户线程。
异步过程的串联
单后继串联
在处理多个异步任务的系统设计中,异步任务的串联(即任务之间的依赖)是很重要的一个特性。设想一下场景中,我们需要依次调用两个函数:
std::future < double > result_future_1 = std::async(std::power,2, 100); double result_1=result_future_1.get(); std::future < double > result_future_2 = std::async(std::power,result_1, 10); double result_2=result_future_2.get();
在上述实例中,result_2
依赖于result_1
。因此执行时我们必须显式的调用future.get()
来获得result_1
,然后才能提交第二个异步任务。这种显示的同步在依赖路径变长时就会显得非常繁杂,而且引入了多次同步的需求。理想情况下,我们想要的只是最后的执行结果,只需要调用一次future.get()
就可以。简化的代码可以变成这样:
std::future < double > result_future_1 = std::async(std::power,2, 100); std::future < double > result_future_2 = result_future_1.then(std::bind(std::power,_1, 10)); double result_2=result_future_2.get();
同时也可以变成这样
std::future < double > result_future_1 = std::async(std::power,2, 100).then(std::bind(std::power,_1, 10)); double result_1=result_future_1.get();
但是C++11
中的std::future
并不支持串联,所以我们就不得不去造轮子。现在我们就来实现future.then()
。
为了支持then
操作,我们必须在future
中保存后续的执行路径。我们来研究future
中对then
的处理:
template <typename F> auto then(F&& f) { auto pack = package<result_of_t<F(R)>()>([p = _p, f = forward<F>(f)]() { return f(p->_r.back()); }); _p->then(move(pack.first)); return pack.second; }
这里的then
操作每次生成一个新的pair<packaged_task,future>
。其中的packaged_task
部分捕获了调用者future
的shared_ptr<shared_base>
成员_p
,同时这个packaged_task
注册到了_p._then
队列中。pair
中的future
则保留了新的返回值信息。现在剩下的任务就是实现shared_base.then
了。
为此我们首先修改share_base
,增加一个保存后续任务的optional
,同样以vector
的形式来实现。此时shared_base<R>
的数据成员如下:
optional<R> _r; // optional mutex _mutex; condition_variable _ready; optional<function<void()>> _then;
这个_then
成员就是用来保存后续的执行路径的。其实也不算路径,因为只保留路径中的直接子节点,可以当作路径中的next
指针。
而then
操作就比较直接了:
template <typename F> void then(F&& f) { bool resolved{ false }; { lock_guard<mutex> lock{ _mutex }; if (!_r) { _then=forward<F>(f); } else { resolved = true; } } if (resolved) { _system.async_(forward<F>(f)); } }
在执行then
操作时,首先判断是否之前的异步任务已经执行完全。这个判断是通过_r
来执行的,因为optional
有一个到bool
的自动类型转换。
如果之前的异步任务已执行,则提交当前任务到任务调度系统;否则将当前任务挂载在next
下,即_then
。
同时,set
函数也要做一些修改:
void set(R&& r) { optional<function<void()>> then; { lock_guard<mutex> lock{ _mutex }; _r=forward<R>(r); swap(_then, then); } _ready.notify_all(); if(then) { _system.async_(move(then.value())); } }
这段代码也是比较直接:在当前shared_base
获得了初始值之后,再去执行下一个异步过程。至于这里为什么用swap
操作,目前还不是很清楚,需要测试一下才能知道这里的swap
到底发生了什么。
多后继串联
之前的代码处理的只是包含了一个后续任务依赖的情况,但是当我们谈到依赖的时候,我们所提到的都是依赖树,而不是依赖序列。这种依赖树的情况,一个packaged_task
可能有多个任务都在等待其完成。
std::future < double > result_future_1 = std::async(std::power,2, 100); double result_1=result_future_1.get(); std::future < double > result_future_2 = std::async(std::power,result_1, 10); std::future < double > result_future_3 = std::async(std::power,result_1, 10); double result_2=result_future_2.get(); double result_3=result_future_3.get();
为了支持这种多后继的操作,我们需要对shared_base
做修改。
首先,_then
成员不再是一个optional<function<void()>>
,而是一个vector<function<void>>
,因为我们所存储的后续任务不再仅限于一个。
optional<R> _r; // optional mutex _mutex; condition_variable _ready; vector<function<void()>> _then;
同时,我们也要修改对应的then
和set
的代码实现,从optional
转换到vector
上来。
template <typename F> void then(F&& f) { bool resolved{ false }; { lock_guard<mutex> lock{ _mutex }; if (!_r) { _then.push_back(forward<F>(f)); } else { resolved = true; } } if (resolved) { _system.async_(forward<F>(f)); } } :::c++ void set(R&& r) { vector<function<void()>> then; { lock_guard<mutex> lock{ _mutex }; _r=forward<R>(r); swap(_then, then); } _ready.notify_all(); for(const auto& f:then) { _system.async_(move(f)); } }
在经过这种修改之后,之前的示例代码可以这样写:
std::future < double > result_future_1 = std::async(std::power,2, 100); std::future < double > result_future_2 = result_future_1.then(bind(std::power,_1, 10)); std::future < double > result_future_3 = result_future_1.then(bind(std::power,_1, 10)); double result_2=result_future_2.get(); double result_3=result_future_3.get();
多前驱串联
wait_all 串联
对于wait_for_all
形式,一个任务的启动需要多个任务都已完成。我们需要等待所有过程的执行完全,所以需要显示的等待。为了支持这种显示的同步操作,我们继续修改之前的shared_base
和future
,都加入显示的等待wait
。
template <typename R> void shared_base<R>::wait() { lock_guard<mutex> lock{ _mutex }; _ready.wait(lock, [&]() { return !_r.empty(); }); } template <typename R> void future<R>::wait() const { _p->wait(); }
这样我们可以以一下形式来实现wait_for_all
:
template<typename F1, typename... Fs> void wait_for_all(F1& f1, Fs&... fs) { bool dummy[] = { (f1.wait(), true), (fs.wait(), true)... }; // prevent unused parameter warning (void) dummy; }
虽然上述代码实现了wait_for_all
这个接口,但是这种实现并不利于future
的进一步组合。更好的实现应该是综合所依赖的future
生成一个新的future
,类似于如下形式:
template < typename R,typename... Arg> auto all_package(function<R(Arg...)>&& _f, future<Arg>&& all_futures...) ->pair<packaged_task<R()>, future<R>> { auto lambda_task = [all=move(all_futures)...,f=move(f)]() { return f(all.get()...); } auto p = make_shared<shared<R()>>(move(lambda_task)); return make_pair(packaged_task<R()>(p), future<R>(p)); }
同时添加async_all
template <typename R, typename ...Args> auto async_all(function<R(Args...)>&& f, future<Args>&&... args) { auto pack = all_package<R,Args...>(forward<function<R(Args...)>>(f), forward<future<Args>>(args)...); _system.async_(move(get<0>(pack))); return get<1>(pack); }
wait_any 串联
wait_any
相对来说就比较麻烦了,需要实现一个通知机制。只要所等待的future
其中一个有了信号,其他的future
就不需要再执行了。我们可以采取atomic_bool
的形式来实现这个once_flag
,剩下的任务就是如何将这个atomic_bool
挂载到各个future
之上,这个挂载操作我们可以利用then
。但是用then
的话就无法控制其他future
的执行,因为当我们执行then
操作的时候之前的结果肯定已经计算完成。所以,我只能说毫无办法。
如果实在想这么做的话,需要在这些future
生成之前就定义好这个atomic_bool
。同时修改这些任务的执行逻辑,执行前首先判断atomic_bool
是否已经set
了,没有的话才执行后续任务。