游戏中的排行榜

绝大部分游戏中都会设置各种排行榜,来代表玩家某个维度的积分排名。最令人熟知的就是天梯排行榜,比较知名的就是国服第一白牛这类排行。除了天梯排行榜外,排行榜的种类则是五花八门,我们耳熟能详的其他排行榜包括但不限于:等级排行榜,战力排行榜,竞速排行榜,成就排行榜,大秘境排行榜等。这些排行榜是玩家实力的光荣榜,不断的激励着游戏玩家在游戏内做很多重复性的劳动来打磨技巧,就为了能够在社区分享自己在榜上的截图。笔者当前也是深受其害,为了进入暗黑破坏神3的大秘境野蛮人排行榜首页浪费了好多时间:

暗黑3排行榜

从上面的这张图可以看出,一个大秘境排行榜其实被地区、职业、赛季、人数、模式等多个规则细分出了很多个子榜,所以完整榜单数量会有上百个。而且对于每个排行榜,其上榜人数也很多,暗黑3这里设置为了1000人:

暗黑3排行榜人数大小

由于有这么多的排行榜,每个排行榜上的人数也很多,使用一个高效的结构来管理排行榜是非常必要的。这些排行榜逻辑与算法面试中经常被考察的Top(N)排序很相似,不过游戏中面对的场景主要是大量动态变化的在线数据,而常规的Top(N)排序处理的是指定的已经生成好的离线数组。除了这个Top(N)排序之外,排行榜还需要处理排名查询,即获取指定玩家在这个排行榜的第几名。常规的最佳Top(N)结构最小堆在应对这个查询请求时呈现出来非常大的劣势,因为在堆中计算排名会触发一次完整遍历,其复杂度为O(N),所以堆这个结构完全不适合用来实现排行榜。此外游戏排行榜对于同分排名也有一些特殊需求,一般都会要求最早到达此分数的排在前面。下面我们将对游戏中的常见排行榜的实现来做一些梳理,展示其主要难点与解决技巧。

基于数组的实现

固定人数的排行榜是排行榜的主流,典型的设计中这种排行榜的大小一般都会保持在10000以内,在这种数量级上使用一个固定大小的有序数组来实现排行榜是简单且有效的方法。下面就是用有序数组实现排行榜功能的核心代码,也就一百行出头,简单有效:

struct player_rank
{
	const std::string player_id;
	double rank_score; // 分数越大代表排名越靠前
	std::uint64_t update_ts;// 更新时间戳 用来参与同分排序 
	player_rank(const std::string& in_player_id, double in_score)
		: player_id(in_player_id)
		, rank_score(in_score)
	{

	}
	std::pair<double, std::string_view> compare_key() const
	{
		return std::make_pair(-1 * rank_score, update_ts);
	}
};
struct player_rank_wrapper
{
	const player_rank* rank_ptr;
	bool operator<(const player_rank_wrapper& other) const
	{
		return rank_ptr->compare_key() < other.rank_ptr->compare_key();
	}

};
class array_rank
{
	std::unordered_map<std::string, std::unique_ptr<player_rank>> m_player_ranks;
	const std::uint32_t m_rank_size;
	std::vector<player_rank_wrapper> m_sorted_ranks;
	std::uint64_t m_update_ts_counter = 0;
public:
	array_rank(std::uint32_t in_rank_size)
		: m_rank_size(in_rank_size)
	{
		m_sorted_ranks.reserve(in_rank_size);
	}
	std::uint64_t gen_update_ts()// 用来分配递增时间戳 以避免同分
	{
		return ++m_update_ts_counter;
	}
	// 返回更新之后玩家的排名 从1 开始计数 如果为0 代表不在排行榜上
	std::uint32_t update(const std::string& player_id, double rank_score)
	{
		auto temp_iter = m_player_ranks.find(player_id);
		if (temp_iter == m_player_ranks.end()) // 一个不在排行榜上的玩家
		{
			if (m_sorted_ranks.size() == m_rank_size) // 当前榜上人数已满
			{
				player_rank temp_player_rank(player_id, rank_score);
				if (m_sorted_ranks.back() < player_rank_wrapper{ &temp_player_rank })
				{
					// 分数小于最后一名 则不上榜
					return 0;
				}
				else
				{
					// 删除最后一名 
					m_player_ranks.erase(m_sorted_ranks.back().rank_ptr->player_id);
					m_sorted_ranks.pop_back();
					auto temp_rank_ptr = std::make_unique<player_rank>(player_id, rank_score);
					temp_rank_ptr->update_ts = gen_update_ts();
					// 然后用二分法执行插入
					auto insert_iter = std::lower_bound(m_sorted_ranks.begin(), m_sorted_ranks.end(), player_rank_wrapper{ temp_rank_ptr.get() });
					auto result_iter = m_sorted_ranks.insert(insert_iter, player_rank_wrapper{ temp_rank_ptr.get() });
					m_player_ranks[player_id] = std::move(temp_rank_ptr);
					return std::distance(m_sorted_ranks.begin(), result_iter) + 1;
				}
			}
		}
		else
		{
			// 已经在排行榜上 根据分数上升或者下降进行前后遍历以找到自身位置
			auto pre_key = temp_iter->second->compare_key();
			auto self_iter = std::lower_bound(m_sorted_ranks.begin(), m_sorted_ranks.end(), player_rank_wrapper{ temp_iter->second.get() });
			if (rank_score == temp_iter->second->rank_score)// 分数相同直接返回之前的排名
			{
				return std::distance(m_sorted_ranks.begin(), self_iter) + 1;
			}
			temp_iter->second->rank_score = rank_score;
			temp_iter->second->update_ts = gen_update_ts();
			auto new_key = temp_iter->second->compare_key();
			if (new_key < pre_key)// 向前搜索
			{
				while (self_iter != m_sorted_ranks.begin())
				{
					auto pre_iter = self_iter - 1;
					if (*pre_iter < *self_iter)
					{
						return std::distance(m_sorted_ranks.begin(), self_iter) + 1;
					}
					else
					{
						std::swap(*self_iter, *pre_iter);
						self_iter = pre_iter;
					}
				}
			}
			else// 向后搜索
			{
				while ((self_iter + 1) != m_sorted_ranks.end())
				{
					auto next_iter = self_iter + 1;
					if (*self_iter < *next_iter)
					{
						return std::distance(m_sorted_ranks.begin(), self_iter) + 1;
					}
					else
					{
						std::swap(*self_iter, *next_iter);
						self_iter = next_iter;
					}
				}
			}
		}
	}
	std::uint32_t get_rank(const std::string& player_id) const
	{
		auto temp_iter = m_player_ranks.find(player_id);
		if (temp_iter == m_player_ranks.end()) // 一个不在排行榜上的玩家
		{
			return 0;
		}
		else
		{
			auto self_iter = std::lower_bound(m_sorted_ranks.begin(), m_sorted_ranks.end(), player_rank_wrapper{ temp_iter->second.get() });
			return std::distance(m_sorted_ranks.begin(), self_iter) + 1;
		}
	}
};

