Concurrency in CPP

并行是一个非常fancy的词,在正确的实现并行的时候能够得到非常大的效率提高。但是,如何正确的实现并行一直陷阱重重。这之间的陷阱主要有如下三个:

  • 并行读写竞争处理

  • 并行操作的异常处理

  • 缓存敏感的数据布局及访问设计

其中第三个问题被誉为CS里面中最难的两个问题之首。常言道:There are only two hard things in Computer Science: cache invalidation and naming things. 后面这个问题能够使整个论坛吵起来乃至血流成河,而前面这个问题只会造成男默女泪的局面。所以,当前就围绕着前两个问题进行讨论。首先解决访问读写竞争问题,然后讨论异常处理问题。

读写竞争:双线程

在并行程序中,最基本的进行访问仲裁的方法是使用mutex。但是在双线程的情况下,采用peterson算法比使用mutex代价小很多。下文就来介绍一个peterson方法。

只有两个程序在访问同一个数据,所以数组大小为2。在下文中,flag[2]代表意向数组,分别为P1和P0,当为1时代表他们想去临界区或者已经在临界区中了,另外有一个turn来代表下一个该进入的程序号,turn可取的值为0或1。其用来仲裁访问争用的代码如下:

#define FALSE 0
#define TRUE 1
#define N 2//这里代表进程数量
int turn;//这里代表轮到谁了
int interested[N];//这里是意向数组,初始化为FALSE 
void enter_region(int process)//进程号为或者
{
    int other;//代表另外的那个进程的号
    other=1-process;
    interested[process]=TURE;//设置当前进程的意向值为TURE 来表明感兴趣
    turn=process;//设置标志位
    while(turn==process&&interested[other]==TURE);//当互斥条件不满足即while里面的条件满足时忙等待
}
void leave_region(int process)
{
    interested[process]=FALSE;//设置意向值表示不感兴趣
}

在下文的描述中我将用P0、P1、turn的顺序来描写三元组。

当程序0想进入临界区的时候,将P0改写为1,并且将turn改为1,而程序1想进入时,则将P1改为1,turn改为0。然后再经过一个逻辑判断循环,如果满足条件则循环结束,开始进入临界区的指令,否则一直循环,类似于忙等待。这两者离开临界区后把相应的意向数组的元素改为0。

首先对P0来进行分析。当P0运行到while()时,三元组的情况只有四种:

  • 三元组为1,1,0时,这种情况只有可能出现在P0已经修改完了之后P1再进行修改。而P1在修改时无法判断P0是否已经进入:因为P1修改时P0的意向值为1,且turn为1,很有可能 0已经进入了临界区,P1不会进入临界区。所以此时P0可以进入临界区。

  • 三元组为1,0,0时,唯一的可行情况就是P1程序并没有进入临界区的企图。所以此时P0可以进入临界区。

  • 三元组为1,1,1时,这说明P1的修改被P0重写了,所以可能P1正在临界区。所以此时P0不得进入临界区。

  • 三元组为1,0,1时,此时是最简单的情况,即程序1不想也不在临界区,程序0可以进入。

所以总的条件就是三元组在全为1时,P0进入忙等待,直到条件打破才能进入临界区;其他情况下P0可以进入临界区。

现在对P1来分析。跟上面的分析类似,当P1运行到while()时,三元组也只有四种可能:

  • 三元组为0,1,0时。这时的唯一可能情形就是在P1修改interested[1]之后,P0尝试进入临界区并进入成功,然后退出了临界区。所以此时P1可以进入临界区。

  • 三元组为1,1,0时,可能P0正在尝试进入临界区,做了修改却被P1重新改掉了,所以不能确定P0是否已经进入了临界区,所以此时P1不得进入临界区。

  • 三元组为0,1,1时,P0并不在临界区内,同时P0并未试图进入临界区。所以,此时P1可以进入临界区。

  • 三元组为1,1,1时,此时P1的修改被程序0给覆盖了,而P0覆盖之后会发现此时的三元组是1、1、1,按照上面的分析P0在这个时候是不会进入临界区的,所以此时P1可以进入临界区。

综上只有1、1、0的时候P1才不能进入临界区,此时的判定条件为while((flag[0]\&\&!turn)==1),在这个时候不停的循环,直到条件打破。

通过上面的分析,我们可以把这两个条件综合起来,从而得到了下面的这个判断语句:

while(turn==process&&interested[other]==TURE)

读写竞争:多线程

在多线程时,peterson算法就无法使用了。此时,必须使用mutex来控制并发访问。下面便以多线程访问一个list为例来说明C++中使用mutex的方法。

