Enhanced Future

future介绍

在多线程环境下,我们经常需要同时启动多个任务。有些任务是比较耗时,而且我们并不急于获得其结果。对于这些任务,我们可以使用std::futurestd::async来封装其异步执行流程。通过std::async来注册异步任务,然后返回一个该异步结果的句柄std::future

std::future < double > result_future = std::async(std::power,2, 1000);

当我们需要得到这个异步过程的结果时,我们可以显示的请求。

double result=result_future.get();

如果该异步过程已经执行完,则可以直接获得结果;如果还未执行,则当前线程被阻塞,直到执行完成。

在后文中,我们将分为两个部分:

  • 异步过程的执行,即处理std::async封装后的任务调度;

  • 异步过程的生成,即处理future=async()中异步过程的注册和结果句柄的生成;

  • 异步过程的串联,即处理future=future.then(async())中异步过程的序列化。

异步过程的执行

基本执行结构

首先我们考虑无返回值、无参数的异步过程的执行。在这种情况下,最直接的处理方案就是利用一个队列来存储提交的异步任务,同时建立一个线程池来消费这个任务队列。为此,我们需要实现两个部分:多线程的任务队列,以及任务的提交和请求。

对于多线程的任务队列,可以参考下面代码

class notification_queue
{
    queue<function<void()>> _q;
    bool _done{ false };
    mutex _mutex;
    condition_variable _ready;
public:
    void done()
    {
        {
            lock_guard<mutex> lock{ _mutex };
            _done = true;
        }
        _ready.notify_all();
    }
    bool try_pop(function<void()>& x)
    {
        unique_lock<mutex> lock{ _mutex };
        _ready.wait(lock, [&]()
        {
            return !_q.empty()||_done;
        });
        if (_q.empty()||done)
        {
            return false;
        }
        x = move(_q.front());
        _q.pop();
        return true;
    }

    template<typename F>
    bool try_push(F&& f)
    {
        {
            lock_guard<mutex> lock{ _mutex,try_to_lock };
            if (!lock||done)
            {
                return false;
            }
            _q.emplace_back(forward<F>(f));
        }
        _ready.notify_one();
        return true;
    }

};

上述代码与一般的多线程队列的不同之处在于增加了done这个bool变量,用来停止任务的提交和请求。其实更好的多线程任务队列实现应该以std::shared_ptr作为返回值,以防止内存分配时异常所导致的数据不一致。当前文档只是为了做概念性说明作用,因此以最简实现作为展示。

在此多线程队列的支持下,初步的线程池系统可以有如下实现:

class task_system
{
    const unsigned _count{ thread::hardware_concurrency() };
    vector<thread> _threads;
    notification_queue _q;
    bool _done{false};
    void run()
    {
        while (!done)
        {
            function<void()> f;
            if (!_q.try_pop(f))
            {
                this_thread.yield();
            }
            else
            {
                f();
            }
        }
    }
public:
    task_system()
    {
        for (unsigned n = 0; n != _count; ++n)
        {
            _threads.emplace_back([&] { run(); });
        }
    }
    ~task_system()
    {
        _done=true;
        _q.done();
        for (auto& e : _threads)
        {
            e.join();
        }
    }
    template <typename F>
    bool async_(F&& f)
    {
        return _q.try_push(forward<F>(f));
    }
    void done()
    {
        _done = true;
    }
};

上述代码虽然实现了一个简单的线程池,但是在效率上是有问题的。主要的原因就是所有的工作线程都在争夺任务队列的控制权,产生了contention。为了缓解contention现象,可以从以下两个方面来入手:

  • 显式的以待头节点的链表来实现队列,从而使得任务的提交和请求所需要的锁分开;

  • 为每一个线程分配一个专有的任务队列,同时允许线程向其他任务队列请求任务,即 work_steal

良构锁队列

所谓良构锁队列,就是尽可能的减少不必要的锁占用时间。因此,我们采取链表来作为良构锁队列的底层数据结构,其数据结构如下:

template<typename T>
class notification_queue
{
private:
    struct node
    {
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    };
    std::mutex head_mutex;
    std::unique_ptr<node> head;
    std::mutex tail_mutex;
    node* tail;
}