上面的代码中实现了一个更新排行的核心接口update,主要流程就是如果新加入的话则删除最后一名然后二分寻找插入位置执行插入,如果已经在榜上则根据积分的下降或者上升执行对应的前后搜索。插入时可能会将整个排行榜数组的元素都后移一位,前后搜索时也可能遍历整个排行榜数组,所以这个接口的最坏复杂度为O(N),这里的N就是当前排行榜数组的大小。不过由于数组是连续存储的,且数组里的元素只是一个指针,所以在内存的缓存系统帮助下其执行时间并没有想象中的那么长。相对而言查询排行榜的接口get_rank就很高效了,只需要执行一次二分查找即可,时间复杂度为O(logN)

不过这里需要考虑到游戏内用来决定排名的玩家积分其实是动态变化的,所以我们不能设置这个有序数组的大小为排行榜大小。如果这样设置的话,如果排行榜上的某个玩家A由于某种原因导致积分下降,导致其并不在真正的Top(N)之中,但是由于没有其他合适的玩家来补充上榜,导致A只能下降为排行榜的最后一名。此时如果玩家B比玩家A积分高,但是发现玩家A在排行榜上但是自己并不在排行榜,排行榜的正确性就会遭受质疑。这种缺乏补位导致的排行榜问题很容易引发各种运营事故,特别是这个排行榜涉及到一些比较贵重的奖励的时候。所以实践中会设置这个大小比排名大小大,经验参数差不多大50%即可,也就是说如果排行榜大小为1000,则有序数组设置为1500。在Github上我建立了一个仓库https://github.com/huangfeidian/rank来用不同的数据结构来实现排行榜,其中的include/array_rank.h部分就对应了前述的带冗余的有序数组排行榜实现。

预留额外容量的方法很大程度上避免了前述的缺乏补位引发的排行榜不正确的问题,不过这个方法并没有根治补位问题。假设某种情况引发这个1500数组中的501个元素都大规模降分导致不在top(1500)之中,此时原来的1501名玩家C就应该出现在排行榜。但是由于C之前并不在之前的top(1500)中,所以此时C并没有上榜。不过考虑到这种大量玩家同时下榜的情况很少见,所以游戏中并不会力求完全解决这个问题,一般来说扩大排行榜内的冗余量到一两倍基本可以避免这个问题。还有一个比较取巧的方法就是客户端在拉取到排行榜Top(N)数据之后,将自身数据通过插入排序加入到获取的数据中然后再选择其中的Top(N)进行显示,这样自身客户端就可以显示正确了。如果发现自身应该上榜但是并没有在排行榜上,则同时向服务器发起一个排行榜更新请求将自身的积分信息推送过去,从而将自身数据插入到服务端的排行榜有序数组中。

基于跳表的实现

