Lockfree Hashtable


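  1. The heart of a hash table is its probing (collision-resolution) strategy. Separate chaining has to allocate memory dynamically whenever a node is inserted, which is a thorny problem in lock-free code, so chaining is ruled out. Quadratic probing is unfriendly both to lock-free designs and to the cache, so linear probing is used instead. The first step, therefore, is to make linear probing lock-free.
  2. Inserting into a hash table can overload it. STL implementations (std::unordered_map) rehash into a larger bucket array once the load factor exceeds a threshold. Rehashing invalidates iterators, so this approach simply does not work without locks. Many implementations instead allocate a fresh, empty table of the current size and link all the tables together through pointers; lookups and insertions then have to walk every table.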

lockfree linear search


struct Entry {
    mint_atomic32_t key;
    mint_atomic32_t value;
};
Entry *m_entries;



void ArrayOfItems::SetItem(uint32_t key, uint32_t value) {
    for (uint32_t idx = 0;; idx++) {
        uint32_t prevKey = mint_compare_exchange_strong_32_relaxed(&m_entries[idx].key, 0, key);
        if ((prevKey == 0) || (prevKey == key)) {
            mint_store_32_relaxed(&m_entries[idx].value, value);
            return;
        }
    }
}


uint32_t ArrayOfItems::GetItem(uint32_t key) {
    for (uint32_t idx = 0;; idx++) {
        uint32_t probedKey = mint_load_32_relaxed(&m_entries[idx].key);
        if (probedKey == key)
            return mint_load_32_relaxed(&m_entries[idx].value);
        if (probedKey == 0)
            return 0;
    }
}
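Mintomic is not part of standard C++, so here is a hedged sketch of the same linear scan written with std::atomic (that substitution is ours). On failure, compare_exchange_strong writes the key it found into `expected`, which plays the role of the return value of mint_compare_exchange_strong_32_relaxed. The capacity kSize is a hypothetical choice, and unlike the listing above, the loops stop at the end of the array instead of running off it.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Sketch: ArrayOfItems on std::atomic instead of Mintomic (assumption).
// kSize is a hypothetical fixed capacity; key 0 marks an empty slot.
class ArrayOfItems {
public:
    static constexpr size_t kSize = 1024;

    void SetItem(uint32_t key, uint32_t value) {
        assert(key != 0);
        for (size_t idx = 0; idx < kSize; idx++) {
            uint32_t expected = 0;
            // Try to claim an empty slot; on failure `expected` receives the key stored there.
            bool claimed = m_entries[idx].key.compare_exchange_strong(expected, key, std::memory_order_relaxed);
            if (claimed || expected == key) {
                m_entries[idx].value.store(value, std::memory_order_relaxed);
                return;
            }
        }
        assert(false && "ArrayOfItems is full");
    }

    uint32_t GetItem(uint32_t key) const {
        assert(key != 0);
        for (size_t idx = 0; idx < kSize; idx++) {
            uint32_t probedKey = m_entries[idx].key.load(std::memory_order_relaxed);
            if (probedKey == key)
                return m_entries[idx].value.load(std::memory_order_relaxed);
            if (probedKey == 0)
                return 0;  // first empty slot reached: the key is not present
        }
        return 0;
    }

private:
    struct Entry {
        std::atomic<uint32_t> key{0};
        std::atomic<uint32_t> value{0};
    };
    Entry m_entries[kSize];
};
```

Because keys are only ever claimed in order from slot 0, an empty slot during GetItem means no later slot can hold the key.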

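The open question: every atomic operation here uses relaxed semantics, which guarantee atomicity but no ordering with respect to other memory operations (on x86, relaxed loads and stores compile to plain moves). How, then, can the value store in SetItem (mint_store_32_relaxed(&m_entries[idx].value, value)) be made visible to the value load in GetItem? In fact it cannot be guaranteed. A thread may be swapped out just before storing the value; another thread running GetItem then finds the key and reads the old value. It is also possible that the CAS on the key in SetItem has not yet become visible to other cores while the later value store already has; a concurrent GetItem then still sees key 0 and returns 0. Both outcomes are legal: in a multithreaded program the moment at which data is read is nondeterministic, so reading stale data is normal. Without some notification mechanism, asking whether a value is "the latest" is essentially meaningless. To implement publish/listen, the publisher should set an atomic bool flag to true with a release store, and the other thread should read that flag (for example with a CAS) using acquire semantics.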
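A minimal sketch of that publish/listen pattern, assuming a single global std::atomic<bool> flag (g_ready and g_message are hypothetical names). The listener consumes the flag with an acquire CAS, as described; a plain acquire load would give the same visibility guarantee if the flag did not need to be reset.

```cpp
#include <atomic>
#include <cassert>
#include <cstring>
#include <string>
#include <thread>

// Hypothetical shared state for this sketch: non-atomic payload + atomic flag.
char g_message[256];
std::atomic<bool> g_ready{false};

void Publish() {
    std::strcpy(g_message, "I pity the fool!");      // plain, non-atomic write
    g_ready.store(true, std::memory_order_release);  // release: the write above happens-before any acquire that sees `true`
}

std::string Listen() {
    bool expected = true;
    // Acquire CAS: consume the flag (true -> false). Once it succeeds, the
    // strcpy in Publish is guaranteed to be visible to this thread.
    while (!g_ready.compare_exchange_weak(expected, false,
                                          std::memory_order_acquire,
                                          std::memory_order_relaxed)) {
        expected = true;  // a failed CAS overwrote `expected`; restore it
        std::this_thread::yield();
    }
    return std::string(g_message);
}
```

Usage: run Listen on one thread and Publish on another; whichever starts first, Listen returns the full message.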

// Shared variables
char message[256];
ArrayOfItems collection;

void PublishMessage() {
    // Write to shared memory non-atomically.
    strcpy(message, "I pity the fool!");

    // Release fence: The only way to safely pass non-atomic data between threads using Mintomic.
    mint_thread_fence_release();

    // Set a flag to indicate to other threads that the message is ready.
    collection.SetItem(SHARED_FLAG_KEY, 1);
}
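PublishMessage only shows the writer. A matching listener, sketched here under the assumption that one relaxed std::atomic (the hypothetical g_flag) stands in for the table entry behind SHARED_FLAG_KEY, reads the flag with a relaxed load and then issues an acquire fence. A release fence before a relaxed store, paired with an acquire fence after a relaxed load that observes that store, gives the same guarantee as a release/acquire pair; std::atomic_thread_fence takes the place of mint_thread_fence_release here.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <thread>

// Hypothetical stand-ins: g_flag plays the entry for SHARED_FLAG_KEY.
std::atomic<uint32_t> g_flag{0};
char g_text[256];

void PublishWithFence() {
    std::strcpy(g_text, "I pity the fool!");
    std::atomic_thread_fence(std::memory_order_release);  // like mint_thread_fence_release()
    g_flag.store(1, std::memory_order_relaxed);           // the relaxed "SetItem"
}

std::string ListenWithFence() {
    while (g_flag.load(std::memory_order_relaxed) == 0) { // the relaxed "GetItem"
        std::this_thread::yield();
    }
    std::atomic_thread_fence(std::memory_order_acquire);  // pairs with the release fence
    return std::string(g_text);
}
```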


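Preshing suggests one optimization for SetItem: avoid unnecessary CAS operations. The original version performs a CAS on every entry it visits, but a CAS is only needed when the slot is empty (key == 0). If the slot already holds our key we can store the value directly, and if it holds a different key we just keep probing. So the optimization is to do a plain load first and attempt the CAS only when the slot looks free.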

void ArrayOfItems::SetItem(uint32_t key, uint32_t value) {
    for (uint32_t idx = 0;; idx++) {
        // Load the key that was there.
        uint32_t probedKey = mint_load_32_relaxed(&m_entries[idx].key);
        if (probedKey != key) {
            // The entry was either free, or contains another key.
            if (probedKey != 0)
                continue;           // Usually, it contains another key. Keep probing.

            // The entry was free. Now let's try to take it using a CAS.
            uint32_t prevKey = mint_compare_exchange_strong_32_relaxed(&m_entries[idx].key, 0, key);
            if ((prevKey != 0) && (prevKey != key))
                continue;       // Another thread just stole it from underneath us.

            // Either we just added the key, or another thread did.
        }

        // Store the value in this array entry.
        mint_store_32_relaxed(&m_entries[idx].value, value);
        return;
    }
}
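To see what the load-first version saves, this single-threaded sketch counts CAS operations for both variants on the same insert sequence (Table, kSize and casCount are hypothetical, and std::atomic replaces Mintomic). Inserting keys 1..8 in order costs 1 + 2 + ... + 8 = 36 CASes with the original loop but only one CAS per new key with the optimized one, and updating an existing key costs no CAS at all.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

constexpr size_t kSize = 64;  // hypothetical capacity

struct Table {
    std::atomic<uint32_t> keys[kSize];
    std::atomic<uint32_t> values[kSize];
    size_t casCount = 0;  // demo counter, only meaningful single-threaded

    Table() {
        for (size_t i = 0; i < kSize; i++) {
            keys[i].store(0, std::memory_order_relaxed);
            values[i].store(0, std::memory_order_relaxed);
        }
    }

    // Original version: CAS on every probed slot.
    void SetItemNaive(uint32_t key, uint32_t value) {
        for (size_t idx = 0; idx < kSize; idx++) {
            uint32_t prev = 0;  // stays 0 on success, receives the stored key on failure
            casCount++;
            keys[idx].compare_exchange_strong(prev, key, std::memory_order_relaxed);
            if (prev == 0 || prev == key) {
                values[idx].store(value, std::memory_order_relaxed);
                return;
            }
        }
    }

    // Optimized version: plain load first, CAS only on a slot that looks empty.
    void SetItemLoadFirst(uint32_t key, uint32_t value) {
        for (size_t idx = 0; idx < kSize; idx++) {
            uint32_t probed = keys[idx].load(std::memory_order_relaxed);
            if (probed != key) {
                if (probed != 0) continue;  // another key: keep probing
                uint32_t prev = 0;
                casCount++;
                keys[idx].compare_exchange_strong(prev, key, std::memory_order_relaxed);
                if (prev != 0 && prev != key) continue;  // lost the slot to a different key
            }
            values[idx].store(value, std::memory_order_relaxed);
            return;
        }
    }
};
```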

naive lockfree hashtable

Building a lock-free hashtable on top of the lock-free linear scan above is fairly simple. Three functions are defined: integerHash, SetItem and GetItem.

inline static uint32_t integerHash(uint32_t h) {
    h ^= h >> 16;
    h *= 0x85ebca6b;
    h ^= h >> 13;
    h *= 0xc2b2ae35;
    h ^= h >> 16;
    return h;
}

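This hash function is MurmurHash3's integer finalizer. It mixes the bits thoroughly, so that every input bit affects the output roughly equally; this matters because the table index is taken from the low bits only (idx & (m_arraySize - 1)).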
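One way to check the "every bit matters about equally" property is to measure avalanche: flip a single input bit and count how many output bits change. For a well-mixing 32-bit function the average should be close to 16. The helpers popcount32 and averageAvalanche below are hypothetical, written for this check only.

```cpp
#include <cassert>
#include <cstdint>

// MurmurHash3's 32-bit finalizer, as in the text.
inline static uint32_t integerHash(uint32_t h) {
    h ^= h >> 16;
    h *= 0x85ebca6b;
    h ^= h >> 13;
    h *= 0xc2b2ae35;
    h ^= h >> 16;
    return h;
}

// Portable popcount (std::popcount needs C++20).
static int popcount32(uint32_t x) {
    int n = 0;
    while (x) { x &= x - 1; n++; }
    return n;
}

// Average number of output bits that change when one input bit is flipped,
// over keys 1..samples and all 32 bit positions.
double averageAvalanche(uint32_t samples) {
    uint64_t changedBits = 0;
    for (uint32_t key = 1; key <= samples; key++) {
        for (int bit = 0; bit < 32; bit++) {
            uint32_t flipped = key ^ (1u << bit);
            changedBits += popcount32(integerHash(key) ^ integerHash(flipped));
        }
    }
    return static_cast<double>(changedBits) / (32.0 * samples);
}
```

Note that integerHash(0) == 0: zero is a fixed point of every step.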

void HashTable1::SetItem(uint32_t key, uint32_t value) {
    for (uint32_t idx = integerHash(key);; idx++) {
        idx &= m_arraySize - 1;

        // Load the key that was there.
        uint32_t probedKey = mint_load_32_relaxed(&m_entries[idx].key);
        if (probedKey != key) {
            // The entry was either free, or contains another key.
            if (probedKey != 0)
                continue;           // Usually, it contains another key. Keep probing.

            // The entry was free. Now let's try to take it using a CAS.
            uint32_t prevKey = mint_compare_exchange_strong_32_relaxed(&m_entries[idx].key, 0, key);
            if ((prevKey != 0) && (prevKey != key))
                continue;       // Another thread just stole it from underneath us.

            // Either we just added the key, or another thread did.
        }

        // Store the value in this array entry.
        mint_store_32_relaxed(&m_entries[idx].value, value);
        return;
    }
}



uint32_t HashTable1::GetItem(uint32_t key) {
    for (uint32_t idx = integerHash(key);; idx++) {
        idx &= m_arraySize - 1;

        uint32_t probedKey = mint_load_32_relaxed(&m_entries[idx].key);
        if (probedKey == key)
            return mint_load_32_relaxed(&m_entries[idx].value);
        if (probedKey == 0)
            return 0;
    }
}
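A self-contained std::atomic port of HashTable1, as a sketch (std::atomic replacing Mintomic is our substitution; FillConcurrently is a hypothetical usage helper). It assumes the array size is a power of two, so that `idx & (arraySize - 1)` equals `idx % arraySize`, that key 0 is never used, and that the table is never allowed to fill up.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <memory>
#include <thread>
#include <vector>

static uint32_t integerHash(uint32_t h) {
    h ^= h >> 16; h *= 0x85ebca6b; h ^= h >> 13; h *= 0xc2b2ae35; h ^= h >> 16;
    return h;
}

class HashTable1 {
public:
    explicit HashTable1(uint32_t arraySize) : m_arraySize(arraySize), m_entries(new Entry[arraySize]) {
        assert(arraySize != 0 && (arraySize & (arraySize - 1)) == 0);  // power of two
    }

    void SetItem(uint32_t key, uint32_t value) {
        assert(key != 0);
        for (uint32_t idx = integerHash(key);; idx++) {
            idx &= m_arraySize - 1;
            uint32_t probedKey = m_entries[idx].key.load(std::memory_order_relaxed);
            if (probedKey != key) {
                if (probedKey != 0) continue;  // occupied by another key: keep probing
                uint32_t expected = 0;
                if (!m_entries[idx].key.compare_exchange_strong(expected, key, std::memory_order_relaxed)
                    && expected != key)
                    continue;  // another thread claimed this slot for a different key
            }
            m_entries[idx].value.store(value, std::memory_order_relaxed);
            return;
        }
    }

    uint32_t GetItem(uint32_t key) const {
        assert(key != 0);
        for (uint32_t idx = integerHash(key);; idx++) {
            idx &= m_arraySize - 1;
            uint32_t probedKey = m_entries[idx].key.load(std::memory_order_relaxed);
            if (probedKey == key) return m_entries[idx].value.load(std::memory_order_relaxed);
            if (probedKey == 0) return 0;
        }
    }

private:
    struct Entry {
        std::atomic<uint32_t> key{0};
        std::atomic<uint32_t> value{0};
    };
    uint32_t m_arraySize;
    std::unique_ptr<Entry[]> m_entries;
};

// Usage helper: each thread inserts its own disjoint range of nonzero keys.
void FillConcurrently(HashTable1& table, uint32_t numThreads, uint32_t keysPerThread) {
    std::vector<std::thread> workers;
    for (uint32_t t = 0; t < numThreads; t++) {
        workers.emplace_back([&table, t, keysPerThread] {
            for (uint32_t i = 1; i <= keysPerThread; i++) {
                uint32_t key = t * keysPerThread + i;
                table.SetItem(key, key * 2);
            }
        });
    }
    for (auto& w : workers) w.join();
}
```

Keeping the load factor well below 1 (2000 keys in 4096 slots in the test) keeps probe sequences short and guarantees GetItem always reaches an empty slot for a missing key.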


// Shared variables
char message[256];
HashTable1 collection;

void PublishMessage() {
    // Write to shared memory non-atomically.
    strcpy(message, "I pity the fool!");

    // Release fence: The only way to safely pass non-atomic data between threads using Mintomic.
    mint_thread_fence_release();

    // Set a flag to indicate to other threads that the message is ready.
    collection.SetItem(SHARED_FLAG_KEY, 1);
}

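As for deletion, we can reserve a particular value to mean "this entry has been deleted". With 0 as that value (GetItem already returns 0 for a missing key), SetItem(key, 0) emulates a delete. The key's slot itself is never reclaimed, and 0 can no longer be stored as a real value.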
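A short sketch of delete-by-sentinel, using a hypothetical TinyTable that scans linearly from slot 0 to stay brief (std::atomic replaces Mintomic). It shows the two consequences noted above: after DeleteItem the key reads as absent, but its slot stays claimed and is reused if the key comes back.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Value 0 is reserved as "deleted / not present"; deleted keys keep their slot.
class TinyTable {
public:
    static constexpr size_t kSize = 16;

    void SetItem(uint32_t key, uint32_t value) {
        assert(key != 0);
        for (size_t i = 0; i < kSize; i++) {
            uint32_t expected = 0;
            if (m_entries[i].key.compare_exchange_strong(expected, key, std::memory_order_relaxed) || expected == key) {
                m_entries[i].value.store(value, std::memory_order_relaxed);
                return;
            }
        }
        assert(false && "TinyTable is full");
    }

    uint32_t GetItem(uint32_t key) const {
        for (size_t i = 0; i < kSize; i++) {
            uint32_t probed = m_entries[i].key.load(std::memory_order_relaxed);
            if (probed == key) return m_entries[i].value.load(std::memory_order_relaxed);
            if (probed == 0) return 0;
        }
        return 0;
    }

    // "Delete" just overwrites the value with the reserved sentinel 0.
    void DeleteItem(uint32_t key) { SetItem(key, 0); }

    // Number of slots whose key has been claimed (deleted keys included).
    size_t UsedSlots() const {
        size_t n = 0;
        for (const Entry& e : m_entries)
            if (e.key.load(std::memory_order_relaxed) != 0) n++;
        return n;
    }

private:
    struct Entry {
        std::atomic<uint32_t> key{0};
        std::atomic<uint32_t> value{0};
    };
    Entry m_entries[kSize];
};
```

One caveat of this scheme: deleting a key that was never inserted still claims a slot for it.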

what if the table is full

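The lock-free hashtable above has a fatal flaw: it does not handle the case where the whole table fills up, in which case the probe loops never terminate. To handle a full table, cap the number of probes at the table's capacity and keep several independent hashtables whose pointers are chained together by a lock-free linked list. If the probe limit is reached and the current hashtable has no next table yet, first create a new hashtable and attach it to the list; this reduces to the case where a next table exists, and the search then continues recursively in the next table.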
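A sketch of that scheme under stated assumptions: the class ChainedTable is hypothetical, every table in the chain has the same power-of-two capacity, key 0 means empty, and tables are freed only when the whole chain is destroyed, so no reader can touch freed memory. The next table is attached with a CAS on an atomic `next` pointer; a thread that loses the race deletes its own allocation and uses the winner's. Allocating the new table still calls `new`, so this is lock-free only to the extent the allocator is.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <memory>

static uint32_t integerHash(uint32_t h) {
    h ^= h >> 16; h *= 0x85ebca6b; h ^= h >> 13; h *= 0xc2b2ae35; h ^= h >> 16;
    return h;
}

class ChainedTable {
public:
    explicit ChainedTable(uint32_t capacity) : m_capacity(capacity), m_entries(new Entry[capacity]) {
        assert(capacity != 0 && (capacity & (capacity - 1)) == 0);
    }
    ~ChainedTable() { delete m_next.load(); }  // recursively frees the rest of the chain

    void SetItem(uint32_t key, uint32_t value) {
        assert(key != 0);
        for (ChainedTable* t = this;; t = t->GetOrCreateNext()) {
            uint32_t idx = integerHash(key);
            for (uint32_t n = 0; n < t->m_capacity; n++, idx++) {  // bounded probe
                idx &= t->m_capacity - 1;
                uint32_t expected = 0;
                if (t->m_entries[idx].key.compare_exchange_strong(expected, key, std::memory_order_relaxed) || expected == key) {
                    t->m_entries[idx].value.store(value, std::memory_order_relaxed);
                    return;
                }
            }
            // Every slot holds another key: this table is full, go one table deeper.
        }
    }

    uint32_t GetItem(uint32_t key) const {
        assert(key != 0);
        for (const ChainedTable* t = this; t != nullptr; t = t->m_next.load(std::memory_order_acquire)) {
            uint32_t idx = integerHash(key);
            for (uint32_t n = 0; n < t->m_capacity; n++, idx++) {
                idx &= t->m_capacity - 1;
                uint32_t probed = t->m_entries[idx].key.load(std::memory_order_relaxed);
                if (probed == key) return t->m_entries[idx].value.load(std::memory_order_relaxed);
                if (probed == 0) return 0;  // a free slot: the key was never pushed to a later table
            }
        }
        return 0;
    }

private:
    struct Entry {
        std::atomic<uint32_t> key{0};
        std::atomic<uint32_t> value{0};
    };

    ChainedTable* GetOrCreateNext() {
        ChainedTable* next = m_next.load(std::memory_order_acquire);
        if (next != nullptr) return next;
        ChainedTable* fresh = new ChainedTable(m_capacity);
        if (m_next.compare_exchange_strong(next, fresh, std::memory_order_acq_rel, std::memory_order_acquire))
            return fresh;  // we linked the new table
        delete fresh;      // another thread linked one first; `next` now points to it
        return next;
    }

    uint32_t m_capacity;
    std::unique_ptr<Entry[]> m_entries;
    std::atomic<ChainedTable*> m_next{nullptr};
};
```

Because slots are never emptied, a table that was once full stays full, so a lookup that meets an empty slot can stop without visiting later tables.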


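  1. Maintain a lock-free linked list of hashtables. The head of the list is called main_hashtable, and the tables are connected through next pointers.
  2. On insertion, if the load factor of the current main_hashtable (computed over all keys, including those stored in older tables) has reached 0.5, create a new hashtable and insert into it.
  3. While resizing, set a flag meaning "resize in progress" so that several threads do not resize at the same time and waste resources. Threads waiting for the resize busy-wait, and the flag is cleared once the resize is done. (In the ConcurrentQueue code below, a thread only waits if the main table is also at least three-quarters full; otherwise it inserts into the current table anyway.)
  4. The new hashtable starts empty and is twice the size of the current main_hashtable (or larger, until the load factor drops below 0.5). Every new hashtable is inserted at the head of the list and becomes the new main_hashtable.
  5. Lookups follow the next pointers until the last hashtable.
  6. If a lookup finds the key in a hashtable other than main_hashtable, it also inserts the key-value pair into main_hashtable. This is the core of the lazy-copy process.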
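The six steps above can be sketched as follows. This is not the ConcurrentQueue code; all names are hypothetical, and it assumes nonzero uint32_t keys and values, that each key is inserted at most once and never updated (so a lazy copy can never overwrite a newer value), and that old tables are freed only when the whole structure is destroyed. The load-factor rule also guarantees room in the main table for lazy copies: every key carries a distinct count below half the main table's capacity.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <thread>

static uint32_t integerHash(uint32_t h) {
    h ^= h >> 16; h *= 0x85ebca6b; h ^= h >> 13; h *= 0xc2b2ae35; h ^= h >> 16;
    return h;
}

class GrowingHashTable {
    struct Entry { std::atomic<uint32_t> key{0}; std::atomic<uint32_t> value{0}; };
    struct Table {
        Table(size_t cap, Table* prev) : capacity(cap), entries(new Entry[cap]), older(prev) {}
        ~Table() { delete[] entries; }
        size_t capacity;  // power of two
        Entry* entries;
        Table* older;     // link to the previous main table (step 1)
    };

    // Linear probing inside one table; the load-factor rule guarantees a free slot.
    static void InsertInto(Table* t, uint32_t key, uint32_t value) {
        for (size_t idx = integerHash(key);; idx++) {
            idx &= t->capacity - 1;
            uint32_t expected = 0;
            if (t->entries[idx].key.compare_exchange_strong(expected, key, std::memory_order_relaxed) || expected == key) {
                t->entries[idx].value.store(value, std::memory_order_relaxed);
                return;
            }
        }
    }
    static uint32_t FindIn(const Table* t, uint32_t key) {
        for (size_t idx = integerHash(key);; idx++) {
            idx &= t->capacity - 1;
            uint32_t probed = t->entries[idx].key.load(std::memory_order_relaxed);
            if (probed == key) return t->entries[idx].value.load(std::memory_order_relaxed);
            if (probed == 0) return 0;
        }
    }

public:
    explicit GrowingHashTable(size_t initialCapacity) : m_head(new Table(initialCapacity, nullptr)) {
        assert(initialCapacity >= 2 && (initialCapacity & (initialCapacity - 1)) == 0);
        m_resizing.clear();
    }
    ~GrowingHashTable() {
        for (Table* t = m_head.load(); t != nullptr;) { Table* older = t->older; delete t; t = older; }
    }

    void Insert(uint32_t key, uint32_t value) {
        assert(key != 0 && value != 0);
        size_t newCount = 1 + m_count.fetch_add(1, std::memory_order_relaxed);  // keys in all tables
        while (true) {
            Table* main = m_head.load(std::memory_order_acquire);
            if (newCount < main->capacity / 2) {  // load factor still below 0.5 (step 2)
                InsertInto(main, key, value);
                return;
            }
            if (!m_resizing.test_and_set(std::memory_order_acquire)) {  // step 3: one resizer at a time
                main = m_head.load(std::memory_order_acquire);
                if (newCount >= main->capacity / 2) {
                    size_t cap = main->capacity * 2;  // step 4: at least double, pushed at the head
                    while (newCount >= cap / 2) cap *= 2;
                    m_head.store(new Table(cap, main), std::memory_order_release);
                }
                m_resizing.clear(std::memory_order_release);
            } else {
                std::this_thread::yield();  // someone else is resizing: busy-wait, then retry
            }
        }
    }

    uint32_t Get(uint32_t key) {
        Table* main = m_head.load(std::memory_order_acquire);
        for (Table* t = main; t != nullptr; t = t->older) {  // step 5: walk every table
            uint32_t value = FindIn(t, key);
            if (value != 0) {
                if (t != main) InsertInto(main, key, value);  // step 6: lazy copy into the main table
                return value;
            }
        }
        return 0;
    }

    bool InMainTable(uint32_t key) const { return FindIn(m_head.load(std::memory_order_acquire), key) != 0; }
    size_t TableCount() const {
        size_t n = 0;
        for (Table* t = m_head.load(std::memory_order_acquire); t != nullptr; t = t->older) n++;
        return n;
    }

private:
    std::atomic<Table*> m_head;
    std::atomic<size_t> m_count{0};
    std::atomic_flag m_resizing;
};
```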


The lookup path of the implicit-producer hash in ConcurrentQueue implements steps 5 and 6 (here the list is linked through `prev`, newest table first):

auto id = details::thread_id();
auto hashedId = details::hash_thread_id(id);

auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
    // Look for the id in this hash
    auto index = hashedId;
    while (true) {      // Not an infinite loop because at least one slot is free in the hash table
        index &= hash->capacity - 1;

        auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
        if (probedKey == id) {
            // Found it! If we had to search several hashes deep, though, we should lazily add it
            // to the current main hash table to avoid the extended search next time.
            // Note there's guaranteed to be room in the current hash table since every subsequent
            // table implicitly reserves space for all previous tables (there's only one
            // implicitProducerHashCount).
            auto value = hash->entries[index].value;
            if (hash != mainHash) {
                index = hashedId;
                while (true) {
                    index &= mainHash->capacity - 1;
                    probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
                    auto empty = details::invalid_thread_id;
                    auto reusable = details::invalid_thread_id2;
                    // Variant without reusable slots: test only the `empty` case.
                    if ((probedKey == empty    && mainHash->entries[index].key.compare_exchange_strong(empty,    id, std::memory_order_relaxed)) ||
                        (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire))) {
                        mainHash->entries[index].value = value;
                        break;      // lazily copied into the main table
                    }
                    ++index;        // slot taken: keep probing linearly
                }
            }

            return value;
        }
        if (probedKey == details::invalid_thread_id) {
            break;      // Not in this hash table
        }
        ++index;
    }
}

// Insert!
auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
while (true) {
    if (newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)) {
        // We've acquired the resize lock, try to allocate a bigger hash table.
        // Note the acquire fence synchronizes with the release fence at the end of this block, and hence when
        // we reload implicitProducerHash it must be the most recent version (it only gets changed within this
        // locked block).
        mainHash = implicitProducerHash.load(std::memory_order_acquire);
        if (newCount >= (mainHash->capacity >> 1)) {
            auto newCapacity = mainHash->capacity << 1;
            while (newCount >= (newCapacity >> 1)) {
                newCapacity <<= 1;
            }
            auto raw = static_cast<char*>((Traits::malloc)(sizeof(ImplicitProducerHash) + std::alignment_of<ImplicitProducerKVP>::value - 1 + sizeof(ImplicitProducerKVP) * newCapacity));
            if (raw == nullptr) {
                // Allocation failed: undo our count and release the resize lock
                implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
                implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
                return nullptr;
            }

            auto newHash = new (raw) ImplicitProducerHash;
            newHash->capacity = newCapacity;
            newHash->entries = reinterpret_cast<ImplicitProducerKVP*>(details::align_for<ImplicitProducerKVP>(raw + sizeof(ImplicitProducerHash)));
            for (size_t i = 0; i != newCapacity; ++i) {
                new (newHash->entries + i) ImplicitProducerKVP;
                newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
            }
            newHash->prev = mainHash;
            implicitProducerHash.store(newHash, std::memory_order_release);
            implicitProducerHashResizeInProgress.clear(std::memory_order_release);  // release the resize lock
            mainHash = newHash;
        }
        else {
            // Another thread already grew the table: just release the resize lock
            implicitProducerHashResizeInProgress.clear(std::memory_order_release);
        }
    }

    // If it's < three-quarters full, add to the old one anyway so that we don't have to wait for the next table
    // to finish being allocated by another thread (and if we just finished allocating above, the condition will
    // always be true)
    if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
        bool recycled;
        auto producer = static_cast<ImplicitProducer*>(recycle_or_create_producer(false, recycled));
        if (producer == nullptr) {
            implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
            return nullptr;
        }
        if (recycled) {
            implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
        }

        producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
        producer->threadExitListener.userData = producer;
        // The listener still has to be registered with a thread-exit notifier for the
        // callback to fire; that step is not part of this excerpt.

        auto index = hashedId;
        while (true) {
            index &= mainHash->capacity - 1;
            auto probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);

            auto empty = details::invalid_thread_id;
            auto reusable = details::invalid_thread_id2;
            // Variant without reusable slots: test only the `empty` case.
            if ((probedKey == empty    && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed)) ||
                (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire))) {
                mainHash->entries[index].value = producer;
                break;
            }
            ++index;
        }
        return producer;
    }

    // Hmm, the old hash is quite full and somebody else is busy allocating a new one.
    // We need to wait for the allocating thread to finish (if it succeeds, we add, if not,
    // we try to allocate ourselves).
    mainHash = implicitProducerHash.load(std::memory_order_acquire);
}
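To make the capacity arithmetic in this loop concrete, here it is pulled out into three hypothetical helper functions (not part of ConcurrentQueue). For a capacity of 16, a thread with count 8 or more may try to resize, counts 8 to 11 can still insert into the current table while a resize is in progress, and from 12 on a thread has to wait. The new capacity at least doubles and keeps doubling until the count is below half of it.

```cpp
#include <cassert>
#include <cstddef>

// Resize is attempted once the table is at least half full.
bool ShouldTryResize(size_t newCount, size_t capacity) {
    return newCount >= (capacity >> 1);
}

// Inserting into the current table is still allowed below three quarters.
bool CanInsertIntoCurrent(size_t newCount, size_t capacity) {
    return newCount < (capacity >> 1) + (capacity >> 2);
}

// New capacity: at least double, then keep doubling until the count is below half.
size_t GrownCapacity(size_t newCount, size_t capacity) {
    size_t newCapacity = capacity << 1;
    while (newCount >= (newCapacity >> 1))
        newCapacity <<= 1;
    return newCapacity;
}
```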


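The two conditions that drive this loop:

newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)

A thread tries to resize only when the main table is at least half full (newCount counts the keys of every table) and it wins the resize flag: test_and_set returns the previous value, so only one thread at a time sees false and becomes the resizer.

newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)

As long as the main table is below three-quarters full, the thread inserts into it even while another thread is still allocating the next table; only above that threshold does it spin and reload implicitProducerHash.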

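Besides inserting, the three-quarters branch does one more thing: once it has actually obtained an implicit producer, it registers a thread-exit callback. When the thread exits, the callback releases the producer (it can later be recycled, cf. recycle_or_create_producer) and removes the corresponding key from the hashtable by marking its slot reusable, which is what the invalid_thread_id2 checks above are for.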
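One standard C++ way to get such a thread-exit hook is a thread_local object whose destructor runs when the thread exits. The sketch below uses that technique (which may differ from how ConcurrentQueue implements its notifier) with hypothetical names: a single slot stands in for a hash entry, and the callback marks it with a kReusable sentinel in the role of invalid_thread_id2, so a later thread can claim it.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <functional>
#include <thread>

constexpr uint32_t kEmpty = 0;
constexpr uint32_t kReusable = 0xFFFFFFFFu;  // plays the role of invalid_thread_id2

// Runs the registered callback when the owning thread exits.
struct ThreadExitListener {
    std::function<void()> callback;
    ~ThreadExitListener() { if (callback) callback(); }
};

thread_local ThreadExitListener t_exitListener;

std::atomic<uint32_t> g_slotKey{kEmpty};  // a one-slot stand-in for a hash entry

// Claims the slot for `id` if it is empty or reusable, and registers a callback
// that marks it reusable again when the calling thread exits.
bool ClaimSlotForThisThread(uint32_t id) {
    uint32_t expected = kEmpty;
    if (!g_slotKey.compare_exchange_strong(expected, id)) {
        expected = kReusable;
        if (!g_slotKey.compare_exchange_strong(expected, id)) return false;
    }
    t_exitListener.callback = [] { g_slotKey.store(kReusable, std::memory_order_release); };
    return true;
}
```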