这里我们存储了链表的头节点和尾节点,同时为头节点和尾节点添加相应的锁。同时我们将链表初始化为有一个头节点的链表:

notification_queue() :head(new node),tail(head.get())
{

}

此外,在此链表中加入一个头节点,以减少pop操作对于尾节点锁的争用。对于push操作来说,实现是很直白的,因为这个过程只涉及到了尾节点的锁。唯一需要注意的一点是在获得锁之前就通过make_shared分配好内存空间,以防止在锁的作用域内抛出异常,从而导致数据不一致。

void push(T new_value)
{
    std::shared_ptr<T> new_data(
        std::make_shared<T>(std::move(new_value)));
    std::unique_ptr<node> p(new node);
    node* const new_tail = p.get();
    std::lock_guard<std::mutex> tail_lock(tail_mutex);
    tail->data = new_data;
    tail->next = std::move(p);
    tail = new_tail;
}

此时的pop操作则需要小心:

node* get_tail()
{
    std::lock_guard<std::mutex> tail_lock(tail_mutex);
    return tail;
};
bool try_pop(T& result)
{
    std::lock_guard<std::mutex> head_lock(head_mutex);
    if (head.get() == get_tail())
    {
        return false;
    }
    std::unique_ptr<node> old_head = std::move(head);
    head = std::move(old_head->next);
    result = *(old_head->data);
    return true;
}

try_pop的过程中,我们首先判断当前链表是否为空,这里的get_tail必须放在head_lock之后。否则的话:我们获得head时,可能其他线程已经多次进行了try_poppush。从而导致head获得时,队列可能已经为空,而之前获得的尾节点所指向的并非当前尾节点。在这种情况下headtail的比较失败,返回head,从而导致错误。

work_steal线程池

work_steal的实现比较直白。首先我们修改其数据结构为:

class task_system
{
    const unsigned _count{ thread::hardware_concurrency() };
    vector<thread> _threads;
    vector<notification_queue> _q{ _count };
    atomic<unsigned> _index{ 0 };
    bool _done{ false };
}

对于每一个逻辑执行单元,我们都分配一个专有任务队列。提交新任务时,我们直接采取轮转法来平衡负载,用index自增来实现:

template <typename F>
void async_(F&& f)
{
    auto i = _index++;
    for (unsigned n = 0; n != _count; ++n)
    {
        if (_q[(i + n) % _count].try_push(forward<F>(f)))
        {
            return;
        }
    }
    _q[i % _count].push(forward<F>(f));
}

而对于run函数,我们添加一个参数来指明该线程所对应的专有任务队列:

void run(unsigned i)
{
    while (true)
    {
        function<void()> f;
        for (unsigned n = 0; n != _count; ++n)
        {
            if (_q[(i + n) % _count].try_pop(f))
            {
                break;
            }
        }
        if (!f && !_q[i].try_pop(f))
        {
            break;
        }
        f();
    }
}

同时,线程池的构造函数和析构函数也要做相应的修改,以实现RAII:

task_system()
{
    for (unsigned n = 0; n != _count; ++n)
    {
        _threads.emplace_back([&, n] { run(n); });
    }
}
~task_system()
{
    for (auto& e : _q)
    {
        e.done();
    }
    for (auto& e : _threads)
    {
        e.join();
    }
}

至此,一个带work_steal的线程池就完成了

异步过程的生成

返回值保存

现有的代码处理的只是function<void()>类型的任务。对于普通函数来说,函数签名则是function<R(Args...)>的。但是异步提交任务时其参数列表就已经确定,需要处理的只是返回值,即function<R()>。为此,我们可以定义一个shared_base,来存储返回值相关信息。

template <typename R>
struct shared_base
{
    optional<R> _r; 
    mutex _mutex;
    condition_variable _ready;
};

这里的optional<R> _r用来存储异步过程的执行结果。

下面就是这个类所附带的几个函数:虚析构函数、setget

virtual ~shared_base() {}
void set(R&& r)
{
    {
        lock_guard<mutex> lock{ _mutex };
        _r=forward<R>(r);
    }
    _ready.notify_all();
}
const R& get()
{
    lock_guard<mutex> lock{ _mutex };
    _ready.wait(lock, [&]()
    {
        return static_cast<bool>(_r);
    });
    return _r.value();
}