前述的使用数组实现的排行榜有个非常大的硬伤就是其更新的时间复杂度为O(N)N为其数组的最大容量,这样就导致了这样的实现在N达到100000以上的级别时单次更新的最坏时间达到了毫秒级别。这个最坏复杂度主要是由数组插入新元素引发的整体后移导致的,而链表在插入一个新元素所需的时间为O(1),不禁让人想用链表来替代数组。但是使用链表来存储有序数据的话,就无法使用lower_bound来执行二分查找,这样导致寻找插入位置所需的时间变成了O(N),顺带的将查询接口get_rank的时间也从O(logN)劣化到了O(N)。所以单纯的使用链表来替换数组反而是一种劣化的实现,想扭转这种劣势需要以某种方式通过记录的额外信息来快速查询链表节点排名,刚好跳表SkipList就是这样的一种支持快速定位的链表结构。

跳表是由 William Pugh1990发明的一种查找数据结构,支持对数据的快速查找,插入和删除。这个数据结构的插入新元素的时间复杂度为O(logN),同时查询元素位置和删除元素的时间复杂度也是O(LogN),可以说是非常优秀,非常适合用来解决这里的排行榜问题。Rediszset底层数据结构使用的就是跳表,所以有很多文章都介绍了完全依赖Rediszset功能来实现排行榜,官方网站也提供了文章来介绍这个排行榜功能。考虑到这个数据结构的实现上并没有很复杂,我们在这里就来详解一下跳表的底层实现,分析一下为何能够达到这样的最优复杂度。

跳表的底层实现是一个多级链表,每一级的链表都是一个有序链表。最底层的0级链表是包含了所有元素的有序链表,在其之上的每一级有序链表都是对应的下一级有序链表的一个稀疏抽样。下图中的a->b->c->d->e就生动的展示了每一级的稀疏抽样过程,最终生成了有五级链表的跳表:

跳表的抽样

在每一级的有序链表生成下一级的有序链表时,当前级别的链表里每个节点进入下一级的概率都是指定的常数p。每一级每个节点计算概率时都是独立无关事件。所以一个0级节点在第k级有序链表中存在相应节点的概率为,一个0级节点的最大层数为k(k>=1)的概率为。因此一个0级节点所有层级的对应节点的数量期望为:

上面的后半部分可以转换为求,此时注意到:

代入进入得到,所以最终的结果为:

所以跳表中的各层级的总节点数量期望为,也就是O(n),所以引入多级有序链表对于总体的空间复杂度来说影响不大。

有了这样的多级链表定义之后,查询一个元素对应的节点就不再需要去遍历整个链表,可以利用这里的多级性质来加速查询。搜索开始时我们定义一个变量search_node,初始化为头节点,然后开始循环处理:

  1. 如果search_node里存储的值等于目标值,则返回search_node
  2. 如果search_node比目标值小
    1. 如果search_node的后续节点为尾节点,或者后续节点的值大于目标值,则将search_node替换为其下一级的节点,如果没有下一级则返回空
    2. 如果search_node的后续节点的值大于目标值,则将search_node替换为这个后续节点

语言描述流程还是过于抽象不好理解,接下来我们用图形来表示。假如我们需要在下图中查找71代表的节点排名:

查询初始配置

运用上述流程之后,搜索路径如下:

查询路径

了解这个搜索流程之后,获取一个节点的排名就非常简单了。如果任意层级中的任意节点都记录了这个节点到其同层级右边节点之间所包含的0级节点数量到节点的span字段上,则最后找到的节点排名就等于中间路过的所有触发了向右移动的search_nodespan字段累加值加上向右移动的次数。在上述的搜索路径中,第3层头节点到第三层31节点之间的0级节点数量为4,然后第031节点到第071节点之间的0级节点数量为0span累加值为4,同时向右移动次数为2,因此总体排名为6

我们再用简短的代码将这个流程精确化。首先我们需要给出跳表的结构定义,这里为了精简代码就不把之前的update_ts带入了,只考虑存value字段来参与比较:

template<typename V>
struct skiplist_node
{
	struct node_level_info
	{
		skiplist_node* forward; // 在这个level 链表中的下一个节点
		std::uint32_t span; // 当前节点与这个level 链表中的下一个节点 之间的节点数量
	}
	V value; // 跳表中存储的有序值 外部需要保证跳表中这是一个严格偏序的 不存在相等的情况
	std::vector<node_level_info> levels; // 当前节点存储的所有level信息 索引0对应的是最底层的链表
};

template<typename K, typename V>
class skiplist
{
	const double m_p; // 指定的抽样概率p
	std::unordered_map<K, std::unique_ptr<skiplist_node<V>*>> m_key_to_nodes;
	skiplist_node<V> m_head_node; // head_node里存储的value比最小可能值还小
	skiplist_node<V> m_tail_node;// tail_node里存储的value比最大可能值还大
	int m_level; // 当前跳表的最高层级
};

在这样的结构定义下,搜索节点并获取排名的操作代码就比较简短了:

void skiplist::get_prev_info(std::vector<const skiplist_node<V>*>& prev_nodes, std::vector<std::uint32_t> prev_ranks, const skiplist_node<V>* dest) const
{
	const skiplist_node<V>* search_node = &m_head_node;
	std::uint32_t last_level_rank = 0;
	for(int i = m_level; i>=0; i--)
	{
		prev_ranks[i] = last_leve_rank;
		while(search_node->levels[i].forward->value < dest->value)
		{
			prev_ranks[i] += search_node->levels[i].span + 1;
			search_node = search_node->levels[i].forward;
		}
		last_level_rank = prev_ranks[i];
	}
}
std::uint32_t skiplist::get_rank(const K& p) const
{
	auto p_iter = m_key_to_nodes.find(p);
	if(p_iter == m_key_to_nodes.end())
	{
		return 0;
	}
	std::vector<const skiplist_node<V>*> prev_nodes(m_level + 1, nullptr);
	std::vector<std::uint32_t> prev_ranks(m_level + 1, 0);
	get_prev_info(prev_nodes, prev_ranks, p_iter->second.get());
	return prev_ranks[0];
}

这种要么往右要么往下的二分思想从直觉上来说类似于二叉树的搜索,所以我们猜测其时间复杂度应该与O(logN)差不多,不过这个结论还需要一个更加数学的证明。这里的复杂度分析采用的是从后向前分析查找路径,这个过程可以分为从最底层爬到最顶层和后续左移到头节点两个部分。假设当前我们处于一个第 i 层的节点 x,我们并不知道x 的最大层数和x左侧节点的最大层数,只知道x的最大层数至少为i。如果x的最大层数大于i,那么下一步应该是向上走,这种情况的概率为p;如果x的最大层数等于i,那么下一步应该是向左走,这种情况概率为1-p。现在我们用C(i)来表示一个0级节点在搜索过程中上升到第i级的某个节点时所消耗的时间,根据我们前面的分析,可以得到这样的递推式:

使用这个递推式可以非常简单的得到。在到达最顶端的第k层之后,向左移动的次数最大不会超过这一层的所有节点数量,而第k层的节点数量期望为。所以在节点数为n且层数为k的跳表中,整个的回溯操作时间复杂度为。注意到最顶层的节点数量期望会小于,因为如果大于这个值的话这一层的节点数量乘以升级概率会导致下一级的期望节点数量大于等于1,违背了我们最顶层的定义。所以n个节点的跳表最高层数k下有这样的性质:

所以最大层数k的期望值可以用来表示,这样带入到原来计算出来的整体复杂度,可以得到:

经验算整体复杂度就是O(logN)量级,符合我们之前的直觉。剩下的问题就是我们如何高效的构造出跳表,首先要处理的是如何找到新的节点在0级有序链表中的插入位置。这个插入位置定位其实也是非常巧妙的,完整的利用了之前查询排名时的代码:

// 插入一个新的元素 返回插入之后的排名
std::uint32_t skiplist::insert(const K& p, const V& score)
{
	auto p_iter = m_key_to_nodes.find(p);
	if(p_iter != m_key_to_nodes.end())
	{
		// 这里暂时省略已经在排行榜上的讨论
		return 0;
	}
	skiplist_node<V>* temp_node = new skiplist_node<V>();
	temp_node->value = score;
	std::vector<const skiplist_node<V>*> prev_nodes(m_level + 1, nullptr);
	std::vector<std::uint32_t> prev_ranks(m_level + 1, 0);
	get_prev_info(prev_nodes, prev_ranks, temp_node);
	temp_node->levels.push_back({});
	temp_node->levels[0].forward = prev_nodes[0]->levels[0]->forward;
	prev_nodes[0]->levels[0]->forward = temp_node;
	// 剩下一些代码来执行概率上升的逻辑
	m_key_to_nodes[p] = temp_node;
	return prev_ranks[0] + 1;
}

上面展示的代码成功的将这个新节点插入到了0级有序链表中,但是后续的依概率逐级上升的逻辑并没有体现在这里。所以在找到插入位置之后,需要以这样的形式来执行逐级上升插入:

while(true)
{
	int cur_node_level = temp_node->levels.size();
	if(m_level<cur_node_level)
	{
		// 当前节点高度可能比原来的层数高了 需要新建一层
		m_level++;
		m_head_node.levels.push_back(level_info{&m_tail_node, m_key_to_nodes.size()});
		prev_nodes.push_back(&m_head_node);
		prev_ranks.push_back(m_key_to_nodes.size());
	}
	temp_node->levels.push_back({});
	temp_node->levels[cur_node_level].forward = prev_nodes[cur_node_level]->levels[cur_node_level]->forward;
	prev_nodes[cur_node_level]->levels[cur_node_level]->forward = temp_node;
	if(random() > m_p)
	{
		// 概率测试没有通过,停止上升
		break;
	}
}

上面的代码成功的依照概率p对这个新节点做好了多层链表的上升操作,维护好了链表中的forward指针,但是span字段由于插入了这个新节点导致很多地方都需要调整,所以需要这样的去维护span:

// 原来维护链表指针的代码
for(int i = 0; i< prev_nodes.size(); i++)
{
	if(i < temp_node->levels.size())
	{
		// prev 节点的排名
		uint32_t prev_node_rank = prev_ranks[i];
		// forward节点在插入temp_node后的排名
		uint32_t forward_node_rank = prev_node_rank + prev_nodes[i]->levels[i].span + 1 + 1;
		// temp_node节点的排名
		uint32_t self_node_rank = prev_node_rank[0] + 1;
		// 利用这三个变量来更新temp_node 和prev_node 的span
		prev_nodes[i]->levels[i].span = self_node_rank - prev_node_rank - 1;
		temp_node->levels[i].span = forward_node_rank - self_node_rank - 1;
	}
	else
	{
		// 这一层级大于temp_node的高度 因此只需要将span+1即可
		prev_nodes[i]->levels[i].span++;
	}
}

现在来计算一下整个插入过程的时间复杂度。首先是查找初始插入位置,这部分的复杂度就是之前的查询复杂度,也就是O(logN)。然后执行不断上升,每上升一层所需要的额外代价就是调整两个forward和两个span,所以可以认为这个上升的时间与这个节点的期望高度成线性相关,又由于单节点的期望高度为,所以这个的调整时间可以认为是常数。综上讨论,插入的时间复杂度等价于查询复杂度,都是O(logN)

接下来考虑删除节点这个操作,删除节点的过程就是原来插入节点过程的逆过程,查询了prev_nodes,prev_ranks之后再调整forward,span即可,所以这个删除的复杂度也是O(logN)。至于已有节点的更新,可以通过先从跳表中删除这个节点然后更新积分最后重新插入这个节点来实现,所以这个更新操作的时间复杂度也是O(logN)。可以看出跳表的所有操作都是非常高效无短板,而且相关的实现代码也很简短,因此特别适合用来维护大规模的排行榜。我自己在前面介绍的rank仓库中也提供了一个基于跳表的排行榜实现,代码在include/skiplist_rank.h中。不过在我自己的对比测试test/speed_test中发现不符合直觉的结果,在100w左右的数据量上跳表的排行榜实现与数组的排行榜实现并没有非常显著的性能优势。感觉数组的线性内存读取在缓存结构的加持下比链表的遍历有碾压性的性能优势,否则无法解释这样的结果。

基于树的实现

在没有接触跳表之前,有些人会尝试用二叉平衡树结构去实现一个排行榜。因为对于二叉平衡树来说,插入删除节点的复杂度也是O(logN)。如果需要获取一个节点的排名,则需要在树的节点中定义一个额外字段来存储这个节点对应的子树节点个数:

struct binary_tree_node
{
	binary_tree_node* left;
	binary_tree_node* right;
	std::uint32_t subtree_node_count; // 当前节点对应的子树里总节点个数
	V score;	
};

有了这个字段之后,获取一个节点的排名就是收集从根节点到这个节点的路径上所有触发右转的节点的左子树节点数量的累加值,最后再加上1。下面就是描述这个获取排名的代码:

// 使用者需要保证在树中不存在两个节点有同样的score值
std::uint32_t binary_tree_node::get_rank(const V& other_score) const
{
	std::uint32_t rank_value = 0;
	if(score<=other_score)
	{
		std::uint32_t cur_node_rank = 1;
		if(left)
		{
			cur_node_rank += left->subtree_node_count;
		}
		if(score == other_score)
		{
			return cur_node_rank
		}
		if(right)
		{
			return right->get_rank(other_key, other_score) + cur_node_rank;
		}
		else
		{
			return cur_node_rank + 1;
		}
	}
	else
	{
		return left->get_rank();
	}
}

为了简单起见这里使用递归来不断下降,可以看出这个获取排名的过程与二叉树的查找过程并没有多大的差异,因此这个排名获取的时间复杂度也是O(logN)。既然平衡二叉树结构的所有操作复杂度都是O(logN),那为什么业界基本没有使用平衡二叉树来做排行榜呢。这个主要是因为实现一个平衡二叉树所需的代码实在是太多了,avl树的实现所需代码比跳表来说多了好几倍,更不用说更加复杂的红黑树了。而且树结构的节点访问在内存局部性上与跳表一样劣势很大,所以其在常见的100-1000的排行榜大小上远远不及数组的实现。这两个因素的作用下导致树结构在排行榜实现上基本无人问津。

基于树状数组的实现

有些游戏允许任何拥有积分的玩家都可以查询自己的排名,这样排名值可能到千万级别。前述的数组排行榜已经难堪大用,只剩跳表和平衡二叉树结构能应对这种情况。不过如果这种积分排名中会在低分段聚集绝大部分的玩家,同分玩家数会有非常多。可以利用这个性质来构造这些分数子排行榜,这样一个总榜就可以被切分为上万个子榜,每个子榜单独维护,可以极大的降低整体维护复杂度。不过切分为子榜之后,对于排名值的查询就分为了三步:

  1. 根据分数定位到这个分数所在的子榜,然后通过这个子榜获取榜内排名A
  2. 获取这个子榜前面所有的高排名子榜中的玩家数量之和B
  3. 返回A+B作为最终的排名值

