Lockfree Hashtable

Lockfree Hashtable

对于让hashtable变成无锁,主要要解决如下几个问题:

  1. hash表的核心就是探查方法,开链法在插入节点的时候会引入动态内存分配的问题,这个是在无锁里面很棘手的问题,所以没有采取开链法,同时二次探查在无锁和缓存上面很不友好,所以使用的就是线性探查。因此我们首先要使得线性探查无锁化。
  2. hash表插入的时候可能会导致过载。在STL的实现中是发现map内部装载率大于一定值时将map扩容。由于扩容的时候会出现迭代器失效的问题,所以这种方法在无锁的时候压根不可行。所以很多实现是直接开一个新的当前表大小的干净副本,通过指针将所有副本链接起来。查询和插入的时候需要遍历所有的副本

lockfree linear search

在preshing的一篇文章里面谈到了无锁线性扫描的实现,这里定义了一个基本的Entry:

struct Entry
{
    mint_atomic32_t key;
    mint_atomic32_t value;
};
Entry *m_entries;

在这个Entry里,我们规定如果key的值为0,则代表这个entry还没有被使用,所以插入的时候禁止传入为0的key

在此结构之下,定义的setItem操作如下:

void ArrayOfItems::SetItem(uint32_t key, uint32_t value)
{
    for (uint32_t idx = 0;; idx++)
    {
        uint32_t prevKey = mint_compare_exchange_strong_32_relaxed(&m_entries[idx].key, 0, key);
        if ((prevKey == 0) || (prevKey == key))
        {
            mint_store_32_relaxed(&m_entries[idx].value, value);
            return;
        }
    }
}

类似的getItem的操作如下:

uint32_t ArrayOfItems::GetItem(uint32_t key)
{
    for (uint32_t idx = 0;; idx++)
    {
        uint32_t probedKey = mint_load_32_relaxed(&m_entries[idx].key);
        if (probedKey == key)
            return mint_load_32_relaxed(&m_entries[idx].value);
        if (probedKey == 0)
            return 0;          
    }
}

现在的疑问在于,这里的原子操作使用的都是relaxed语义,这个语义在x86上基本等于没有任何作用,如何在使得SetItem里的第8行能够被GetItem的第7行可见。事实上这压根做不到,因为一个线程在执行到SetItem的第8行之前被换出, 然后另外一个线程执行到了GetItem的第7行,这里读取的还是老的值。除了这种情况之外,还可能出现SetItem里的CAS操作并没有将数据更新的通知发放到其他的core上去,然而第8行的store操作已经被另外一个执行GetItem的线程可见的情况,此时GetItem会返回0。这两种情况都是合法的,因为在多线程中读取数据的时机是不确定的,因此读取老数据也是正常的。甚至可以说在没有通知机制的情况下,是不是最新根本没有意义。如果要实现publish-listen的机制,则需要在SetItem的时候将一个原子的bool变量设置为True,同时这个Store操作要使用Release语义,同时另外一个线程在CAS这个值的时候,要使用Acquire语义。

// Shared variables
char message[256];
ArrayOfItems collection;

void PublishMessage()
{
    // Write to shared memory non-atomically.
    strcpy(message, "I pity the fool!");

    // Release fence: The only way to safely pass non-atomic data between threads using Mintomic.
    mint_thread_fence_release();

    // Set a flag to indicate to other threads that the message is ready.
    collection.SetItem(SHARED_FLAG_KEY, 1)
}

这样才能使得数据更改完并通知完之后,另外一方能够得到最新数据。因此,当前设计的无锁hashtable在多线程上唯一做的事情就是防止了多个线程对同一个entry同时做SetItem操作。

preshing对SetItem有一个优化:减少不必要的CAS操作。在原来的实现中会遍历所有的元素去执行CAS操作,其实只有key == 0 or key == my_key的时候我们才需要去做CAS。所以这里的优化就是预先作一次load,发现可以去set的时候才去CAS