#include <mutex>
#include <iostream>
#include <thread>
#include <list>
std::list<int> some_list;
std::mutex some_mutex;
void add_to_list(int new_value)
{
    std::lock_guard<std::mutex> guard(some_mutex);
    some_list.push_back(new_value);
}
bool list_contains(int value_to_find)
{
    std::lock_guard<std::mutex> guard(some_mutex);
    return std::find(some_list.begin(), some_list.end(), value_to_find)
        != some_list.end();
}

在上面的代码中,我们对于list的每一个操作都加了一个锁,从而实现了互斥访问。本来mutex需要unlock的,但是这里巧妙地利用了C++中的RAII,避免了每次获得锁之后都手工unlock。方法就是在lock\_guard的析构函数中进行unlock,这样就保证了互斥锁的正确释放。

互斥锁是解决竞争的最基本方案,但是代价较高。有些操作之间是可以并行存在的,强制性的互斥访问可能会降低效率。例如list\_contains这个操作是非修改性操作,多个该操作的并行执行并不影响结果。所以,当前的互斥锁mutex对于大量的非修改操作来说并不是一个友好的方案,而读写锁(reader writer mutex)则应运而生。但是C++标准委员会拒绝了读写锁的提案,所以我们只能从Boost库中找到相应的解决方案,即boost::shared\_mutex。具体的使用代码如下:

boost::shared_mutex _access;
void reader()
{
  boost::shared_lock< boost::shared_mutex > lock(_access);
  // do work here, without anyone having exclusive access
}
void conditional_writer()
{
  boost::upgrade_lock< boost::shared_mutex > lock(_access);
  // do work here, without anyone having exclusive access
  if (something) {
    boost::upgrade_to_unique_lock< boost::shared_mutex > uniqueLock(lock);
    // do work here, but now you have exclusive access
  }
  // do more work here, without anyone having exclusive access
}
void unconditional_writer()
{
  boost::unique_lock< boost::shared_mutex > lock(_access);
  // do work here, with exclusive access
}

从上面的代码可以看出boost::shared\_mutex的使用有三种情况:共享锁,可升级锁,独占锁。这三种情况提供了一些灵活性,同时相对于C++标准来说又比较杂乱。如果bu'shi此外,读写锁还有两种策略:一种是在读者存在的情况下写者试图访问,会阻塞新来的读者;而另外一种则不会阻塞新来的读者。当前的读写锁好像木有说明它是哪一种模式,需要测试。

多线程接口设计

通过使用mutex我们解决了多线程之间对于共享内存的互斥访问问题,现在来处理多线程的异常问题。以下面的多线程栈为例,假设当前栈的接口如下:

template<typename T, typename Container = std::deque<T> >
class stack
{
public:
    explicit stack(const Container&);
    explicit stack(Container&& = Container());
    template <class Alloc> explicit stack(const Alloc&);
    template <class Alloc> stack(const Container&, const Alloc&);
    template <class Alloc> stack(Container&&, const Alloc&);
    template <class Alloc> stack(stack&&, const Alloc&);
    bool empty() const;
    size_t size() const;
    T& top();
    T const& top() const;
    void push(T const&);
    void push(T&&);
    void pop();
    void swap(stack&&);
};

假设我们利用了mutex保证了该栈接口都能保证线程之间互斥访问,此时并没有万事大吉。考虑下面的代码:

stack<int> s;
if (!s.empty())
{
    int const value = s.top();
    s.pop();
    do_something(value);
}

该代码在单线程下是完美无缺的,但是在多线程下却有可能出现问题。因为在判断empty之后,可能有其他线程进行了pop操作导致栈空。此时如果再去top则会抛出异常。

多线程栈竞争

我们可以看出,虽然保证了接口是线程安全的,但是这样的一个简单操作却仍然无法保证正确性。这种错误根源在于接口设计的不合理性,我们需要把判断后访问栈顶元素的操作合并为一个操作。