这里的子榜内玩家一般也就不到100000级别,可以使用之前介绍的各种排行榜的数据结构来维护。如果单一子榜内玩家数量过多,此时还可以放弃先到先得的排序规则,进一步根据玩家名字进行hash然后创建更低一级的子榜。这样就形成了三级排行榜结构,保证每一级的排行榜数量都控制在10000以内,处理起来就快的多了。

搞定了第一步子榜内部的维护之后,接下来需要处理第二步,对所有高优先级子榜进行求和。为了简化这个求和问题的讨论,我们假设每个排行榜内的玩家个数都存储到一个数组vector<uint32> rank_nums之中,此时要解决的问题就是给定索引i,对rank_nums[0],...,rank_nums[i-1]这些元素进行求和。这个简单点的实现就是遍历相关排行榜进行累加,即sum(rank_nums.begin(), rank_nums.begin()+i, 0),时间复杂度就是子榜数量的线性复杂度O(N)。考虑到这种排行榜玩家数量为千万,且子榜数量也超过10000,线性复杂度乘以玩家数量也是一个天文数字。所以我们需要寻找一个更优的结构来获取前序子排行榜的容量之和,最好其更新与查询复杂度都是O(logN)级别,而树状数组正是满足这一需求的数据结构。

树状数组也叫做二元索引树Binary Indexed Tree,最早由 PeterM.Fenwick于1994年在A New Data Structure for Cumulative Frequency Tables提出。这个数据结构多用于高效计算数列的前缀和与区间和,这些操作的时间复杂度都是O(logN)。这个数据结构在原始的数组数据a之外创建了一个额外的数组c,数组c中的每个元素都是a中一段连续元素的累加和,ca的对应关系图如下:

树状数组

这张图只是一个粗略的展示,整个ca之间的映射关系有一个完整的形式定义。树状数组中,规定c[x]管辖的区间为长度为,对应a这个闭区间的元素和,其中:

  1. 设二进制最低位为第0位,则k恰好为x二进制表示中,最低位的1所在的二进制位数;
  2. 恰好为x二进制表示中,最低位的1以及后面所有0组成的数

举例来说,88的二进制表示为01011000,所以这个k就是3,此时对应的区间左侧为,因此

事实上我们并不需要求出k来,求其实更加方便,这里只需要使用一个非常简单的二进制操作就可以得到x对应的:

int lowbit(int x) {
  // x 的二进制中,最低位的 1 以及后面所有 0 组成的数。
  // lowbit(0b01011000) == 0b00001000
  //          ~~~~^~~~
  // lowbit(0b01110010) == 0b00000010
  //          ~~~~~~^~
  return x & -x;
}

这里使用的主要是补码操作,类似巧妙的位操作在Hacker's Delight里还介绍了很多,有兴趣的读者可以尝试去看看。

有了这个结构之后,我们再来探究如何利用这个结构高效的计算出。首先我们先获取c[n],以及其覆盖的区间[m, n],此时可以得到:

这样问题就转换为了同类型但是处理范围更小的问题上,使用这个转化方法来求的最终过程如下,红色的元素就是最终被访问到的数组c元素:

树状数组的前缀和查询

对应的cpp代码也非常简短:

std::uint64_t pre_sum(std::uint32_t k)
{
	std::uint64_t result = 0;
	while(true)
	{
		result += c[k];
		k -= lowbit(k);
		if(k==0)
		{
			return result;
		}
	}
}

由于m-1=n-lowbit(n),也就是说新的子问题对应的n2相当于将原来的n的二进制表示最低位的1变换为0。由于n的二进制表示里1的数量不超过log(N)+1,所以最多只需要执行log(N)+1次这样的操作就可以获取的值。由于每次转化操作所需操作只有一个求和和lowbit调用,所以求前缀和的整体复杂度就是O(logN)。也就是说在树状数组的帮助下,我们可以在对数时间复杂度内获取任意一个玩家的排名!

讲完了如何使用树状数组来优化排行榜的查询,接下来我们来讲如何在排行榜更新过程中去维护这个对应的树状数组。假设分段k对应的子榜新加入了一个玩家,这样就会导致a[k]+=1,此时我们需要去维护受影响的c。从上面的图可以观察出a[k]会立即更新到c[k]。然后对于任意c[k],其覆盖的区间总是被c[k+lowbit(k)]所覆盖的区间所包围,因此需要递归更新:

void update(std::uint32_t k, int delta)
{
	while(true)
	{
		c[k]+=delta;
		k= k + lowbit(k);
		if(k >= c.size())
		{
			return;
		}
	}
}