这样,我们就可以在异步调用中调用set函数来填充返回值,同时在future中调用get来等待返回值。为了在等待线程和工作线程中同时操作这个shared_base对象,我们需要以 shared_ptr的形式来保存这个对象的指针。自此,future所需要的元素都已具备,定义future来保存异步过程的结果句柄,其简单实现如下所示:

template <typename R>
class future
{
    shared_ptr<R> _p;
    explicit future(shared_ptr<R> p) : _p(move(p))
    {

    }
    future() = default;
    template <typename F>
    const R& get() const 
    { 
        return _p->get(); 
    }
};

这里的R就是shared_base<T>类型。

异步过程封装

由于线程池中只支持function<void()>类型的调用,为此我们需要在shared_base的基础上进一步做封装。

首先,我们将返回值封装起来:

template <typename R, typename... Args>
struct shared<R(Args...)> : shared_base<R>
{
    function<R(Args...)> _f;
    template<typename F>
    shared(F&& f) : _f(forward<F>(f)) {}
    template <typename... A>
    void operator()(A&&... args)
    {
        this->set(_f(forward<A>(args)...));
        _f = nullptr;
    }
};

当前的shared结构既保存了返回值相关信息,还保留了函数的相关信息,即当前类型是一个可调用对象。在这个对象之上,我们再定义一个packaged_task

template<typename R, typename ...Args >
class packaged_task<R(Args...)>
{
    weak_ptr<shared<R(Args...)>> _p;
    explicit packaged_task(weak_ptr<shared<R(Args...)>> p) : _p(move(p)) {}
public:
    packaged_task() = default;
    template <typename... A>
    void operator()(A&&... args) const
    {
        auto p = _p.lock();
        if (p)
        {
            (*p)(forward<A>(args)...);
        }
    }
};

这个packaged_task中存储了一个weak_ptr<shared<R(Args...)>>,用来保留所有的调用信息和返回值信息。至于这里为什么用weak_ptr,可能是为了处理某些特殊情况:例如异步结果直接被抛弃,此时则没有必要去执行该异步过程。即异步过程的所有者是其返回值的所有者,而不是过程本身。

至此,我们把异步结果的存储和异步过程的存储都解决了,剩下的问题就是:我们如何根据一个可调用对象生成futurepackaged_task。为此,我们定义一个新的中间函数package来执行次任务:

template <typename S, typename F>
auto package(F&& f) -> pair<packaged_task<S>, future<result_of_t_<S>>>
{
    auto p = make_shared<shared<S>>(forward<F>(f));
    return make_pair(packaged_task<S>(p), future<result_of_t_<S>>(p));
}
template <typename F, typename ...Args>
auto async(F&& f, Args&&... args)
{
    using result_type = result_of_t<F(Args...)>;
    using packaged_type = packaged_task<result_type()>;
    auto pack = package<result_type()>(bind(forward<F>(f), forward<Args>(args)...));
    _system.async_(move(get<0>(pack)));
    return get<1>(pack);
}

这个package函数做了两件事:

  • 根据所提供的函数参数f来生成一个保存了返回值和参数fshared<S>,用shared_ptr p来控制其所有权。这里利用了result_of_t这个type_traits,其作用是根据函数签名返回该函数的返回值类型。

  • 根据上一步生成的shared_ptr p构造packaged_taskfuture对象。这里利用了两个自动类型转换:一个是由shared_ptr<shared<result_type()>>构造出一个weak_ptr <shared<result_type()>>,另外一个就是根据shared_ptr<shared<result_type()>> 构造出一个shared_ptr<shared_base<result_type>>。在自动类型转换之后,再调用这两个对象相应的构造函数。

同时,在async中,主要包括四个过程。

  • 通过bind将函数f与其相应的参数args绑定起来,生成了一个function<result_type()类型的可调用对象。

  • 将上一步构造出的临时可调用对象作为package的参数,生成了包含package_task<result_type()>future<result_type>pair pack

  • 这个pair中所包含的package_task在当作一个可调用对象时,其函数签名为void(),因此可以直接向_system提交异步任务,这里的_system的类型就是前一节中所提到的work_steal 线程池任务系统。

  • 最后,将pair中的future<result_type>返回,作为结果句柄。