void ArrayOfItems::SetItem(uint32_t key, uint32_t value)
{
    for (uint32_t idx = 0;; idx++)
    {
        // Load the key that was there.
        uint32_t probedKey = mint_load_32_relaxed(&m_entries[idx].key);
        if (probedKey != key)
        {
            // The entry was either free, or contains another key.
            if (probedKey != 0)
                continue;           // Usually, it contains another key. Keep probing.

            // The entry was free. Now let's try to take it using a CAS.
            uint32_t prevKey = mint_compare_exchange_strong_32_relaxed(&m_entries[idx].key, 0, key);
            if ((prevKey != 0) && (prevKey != key))
                continue;       // Another thread just stole it from underneath us.

            // Either we just added the key, or another thread did.
        }

        // Store the value in this array entry.
        mint_store_32_relaxed(&m_entries[idx].value, value);
        return;
    }
}

naive lockfree hashtable

在上面的lockfree linear scan的基础上,做一个lockfree hashtable还是比较简单的。这里定义了三个函数intergerHash, SetItem, GetItem

inline static uint32_t integerHash(uint32_t h)
{
    h ^= h >> 16;
    h *= 0x85ebca6b;
    h ^= h >> 13;
    h *= 0xc2b2ae35;
    h ^= h >> 16;
    return h;
}

这个hash函数的来源是MurmurHash3’s integer finalizer , 据说这样可以让每一位都起到差不多的作用。

void HashTable1::SetItem(uint32_t key, uint32_t value)
{
    for (uint32_t idx = integerHash(key);; idx++)
    {
        idx &= m_arraySize - 1;

        // Load the key that was there.
        uint32_t probedKey = mint_load_32_relaxed(&m_entries[idx].key);
        if (probedKey != key)
        {
            // The entry was either free, or contains another key.
            if (probedKey != 0)
                continue;           // Usually, it contains another key. Keep probing.

            // The entry was free. Now let's try to take it using a CAS.
            uint32_t prevKey = mint_compare_exchange_strong_32_relaxed(&m_entries[idx].key, 0, key);
            if ((prevKey != 0) && (prevKey != key))
                continue;       // Another thread just stole it from underneath us.

            // Either we just added the key, or another thread did.
        }

        // Store the value in this array entry.
        mint_store_32_relaxed(&m_entries[idx].value, value);
        return;
    }
}

这里的SetItem正确工作有一个前提:整个hashtable不是满的,是满的一定会出错。

GetItem还是老样子:

uint32_t HashTable1::GetItem(uint32_t key)
{
    for (uint32_t idx = integerHash(key);; idx++)
    {
        idx &= m_arraySize - 1;

        uint32_t probedKey = mint_load_32_relaxed(&m_entries[idx].key);
        if (probedKey == key)
            return mint_load_32_relaxed(&m_entries[idx].value);
        if (probedKey == 0)
            return 0;          
    }
}

所谓的publish函数也是一样:

// Shared variables
char message[256];
HashTable1 collection;

void PublishMessage()
{
    // Write to shared memory non-atomically.
    strcpy(message, "I pity the fool!");

    // Release fence: The only way to safely pass non-atomic data between threads using Mintomic.
    mint_thread_fence_release();

    // Set a flag to indicate to other threads that the message is ready.
    collection.SetItem(SHARED_FLAG_KEY, 1)
}

至于delete操作,我们可以规定value是某个值的时候代表当前entry是被删除的,这样就可以用SetItem(key, 0)来模拟delete操作了。

what about full

上面的无锁hashtable有一个致命缺陷,他没有处理整个hashtable满了的情况。为了处理满的情况,我们需要设置最大探查数量为当前hashtable的容量, 同时维护多个独立的hashtable,用一个无锁的链表将所有的hashtable的指针串联起来。如果最大探查数量达到上限,且当前hashtable没有下一个hashtable的指针,且则先建立一个新的hashtable,并挂载到无锁链表上,回到了有下一个hashtable的情况,然后对下一个hashtable做递归遍历。