由于循环过程中的每次操作都会将k的二进制表示中最低位的1向左推进起码一位,因此整个循环次数不会大于log(p) + 1,其中pc的大小。由于每次循环中的消耗也只是两次加法和一次lowbit调用,因此整体的更新复杂度也是对数复杂度。

如果一个玩家的分数更新之后导致从子榜m切换到n,则我们需要先对a[m]做一次减法更新update(m,-1),然后再对a[n]做一次加法更新update(n,1)。两个操作都是对数时间复杂度,因此整个切换子榜的操作也是对数时间复杂度。

综上所述,使用树状数组可以非常高校的实现多级分段排行榜的查询与更新,所有操作的复杂度都是对数时间复杂度。

Mosaic Game 中的排行榜

mosaic_game中也集成了排行榜服务,其实现在roles/server/service_server/include/service/rank_service.h上。这个服务在启动的时候会去读取配置文件rank_list.json里的排行榜配置来生成多个由https://github.com/huangfeidian/rank/include/array_rank.h为底层数据结构的排行榜。在创建好这些没有数据的排行榜之后,再读取之前存在数据库中的历史排行榜数据,来实现排行榜的数据加载:

// bool rank_service::init(const json::object_t& data);
auto cur_rank_ptr = std::make_unique<system::rank::array_rank>(cur_rank_name, cur_rank_sz, cur_rank_pool_sz, -100000, 100000);

m_array_ranks[cur_rank_name] = std::move(cur_rank_ptr);
tasks::db_task_desc::base_task cur_task_base(tasks::db_task_desc::task_op::find_one, std::string{}, "", collection_name());
auto cur_db_callback = [cur_rank_name, this](const json& db_reply)
{
	load_rank_cb(cur_rank_name, db_reply);
};
json query;
query["name"] = cur_rank_name;
auto cur_find_task = tasks::db_task_desc::find_task::find_one(cur_task_base, json(query),  {});
auto cur_server = get_server();
cur_server->call_db(cur_find_task->to_json(), cur_db_callback);
m_remain_ranks_to_load.insert(cur_rank_name);

为了支持排行榜数据的存库,我们还在排行榜基类上提供了与json之间进行转换的接口:

virtual json rank_interface::encode() const
{
	json result;
	result["name"] = m_name;
	result["impl_name"] = rank_impl_name();
	result["rank_sz"] = m_rank_sz;
	result["pool_sz"] = m_pool_sz;
	result["min_value"] = m_min_value;
	result["max_value"] = m_max_value;
	return result;
}
virtual bool rank_interface::decode(const json& data) = 0;

json array_rank::encode() const
{
	std::vector<rank_info> temp_sorted_rank_info;
	temp_sorted_rank_info.reserve(m_sorted_rank_ptrs.size());
	for (const auto &one_key : m_sorted_rank_ptrs)
	{
		temp_sorted_rank_info.push_back(*one_key.ptr);
	}
	json result = rank_interface::encode();
	result["sorted_ranks"] = temp_sorted_rank_info;
	return result;
}

bool array_rank::decode(const json &data)
{
	std::uint32_t rank_sz;
	std::uint32_t pool_sz;
	std::string name;
	std::vector<rank_info> temp_sorted_rank_info;
	try
	{
		data.at("name").get_to(name);
		data.at("pool_sz").get_to(pool_sz);
		data.at("rank_sz").get_to(rank_sz);
		data.at("sorted_ranks").get_to(temp_sorted_rank_info);
	}
	catch (std::exception &e)
	{
		assert(false);
		return false;
	}
	reset(temp_sorted_rank_info);
	return true;
}

排行榜的更新操作是一个低频可控操作,但是排行榜的查询操作却是一个完全由玩家行为决定的不可用操作,而且很多排行榜处理的查询都是返回所有榜上结果的整榜查询,所以rank_service的绝大部分cpu时间都是在处理查询任务上。为了减轻rank_servicecpu负担,mosaic_game中将排行榜的查询功能委托到了每个进程都有的rank_manager单例上,这个单例会拥有rank_service上所有排行榜数据的一个只读副本,以供Space进程来执行本地查询。

这个rank_manager在所在进程启动之后往rank_service注册,获取此时所有排行榜的全量数据副本,然后以这些数据副本来初始化一系列的arary_rank:

void rank_manager::sync_full_msg(const utility::rpc_msg& data, const json::array_t& all_rank_datas)
{
	m_timer_mgr.cancel_timer(m_register_timer);
	m_rank_datas.clear();
	for(const auto& one_rank_data: all_rank_datas)
	{
		auto temp_array_rank = system::rank::array_rank::create(one_rank_data);
		if(!temp_array_rank)
		{
			m_logger->error("fail to create rank with data {}", one_rank_data.dump());
			continue;
		}
		m_rank_datas[temp_array_rank->m_name] = std::move(temp_array_rank);
	}
}