但是,在决定动手之前,我们还需要考虑更多的细节。例如,在stack<vector<int>>中,栈中的元素是一个vector<int>。如果我们通过拷贝构造的方式来返回vector<int>的话,需要临时构造一个相同大小的对象。新对象的构造需要牵涉到内存管理,从而潜在的包含了bad\_alloc异常。在该情况出现时,pop会造成栈已经被修改而无法返回正确的值的情形,从而造成值丢失。因此,我们不能使用直接返回值的方式来进行pop。下面是几种备选方案:

  • 将目标对象以引用方式来传递,从而避免拷贝构造。此时我们需要增加一个接口pop(T\&),这个方案能解决很多问题。但是这个方案要求预先构造一个对象,并进行赋值。这些操作对于某些类型来说并不可行:需要明确参数的构造函数,赋值运算符被删除。

  • 强制要求拷贝构造函数不抛出异常,或者使用移动构造的形式来进行值返回。但是,同上面的原因,某些类型木有移动构造函数或者noexcept的拷贝构造函数。

  • 返回栈顶元素的指针,而不是栈顶元素的值。返回指针的好处是不会抛出异常,缺点是需要手工维护内存,这个非常危险。不过我们可以利用C++中的std::share\_ptr来自动管理指针。

三个方法各有优点,我们还可以把这三个优点都综合起来。最后的接口设计如下:

#include <exception>
#include <memory>
struct empty_stack : std::exception
{
    const char* what() const throw();
};
template<typename T>
class threadsafe_stack
{
public:
    threadsafe_stack();
    threadsafe_stack(const threadsafe_stack&);
    threadsafe_stack& operator=(const threadsafe_stack&) = delete;
    void push(T new_value);
    std::shared_ptr<T> pop();
    void pop(T& value);
    bool empty() const;
};

在这个新的接口设计中,我们禁止了赋值运算符,同时删除了swap函数。在该接口的指引下,我们可以得到下面的实现代码:

class threadsafe_stack
{
private:
    std::stack<T> data;
    mutable std::mutex m;
public:
    threadsafe_stack()
    {
    }
    threadsafe_stack(const threadsafe_stack& other)
    {
        std::lock_guard<std::mutex> lock(other.m);
        data = other.data;
    }
    threadsafe_stack& operator=(const threadsafe_stack&) = delete;
    void push(T new_value)
    {
        std::lock_guard<std::mutex> lock(m);
        data.push(new_value);
    }
    std::shared_ptr<T> pop()
    {
        std::lock_guard<std::mutex> lock(m);
        if (data.empty()) throw empty_stack();
        std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
        data.pop();
        return res;
    }
    void pop(T& value)
    {
        std::lock_guard<std::mutex> lock(m);
        if (data.empty()) throw empty_stack();
        value = data.top();
        data.pop();
    }
    bool empty() const
    {
        std::lock_guard<std::mutex> lock(m);
        return data.empty();
    }
};

pop的实现中,即使make\_shared抛出了bad\_alloc异常,数据也不会丢失,因为真正意义上的pop的执行在该异常之后。在多线程程序中,尽量把数据修改操作延迟到可能的异常之后,以防止异常对整个数据结构的破坏。

死锁

很多时候,单独用一个锁来管理所有的资源会导致所谓的并行完全变成了串行(考虑为全内存加锁的极端情况)。为了增大并行的粒度,需要增加多个锁来管理不同部分的资源。但是多个锁的情况下会引起总所周知的死锁问题:线程1拥有锁A请求锁B,同时线程2拥有锁B请求锁A。对于预防死锁,解决方案有两个:

  • 要求所有线程在开始时一次性获得所有的锁。

  • 要求所有线程按照同样的顺序来获得锁。

对于一次性对多个锁进行加锁的情况,C++内建了这种支持。如下代码:

class some_big_object;
void swap(some_big_object& lhs, some_big_object& rhs);
class X
{
private:
    some_big_object some_detail;
    std::mutex m;
public:
    X(some_big_object const& sd) :some_detail(sd)
    {
    }
    friend void swap(X& lhs, X& rhs)
    {
        if (&lhs == &rhs)
            return;
        std::lock(lhs.m, rhs.m);
        std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);
        std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);
        swap(lhs.some_detail, rhs.some_detail);
    }
};

在上面的代码中,std::lock(lhs.m, rhs.m)一次性获得了两个锁,而且是异常安全的。如果获得了一个锁之后出现异常,则会自动释放已经获得的锁。但是为了得到函数结束自动释放锁的要求,我们必须将所得到的锁分发给std::lock\_guard,并利用RAII来进行锁的释放。

尽管std::lock能处理同时获得多个锁的情况,但是对于多个锁不是同时获得的情况无能为力(例如前面提到的读写锁中的condition\_wirter)。此时,唯一的解决方案就是为不同的所安排不同的层级:获得顺序中低等级的锁不能在高等级锁的前面,类似于IRQL。标准库之中并不存在所谓的等级锁,但是我们可以自己造啊。一个等级锁的实现如下:

class hierarchical_mutex
{
    std::mutex internal_mutex;
    unsigned long const hierarchy_value;
    unsigned long previous_hierarchy_value;
    static thread_local unsigned long this_thread_hierarchy_value;
    void check_for_hierarchy_violation()
    {
        if (this_thread_hierarchy_value <= hierarchy_value)
        {
            throw std::logic_error(mutex hierarchy violated);
        }
    }
    void update_hierarchy_value()
    {
        previous_hierarchy_value = this_thread_hierarchy_value;
        this_thread_hierarchy_value = hierarchy_value;
    }
public:
    explicit hierarchical_mutex(unsigned long value) :
        hierarchy_value(value),
        previous_hierarchy_value(0)
    {
    }
    void lock()
    {
        check_for_hierarchy_violation();
        internal_mutex.lock();
        update_hierarchy_value();
    }
    void unlock()
    {
        this_thread_hierarchy_value = previous_hierarchy_value;
        internal_mutex.unlock();
    }
    bool try_lock()
    {
        check_for_hierarchy_violation();
        if (!internal_mutex.try_lock())
            return false;
        update_hierarchy_value();
        return true;
    }
};
thread_local unsigned long
hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);

注意最后的thread\_local关键字,该关键字是在C++11中引入的,意义是每一个线程都含有自己单独的一个变量。该变量初始为ULONG\_MAX,从而使得其他等级的锁都比当前优先级低,保证了第一次加锁的可行性。在多次加锁的过程中,会检查要获得的锁是否比当前线程所获得的锁优先级还高,这是通过check\_for\_hierarchy\_violation来完成的。当有违反的时候,直接爆异常。成功加锁时,记录上一个锁的优先级状态,从而使得在释放的时候能够回溯当前锁加锁前的优先级。通过这样的一个优先级记录保证了在加锁与释放锁的复合操作下仍然能够维持优先级顺序。

单例模式

单例模式(Singleton):确保某一对象只有一个实例。单例模式要求目标对象仅仅构造一次,类似的还有某些对象构造资源消耗过大,我们只希望该对象在真正使用的时候才去构造,即所谓的延迟构造。为了实现延迟构造,我们需要执行类似于下面的代码:

std::shared_ptr<some_resource> resource_ptr;
void foo()
{
    if (!resource_ptr)
    {
        resource_ptr.reset(new some_resource);
    }
    resource_ptr->do_something();
}

该代码在单线程的时候是能够满足要求的,在多线程时则会有问题。为了处理多线程的情况,我们需要加锁:

std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;
void foo()
{
    std::unique_lock<std::mutex> lk(resource_mutex);
    if (!resource_ptr)
    {
        resource_ptr.reset(new some_resource);
    }
    lk.unlock();
    resource_ptr->do_something();
}

正确性问题解决了,但是问题来了:每次操作都需要加锁,开销非常大。在第一次初始化完全之后,完全没有必要再去加锁。为了解决开销问题,有人发明了两次加锁法(double checked locking),代码如下:

void undefined_behaviour_with_double_checked_locking()
{
    if (!resource_ptr)
    {
        std::lock_guard<std::mutex> lk(resource_mutex);
        if (!resource_ptr)
        {
            resource_ptr.reset(new some_resource);
        }
    }
    resource_ptr->do_something();
}

只有当初始值不存在的时候才去加锁,看上去很美。但是,有一个非常细微的bug:C++中的new是分两步执行的,一步是分配内存,一步是初始化对象。而指针的赋值即可能在分配内存之后,也可能在初始化对象之后。如果在分配内存之后初始完对象之前就对指针赋值,另外的线程可能会检查到指针不为空,并执行resource\_ptr->do\_something(),而此时对象并未构造完全。这种访问是C++中的未定义行为,整个程序都可能崩。由于new的这个特性,使得之前在C++中依靠new实现单例模式是不可能的。

另外的一种方法是声明一个该类型的static变量,从而试图在运行前创建一个单一对象,摆脱了new的干扰。但是此时同样会有数据竞争:即多个线程都认为自己是第一个进行初始化的或者当一个线程正在初始化的时候另外一个线程对其进行使用。因此,在C++11之前,正确的实现单例模式是不可能的。为此,新标准中规定static修饰的变量保证了只会初始化一次,该特性是建立在std::call\_once之上的(这里先略去相关介绍)。所以,实现单例模式现在只需要下面的几行代码:

class my_class;
my_class& get_my_class_instance()
{
    static my_class instance;
    return instance;
}

参考链接

Published:
2015-04-28 20:23
Category:
Tag:
CPP15