这样做的确解决了扩容的问题,但是会出现性能下降的问题。后面过来的key在查询的时候会变得越来越慢,因为经常需要查询多层的hashtable。为了避免这个问题,出现了一种新的设计:每次添加一层hashtable的时候,都将容量扩大一倍,然后将上一个hashtable的内容拷贝到新的hashtable里。这个新的hashtable也叫做main_hashtable,由于我们无法在无锁的情况下把整个hashtable拷贝过去,所以采用lazy的方式,这个方式的步骤如下:

  1. 维持一个hashtable的无锁链表,链表的头节点就叫做main_hashtable,所有的hashtable通过一个next指针相连;
  2. 插入的时候如果发现当前的main_hashtable的装载因子(这个装载因子考虑了所有的key)已经大于0.5,则新建一个hashtable,然后插入到新的hashtable里;
  3. 扩容的时候设置一个标志位,表明当前正在扩容,避免多个线程同时扩容,浪费资源,扩容期间所有等待扩容的线程都忙等待,扩容完成之后清除正在扩容的标记;
  4. 新建立的hashtable是空的,大小为当前main_hashtable的两倍,每次新加入一个hashtable的时候都插入到头部,使之成为新的main_hashtable
  5. 查询的时候,根据这些next指针一直查询,直到最后一个hashtable
  6. 如果查询返回结果的时候发现返回结果的那个hashtable并不是main_hashtable,则把当前的key value对插入到main_hashtable里,这就是核心的lazy copy的过程

这个lazy的过程代码如下:

auto id = details::thread_id();
        auto hashedId = details::hash_thread_id(id);

        auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
        for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
            // Look for the id in this hash
            auto index = hashedId;
            while (true) {      // Not an infinite loop because at least one slot is free in the hash table
                index &= hash->capacity - 1;

                auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
                if (probedKey == id) {
                    // Found it! If we had to search several hashes deep, though, we should lazily add it
                    // to the current main hash table to avoid the extended search next time.
                    // Note there's guaranteed to be room in the current hash table since every subsequent
                    // table implicitly reserves space for all previous tables (there's only one
                    // implicitProducerHashCount).
                    auto value = hash->entries[index].value;
                    if (hash != mainHash) {
                        index = hashedId;
                        while (true) {
                            index &= mainHash->capacity - 1;
                            probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
                            auto empty = details::invalid_thread_id;
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
                            auto reusable = details::invalid_thread_id2;
                            if ((probedKey == empty    && mainHash->entries[index].key.compare_exchange_strong(empty,    id, std::memory_order_relaxed)) ||
                                (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire))) {
#else
                            if ((probedKey == empty    && mainHash->entries[index].key.compare_exchange_strong(empty,    id, std::memory_order_relaxed))) {
#endif
                                mainHash->entries[index].value = value;
                                break;
                            }
                            ++index;
                        }
                    }

                    return value;
                }
                if (probedKey == details::invalid_thread_id) {
                    break;      // Not in this hash table
                }
                ++index;
            }
        }

现在的核心则转移到了扩容的过程,扩容涉及到了动态内存分配和初始内容的填充,是比较耗的操作,所以要避免多个线程在争抢扩容的控制权。在moodycamel的设计里,是这样处理扩容的。