这个只是rank_manager启动的时候执行与rank_service之间的数据同步,如果后续rank_service由于玩家上报了新的积分导致排行榜发生变化,也需要将这个变化同步到所有注册过来的rank_manager。简单的实现可以将rank_service接收到的所有的更新请求都往注册过来的rank_manager转发一份,让rank_manager也同步执行所有更新。不过这个暴力的实现会导致很多性能的浪费,因为绝大部分的玩家更新都不会引发排行榜的变动。为了知道一次update操作是否引发了排行榜的变化,之前的排行榜基类rank_interface::update接口就不能只返回这个玩家的最新排名,还需要返回一些额外信息:

struct update_rank_result
{
	std::uint32_t pre_rank = 0; // 更新前的排名
	std::uint32_t new_rank = 0; // 更新后的排名
	std::uint64_t update_ts = 0; // 更新后的玩家积分时间戳
};
// rank 为1 代表第一名  为0 代表不在排行榜上
virtual update_rank_result update(const rank_info& one_player) = 0;

有了这个返回数据之后,我们就可以非常方便的判定此次更新是否会影响到排行榜,结果中的pre_ranknew_rank任何一个是有效排名值都会影响到排行榜,只有都不是有效排名值的时候才不会影响:

void rank_service::update_rank(const utility::rpc_msg& msg, const std::string& rank_name, const std::string& player_id, const json::object_t& player_info, double rank_value)
{
	if(m_delay_broadcast_timer.valid())
	{
		return;
	}
	auto cur_rank_iter = m_array_ranks.find(rank_name);
	if(cur_rank_iter == m_array_ranks.end())
	{
		return;
	}

	system::rank::rank_info new_rank_info;
	new_rank_info.player_id = player_id;
	new_rank_info.player_info = player_info;
	new_rank_info.rank_value = rank_value;
	auto cur_update_result = cur_rank_iter->second->update(new_rank_info);
	new_rank_info.update_ts = cur_update_result.update_ts;
	auto cur_rank_pool_sz = cur_rank_iter->second->m_rank_sz;
	bool should_sync_manager = false;
	if(cur_update_result.new_rank >0 && cur_update_result.new_rank <= cur_rank_pool_sz)
	{
		should_sync_manager = true;
	}
	else
	{
		if(cur_update_result.pre_rank >0 && cur_update_result.pre_rank <= cur_rank_pool_sz)
		{
			should_sync_manager = true;
		}
	}
	if(should_sync_manager)
	{
		sync_rank(rank_name, "update_rank", json(new_rank_info));
	}
	
}

这里的sync_rank接口就是负责将这个排行榜的更新操作广播到所有注册过来的rank_managerrank_manager接收到这个update_rank指令之后就会执行一次更新的回放,从而达到与rank_service的同步:

void rank_manager::sync_rank(const utility::rpc_msg& data, const std::string& rank_name, const std::string& sync_cmd, const json& sync_data)
{
	auto cur_rank_iter = m_rank_datas.find(rank_name);
	if(cur_rank_iter == m_rank_datas.end())
	{
		return;
	}
	auto& cur_array_rank = *cur_rank_iter->second.get();
	if(sync_cmd == "update_rank")
	{
		system::rank::rank_info temp_rank_info;
		try
		{
			sync_data.get_to(temp_rank_info);
		}
		catch(const std::exception& e)
		{
			m_logger->error("sync_rank  rank {} cmd {} data {} decode fail error {}", rank_name, sync_cmd, sync_data.dump(), e.what());
			return;
		}
		cur_array_rank.update(temp_rank_info);
		return;
	}
}

在客户端的排行榜展示界面中,除了要展示榜上玩家的唯一id与积分值之外,还要显示玩家的很多描述性字段,例如名称、门派、职业、种族、帮派、等级等各种杂七杂八的头像框信息。由于这些信息都是在随时变动的,所以在这些信息变动的时候都需要通知rank_service来执行玩家信息的更新操作:

void rank_service::update_player_info(const utility::rpc_msg& msg, const std::string& rank_name, const std::string& player_id, const json::object_t& player_info)
{
	if(m_delay_broadcast_timer.valid())
	{
		return;
	}
	auto cur_rank_iter = m_array_ranks.find(rank_name);
	if(cur_rank_iter == m_array_ranks.end())
	{
		return;
	}
	
	if(!cur_rank_iter->second->update_player_info(player_id, player_info))
	{
		return;
	}
	json temp_array;
	temp_array.push_back(player_id);
	temp_array.push_back(player_info);
	sync_rank(rank_name, "update_player_info", temp_array);
}

由于玩家自己并不知道自己是否在排行榜上,所以当其中某一个字段发生变化之后,玩家需要向所有的排行榜发出一个更新player_info的操作,更新之后还需要通过sync_rank广播到所有的rank_manager上。在player_info里字段越来越多的时候,这个广播操作也是越来越频繁,而且频率远比更新排名积分高。实际上维护这个player_info并不是排行榜的本职需求,假如客户端可以以某种途径方便的通过player_id拿到这个玩家的最新player_info的话,rank_service这里可以删除player_info的维护逻辑,这样就可以进一步大规模的减轻排行榜服务的负担。