自此,从可调用对象构造出了对应的异步执行对象packaged_taskfuture。这个packaged_task被提交到了系统的任务处理系统_system中,而future则返回到用户线程。

异步过程的串联

单后继串联

在处理多个异步任务的系统设计中,异步任务的串联(即任务之间的依赖)是很重要的一个特性。设想一下场景中,我们需要依次调用两个函数:

std::future < double > result_future_1 = std::async(std::power,2, 100);
double result_1=result_future_1.get();
std::future < double > result_future_2 = std::async(std::power,result_1, 10);
double result_2=result_future_2.get();

在上述实例中,result_2依赖于result_1。因此执行时我们必须显式的调用future.get()来获得result_1,然后才能提交第二个异步任务。这种显示的同步在依赖路径变长时就会显得非常繁杂,而且引入了多次同步的需求。理想情况下,我们想要的只是最后的执行结果,只需要调用一次future.get()就可以。简化的代码可以变成这样:

std::future < double > result_future_1 = std::async(std::power,2, 100);
std::future < double > result_future_2 = result_future_1.then(std::bind(std::power,_1, 10));
double result_2=result_future_2.get();

同时也可以变成这样

std::future < double > result_future_1 = std::async(std::power,2, 100).then(std::bind(std::power,_1, 10));
double result_1=result_future_1.get();

但是C++11中的std::future并不支持串联,所以我们就不得不去造轮子。现在我们就来实现future.then()

为了支持then操作,我们必须在future中保存后续的执行路径。我们来研究future中对then的处理:

template <typename F>
auto then(F&& f)
{
    auto pack = package<result_of_t<F(R)>()>([p = _p, f = forward<F>(f)]()
    {
        return f(p->_r.back());
    });
    _p->then(move(pack.first));
    return pack.second;
}

这里的then操作每次生成一个新的pair<packaged_task,future>。其中的packaged_task部分捕获了调用者futureshared_ptr<shared_base>成员_p,同时这个packaged_task注册到了_p._then队列中。pair中的future则保留了新的返回值信息。现在剩下的任务就是实现shared_base.then了。

为此我们首先修改share_base,增加一个保存后续任务的optional,同样以vector的形式来实现。此时shared_base<R>的数据成员如下:

optional<R> _r; // optional
mutex _mutex;
condition_variable _ready;
optional<function<void()>> _then;

这个_then成员就是用来保存后续的执行路径的。其实也不算路径,因为只保留路径中的直接子节点,可以当作路径中的next指针。

then操作就比较直接了:

template <typename F>
void then(F&& f)
{
    bool resolved{ false };
    {
        lock_guard<mutex> lock{ _mutex };
        if (!_r)
        {
            _then=forward<F>(f);
        }
        else
        {
            resolved = true;
        }
    }
    if (resolved)
    {
        _system.async_(forward<F>(f));
    }
}

在执行then操作时,首先判断是否之前的异步任务已经执行完全。这个判断是通过_r来执行的,因为optional有一个到bool的自动类型转换。

如果之前的异步任务已执行,则提交当前任务到任务调度系统;否则将当前任务挂载在next下,即_then

同时,set函数也要做一些修改:

void set(R&& r)
{
    optional<function<void()>> then;
    {
        lock_guard<mutex> lock{ _mutex };
        _r=forward<R>(r);
        swap(_then, then);
    }
    _ready.notify_all();
    if(then)
    {
        _system.async_(move(then.value()));
    }
}

这段代码也是比较直接:在当前shared_base获得了初始值之后,再去执行下一个异步过程。至于这里为什么用swap操作,目前还不是很清楚,需要测试一下才能知道这里的swap到底发生了什么。

多后继串联

之前的代码处理的只是包含了一个后续任务依赖的情况,但是当我们谈到依赖的时候,我们所提到的都是依赖树,而不是依赖序列。这种依赖树的情况,一个packaged_task可能有多个任务都在等待其完成。