// Insert!
            auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
            while (true)
            {
                if (newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire))
                {
                    // We've acquired the resize lock, try to allocate a bigger hash table.
                    // Note the acquire fence synchronizes with the release fence at the end of this block, and hence when
                    // we reload implicitProducerHash it must be the most recent version (it only gets changed within this
                    // locked block).
                    mainHash = implicitProducerHash.load(std::memory_order_acquire);
                    if (newCount >= (mainHash->capacity >> 1))
                    {
                        auto newCapacity = mainHash->capacity << 1;
                        while (newCount >= (newCapacity >> 1))
                        {
                            newCapacity <<= 1;
                        }
                        auto raw = static_cast<char*>((Traits::malloc)(sizeof(ImplicitProducerHash) + std::alignment_of<ImplicitProducerKVP>::value - 1 + sizeof(ImplicitProducerKVP) * newCapacity));
                        if (raw == nullptr)
                        {
                            // Allocation failed
                            implicitProducerHashCount.fetch_add(-1, std::memory_order_relaxed);
                            implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
                            return nullptr;
                        }

                        auto newHash = new (raw) ImplicitProducerHash;
                        newHash->capacity = newCapacity;
                        newHash->entries = reinterpret_cast<ImplicitProducerKVP*>(details::align_for<ImplicitProducerKVP>(raw + sizeof(ImplicitProducerHash)));
                        for (size_t i = 0; i != newCapacity; ++i)
                        {
                            new (newHash->entries + i) ImplicitProducerKVP;
                            newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
                        }
                        newHash->prev = mainHash;
                        implicitProducerHash.store(newHash, std::memory_order_release);
                        implicitProducerHashResizeInProgress.clear(std::memory_order_release);
                        mainHash = newHash;
                    }
                    else
                    {
                        implicitProducerHashResizeInProgress.clear(std::memory_order_release);
                    }
                }

                // If it's < three-quarters full, add to the old one anyway so that we don't have to wait for the next table
                // to finish being allocated by another thread (and if we just finished allocating above, the condition will
                // always be true)
                if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2))
                {
                    bool recycled;
                    auto producer = static_cast<ImplicitProducer*>(recycle_or_create_producer(false, recycled));
                    if (producer == nullptr)
                    {
                        implicitProducerHashCount.fetch_add(-1, std::memory_order_relaxed);
                        return nullptr;
                    }
                    if (recycled)
                    {
                        implicitProducerHashCount.fetch_add(-1, std::memory_order_relaxed);
                    }

#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
                    producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
                    producer->threadExitListener.userData = producer;
                    details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
#endif

                    auto index = hashedId;
                    while (true)
                    {
                        index &= mainHash->capacity - 1;
                        auto probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);

                        auto empty = details::invalid_thread_id;
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
                        auto reusable = details::invalid_thread_id2;
                        if ((probedKey == empty    && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed)) ||
                            (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire)))
                        {
#else
                        if ((probedKey == empty    && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed)))
                        {
#endif
                            mainHash->entries[index].value = producer;
                            break;
                        }
                        ++index;
                    }
                    return producer;
                }

                // Hmm, the old hash is quite full and somebody else is busy allocating a new one.
                // We need to wait for the allocating thread to finish (if it succeeds, we add, if not,
                // we try to allocate ourselves).
                mainHash = implicitProducerHash.load(std::memory_order_acquire);
            }

上面的第五行就是抢夺控制权的过程,进入扩容的条件就是当前装载因子已经大于0.5,且扩容标志位没有设置。

newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)

扩容的时候设置一个标志位,相当于一个锁,扩容完成之后清空标志位。但是由于线程换出的存在,这个标志位可能导致其他线程永远抢不到控制权,进入无限死循环。所以这里又对没有抢夺到扩容控制权的线程,还有另外的一个判断,如果装载因子小于0.75,则直接尝试插入,不用管。

newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)

这个分支里还做了一些事情,就是当真正的获得了一个implicit producer之后,注册一个线程退出的callback,这个callback会把当前producer销毁,并在hashtable里删除对应的key

最后剩下的一种情况就是:拿不到扩容所有权,且当前装载因子已经上了0.75,此时除了死循环没有办法,约等于死锁。这种情况很罕见,但是仍然可以构造出来:正在扩容的线程被换出。不知原作者如何处理这个情况。

Published:
2017-06-26 00:50
Category:
Tag:
Cpp15