std::future < double > result_future_1 = std::async(std::power,2, 100);
double result_1=result_future_1.get();
std::future < double > result_future_2 = std::async(std::power,result_1, 10);
std::future < double > result_future_3 = std::async(std::power,result_1, 10);
double result_2=result_future_2.get();
double result_3=result_future_3.get();

为了支持这种多后继的操作,我们需要对shared_base做修改。

首先,_then成员不再是一个optional<function<void()>>,而是一个vector<function<void>>,因为我们所存储的后续任务不再仅限于一个。

optional<R> _r; // optional
mutex _mutex;
condition_variable _ready;
vector<function<void()>> _then;

同时,我们也要修改对应的thenset的代码实现,从optional转换到vector上来。

template <typename F>
void then(F&& f)
{
    bool resolved{ false };
    {
        lock_guard<mutex> lock{ _mutex };
        if (!_r)
        {
            _then.push_back(forward<F>(f));
        }
        else
        {
            resolved = true;
        }
    }
    if (resolved)
    {
        _system.async_(forward<F>(f));
    }
}


:::c++
void set(R&& r)
{
    vector<function<void()>> then;
    {
        lock_guard<mutex> lock{ _mutex };
        _r=forward<R>(r);
        swap(_then, then);
    }
    _ready.notify_all();
    for(const auto& f:then)
    {
        _system.async_(move(f));
    }
}

在经过这种修改之后,之前的示例代码可以这样写:

std::future < double > result_future_1 = std::async(std::power,2, 100);
std::future < double > result_future_2 = result_future_1.then(bind(std::power,_1, 10));
std::future < double > result_future_3 = result_future_1.then(bind(std::power,_1, 10));
double result_2=result_future_2.get();
double result_3=result_future_3.get();

多前驱串联

wait_all 串联

对于wait_for_all形式,一个任务的启动需要多个任务都已完成。我们需要等待所有过程的执行完全,所以需要显示的等待。为了支持这种显示的同步操作,我们继续修改之前的shared_basefuture,都加入显示的等待wait

template <typename R>
void shared_base<R>::wait()
{
    lock_guard<mutex> lock{ _mutex };
    _ready.wait(lock, [&]()
    {
        return !_r.empty();
    });
}
template <typename R>
void future<R>::wait() const
{
    _p->wait();
}

这样我们可以以一下形式来实现wait_for_all

template<typename F1, typename... Fs>
void wait_for_all(F1& f1, Fs&... fs)
{
    bool dummy[] = { (f1.wait(), true), (fs.wait(), true)... };

    // prevent unused parameter warning
    (void) dummy;
}

虽然上述代码实现了wait_for_all这个接口,但是这种实现并不利于future的进一步组合。更好的实现应该是综合所依赖的future生成一个新的future,类似于如下形式:

template < typename R,typename... Arg>
auto all_package(function<R(Arg...)>&& _f, future<Arg>&& all_futures...)
->pair<packaged_task<R()>, future<R>>
{
    auto lambda_task = [all=move(all_futures)...,f=move(f)]()
    {
        return f(all.get()...);
    }
    auto p = make_shared<shared<R()>>(move(lambda_task));
    return make_pair(packaged_task<R()>(p), future<R>(p));
}

同时添加async_all

template <typename R, typename ...Args>
auto async_all(function<R(Args...)>&& f, future<Args>&&... args)
{
    auto pack = all_package<R,Args...>(forward<function<R(Args...)>>(f), forward<future<Args>>(args)...);
    _system.async_(move(get<0>(pack)));
    return get<1>(pack);
}

wait_any 串联

wait_any相对来说就比较麻烦了,需要实现一个通知机制。只要所等待的future其中一个有了信号,其他的future就不需要再执行了。我们可以采取atomic_bool的形式来实现这个once_flag,剩下的任务就是如何将这个atomic_bool挂载到各个future之上,这个挂载操作我们可以利用then。但是用then的话就无法控制其他future的执行,因为当我们执行then操作的时候之前的结果肯定已经计算完成。所以,我只能说毫无办法。

如果实在想这么做的话,需要在这些future生成之前就定义好这个atomic_bool。同时修改这些任务的执行逻辑,执行前首先判断atomic_bool是否已经set了,没有的话才执行后续任务。

Published:
2015-11-05 21:31
Category:
Tag:
CPP15