MosaicGame外围系统接入

游戏服务器由于游戏实时性的需要,会将所有数据都放在内存里来方便的进行读写,而不是每次都通过一些接口来与外部数据存储系统进行交互。但是这样也会带来一些问题,比如数据的持久化问题,特别是进程崩溃引发的玩家数据丢失的问题。为了解决这个问题,游戏服务器会定期将内存中的数据同步到外部数据存储系统中。除了这个持久化问题之外,有些非实时交互的功能需要去读取游戏相关的数据,这种延迟要求不高的数据查询一般都是使用外部存储系统去支持,而不是直接去查询游戏内存中的数据。因为数据查询很多时候会访问大量的数据并执行查询过滤,同时下发的数据量也会很大,这样如果直接使用游戏进程去承载这些数据查询的话负载会非常的大。基于数据持久化与负载优化的考量,游戏服务器里会不可避免的接入外围系统,这些外围系统里最主要的就是缓存系统和数据库系统。缓存系统目前已经有了标准化的解决方案Redis,在mosaic_game里也同样的使用Redis作为缓存系统的实现。至于数据库系统,目前游戏业界一般使用的是MySql或者MongoDB,由于MongoDB使用起来比较简单,所以在mosaic_game里采用了MongoDB作为数据库后端,其实使用MySQL作为数据库后端对于代码上的修改也没多少,因为上层基本都有接口封装,不会直接与数据库进行交互。接下来对这两个系统的接入部分来做一些阐述。

基于Redis的缓存系统

Redis提供了很多的功能,最知名的就是KV缓存和排行榜。在游戏业务里主要用的是其KV缓存功能,主要用来存储一些实时性要求不高的数据,比如玩家的属性快照。至于排行榜功能,游戏内一般都使用自己开发的组件,因为游戏排行榜有很多奇怪的结算逻辑,对比排序逻辑就不怎么重要了,因此没必要去与Redis做一些额外的交互。

游戏里使用RedisKV缓存的数据一般都是读取操作的数量远比写入操作高,例如其他玩家的头像信息,这些头像信息一般来说只有玩家升级、更换体型和门派的时候才需要去更新,但是这个头像数据很容易出现在客户端的各种UI里,最明显的样例就是排行榜UI里需要同时显示数十个玩家角色的头像信息,所以这个头像信息的查询操作远远比更新操作频繁。特别是在排行榜刷新的那一刻,由于大量玩家想要看到最新的排行榜结果,会导致巨量的玩家同时刷新排行榜,而排行榜上单页面就可以引发数十个头像数据查询操作,此时的查询QPS很容易达到上万级别。如果每次去获取一个玩家对应的最新头像信息都去查询数据库,那么面对这种巨量同时查询的情况很容易把后端数据库打爆,即使后端数据库使用了分布式集群。所以游戏服务器业务里一般会把这些被客户端频繁查询的数据都放到Redis集群里,同时这个Redis集群提供基于HTTP的查询接口,然后客户端在查询这些数据的时候使用基于HTTP接口封装的批量查询MGET操作。这样即使出现短时间内大批量的数据查询,也只是让Redis的集群负载升高和客户端的响应延迟升高而已,并不会影响到真正的游戏服务器。由于Redis在处理缓存查询的速度远比数据库查询的速度快,所以基于HTTP接口的查询延迟在普通情况下都会比基于服务器转发数据库查询的延迟小很多,即使HTTP会涉及到基于TCP的三次握手延迟。

mosaic_game里,专门在player上添加了一个player_redis_component来封装所有与redis的交互,在这个组件上提供了将玩家基本信息上传到Redis集群的接口update_player_info_for_redis,这个接口会将当前玩家的属性里所有添加了sync_redis标记的属性打包为一个json,主要是base_propteam_prop里的一些字段:

enum class property_flags_enum: std::uint8_t
{
	save_db = 0,
	sync_self,
	sync_ghost,
	sync_other,
	sync_redis,
	sync_leader,
};

	class property_flags
	{
	
	private:
		
		// 省略其他字段
		const static std::uint64_t sync_redis_bit = 1 << std::uint8_t(property_flags_enum::sync_redis);

	public:
		const static std::uint64_t no_proxy = 0;
		const static std::uint64_t sync_redis = sync_redis_bit;
		// 省略其他字段
		const static std::uint64_t mask_all = std::numeric_limits<std::uint64_t>::max();
	};

class Meta(property) base_prop
{
public:
	Meta(property(sync_clients, save_db, sync_redis)) std::string m_account_name;
	Meta(property(sync_clients, save_db, sync_redis)) std::string m_entity_id;
	Meta(property(sync_clients, save_db)) std::uint64_t m_create_ts;
	Meta(property(sync_clients, save_db, sync_redis)) std::string m_nickname;
	Meta(property(sync_clients, save_db, sync_redis)) std::uint32_t m_sect = 0;
	Meta(property(sync_clients, save_db, sync_redis)) std::uint32_t m_level = 0;
	Meta(property(sync_self, save_db)) std::uint32_t m_exp = 0;
	Meta(property()) std::uint64_t m_save_db_ts = 0;
	Meta(property(sync_ghost)) std::string m_account_anchor;
	#ifndef __meta_parse__
	#include "player/base_prop.generated.inch"
	#endif
};

void player_redis_component::update_player_info_for_redis()
{
	json::object_t encode_result;
	m_player->prop_data().encode_with_flag(spiritsaway::property::property_flags{mosaic_game::property::property_flags::sync_redis}, true, encode_result);
	std::string cur_redis_str = redis::command::hash::set("Player", m_owner->entity_id(), json(encode_result).dump());
	redis::redis_task_desc cur_redis_task;
	cur_redis_task.cmds.push_back(std::move(cur_redis_str));

	m_owner->call_redis(json(cur_redis_task), utility::mixed_callback_manager::callback_handler{});
}

这个接口承接了一些事件监听操作,例如玩家登录、升级等事件。当玩家触发了这些事件时,这个Redis更新接口就会被调用:

bool player_redis_component::init(const json& data)
{
	m_player = dynamic_cast<player_entity*>(m_owner);
	if(!m_player)
	{
		return false;
	}
	m_player->login_dispatcher().add_listener(&player_redis_component::on_login, this);
	m_player->levelup_dispatcher().add_listener(&player_redis_component::update_player_info_for_redis, this);
	return true;
}

void player_redis_component::on_login(bool is_relay)
{
	if(is_relay)
	{
		return;
	}
	update_player_info_for_redis();
}

上面的Redis样例代码里展示了如何去更新Redis里的玩家信息,其实就是对于Player这个集合做HSET操作,将玩家的一些Redis可见属性字段构造出来的Json字符串存储到Redis里。但是这里mosaic_game并没有直接将HSET这个指令发送到redis_server进程,而是自己对常用的Redis接口进行了封装,当前封装了hashlistsetzset等各种数据结构的操作,对应的代码文件在common/redis_logic/文件夹下:

namespace spiritsaway::mosaic_game::redis::command
{
	class hash
	{
	public:
		static std::string set(const std::string& name, const std::string& key, const std::string& value);
		static std::string set(const std::string& name, const std::vector<std::pair<std::string, std::string>>& kvs);
		static std::string setnx(const std::string& name, const std::string& key, const std::string& value);
		static std::string get(const std::string& name, const std::string& key);
		static std::string get(const std::string& name, const std::vector<std::string>& keys);
		// 省略其他指令	
	};
	// 省略其他数据结构的操作

	std::string hash::set(const std::string& name, const std::string& key, const std::string& value)
	{
		std::vector<std::string> result;
		result.push_back("HSET");
		result.push_back(name);
		result.push_back(key);
		result.push_back(value);
		return cmd_join(result);
	}
	std::string hash::setnx(const std::string& name, const std::string& key, const std::string& value)
	{
		std::vector<std::string> result;
		result.push_back("HSETNX");
		result.push_back(name);
		result.push_back(key);
		result.push_back(value);
		return cmd_join(result);
	}
	std::string hash::set(const std::string& name, const std::vector<std::pair<std::string, std::string>>& kvs)
	{
		std::vector<std::string> result;
		result.reserve(2*kvs.size() + 2);
		result.push_back("HMSET");
		result.push_back(name);
		for(const auto& [k, v]: kvs)
		{
			result.push_back(k);
			result.push_back(v);
		}
		
		return cmd_join(result);
	}
}

这样封装的作用是避免业务层直接对接缓存系统的具体实现,而是通过封装的接口来进行操作,从而可以在不改变业务逻辑的情况下,切换不同的缓存系统实现,毕竟KV数据库又不是只有Redis一个。

上述封装指令每次执行之后都会构造出一个std::string对象,这个对象就是Redis指令的字符串表示,mosaic_game不会直接将这个字符串对象发送到redis_server进程,而是将这个字符串对象封装到redis_task_desc这个对象里,这个对象里的cmds可以塞入多个指令来执行复合操作,channel字段则是用来区分有序执行通道来使用的,这个有序执行将在后面被介绍:

struct redis_task_desc
{
	std::string channel;
	std::vector<std::string> cmds;
	NLOHMANN_DEFINE_TYPE_INTRUSIVE(redis_task_desc, channel, cmds)
};

最后通过call_redis接口将这个redis_task_desc提交到游戏服务器里的redis_server角色的进程去委托执行:

void server_entity::call_redis(const json& redis_task, const utility::mixed_callback_manager::callback_handler entity_callback_id)
{
	get_server()->call_redis(redis_task, this, entity_callback_id);
}

void space_server::call_redis(const json& redis_task, entity::server_entity* cur_entity, const utility::mixed_callback_manager::callback_handler entity_callback_id)
{
	json redis_request_info, redis_request_param;
	auto cur_callback_id = create_entity_callback(cur_entity, entity_callback_id );
	redis_request_param["callback_id"] = cur_callback_id;
	redis_request_param["request_detail"] = redis_task;
	redis_request_info["cmd"] = "redis_request";
	redis_request_info["param"] = std::move(redis_request_param);
	m_router->push_msg(*m_local_name_ptr, "redis_server", redis_request_info.dump(), enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
	m_logger->info("call_redis for entity {} entity_cb {} global_cb {}", cur_entity->entity_id(), entity_callback_id.value(), cur_callback_id);
}

这个call_redis接口允许注册一个回调函数,当redis_server进程处理完这个请求后,会将结果返回给space_server进程,space_server进程会根据这个回调函数的entity_callback_id来调用对应的回调函数。只不过目前的update_player_info_for_redis并不需要这个回调函数,所以提供了一个默认构造的空callback_handler作为参数。

在当前的mosaic_game的架构设计中,可以同时存在多个拥有redis_server角色的进程,而且每个redis_server进程所提供的服务都是无状态的,因此当一个进程想要请求执行Redis相关操作的时候,可以选择任意一个redis_server进程去处理这个请求。但是为了避免一个space_server与所有的redis_server都执行连接,这里会在space_server连接到mgr_server的时候会请求分配一个redis_server:

void space_server::on_connect(std::shared_ptr<network::net_connection> connection)
{
	json_stub::on_connect(connection);
	auto connection_name_ptr = get_connection_name(connection.get());
	if (*connection_name_ptr == m_upstream_server.name)
	{
		request_allocate_counter("online_session");
		request_allocate_resource_server("db_server");
		request_allocate_resource_server("redis_server");
	}
	// 省略后续代码
}

void space_server::request_allocate_resource_server(const std::string& resource_server_type)
{
	json request_msg, request_param;
	request_msg["cmd"] = "request_allocate_resource";
	request_param["from_server_name"] = m_local_server.name;
	request_param["from_server_type"] = m_local_server.type;
	request_param["resource_server_type"] = resource_server_type;
	request_msg["param"] = request_param;
	auto msg_ptr = std::make_shared<std::string>(request_msg.dump());
	auto remote_name_ptr = std::make_shared<std::string>(m_upstream_server.name);
	if (!m_router->push_msg(m_local_name_ptr, remote_name_ptr, msg_ptr, enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0)))
	{
		add_timer_with_gap(std::chrono::milliseconds(2 * 1000), [resource_server_type, this]() {
			request_allocate_resource_server(resource_server_type);
			});
	}
}

mgr_server接收到这个redis_server的分配请求后,会根据当前的redis_server进程列表里的负载情况进行排序,并选择其中负载最低的去执行绑定,这里的负载其实就是绑定了这个redis_server的进程数量:

std::string mgr_server::allocate_resource_svr(std::unordered_map<std::string, resource_stub_info>& resource_svrs, const std::string& from_server_name)
{
	if(resource_svrs.empty())
	{
		return {};
	}
	std::vector<std::pair<std::string, std::size_t>> resource_server_loads;
	resource_server_loads.reserve(resource_svrs.size());
	for (const auto& one_pair : resource_svrs)
	{
		if(!one_pair.second.ready)
		{
			continue;
		}
		resource_server_loads.emplace_back(one_pair.first, one_pair.second.connected_svrs.size());
	}
	std::sort(resource_server_loads.begin(), resource_server_loads.end(), [](const std::pair<std::string, std::size_t>& a, const std::pair<std::string, std::size_t>& b)
		{
			return a.second < b.second;
		});
	auto dest_resource_svr = resource_server_loads[0].first;
	resource_svrs[dest_resource_svr].connected_svrs.insert(from_server_name);
	return dest_resource_svr;
}

std::string mgr_server::allocate_resource_for_game(const std::string& space_server_name, const std::string& resource_svr_type)
{

	auto cur_game_iter = m_space_stub_infos.find(space_server_name);
	if (cur_game_iter == m_space_stub_infos.end())
	{
		return {};
	}
	auto temp_iter = cur_game_iter->second.resource_svrs.find(resource_svr_type);
	if(temp_iter != cur_game_iter->second.resource_svrs.end())
	{
		return temp_iter->second;
	}
	std::string result_svr;
	if(resource_svr_type == "redis_server")
	{
		result_svr = allocate_resource_svr(m_redis_stub_infos, space_server_name);
	}
	else
	{
		result_svr = allocate_resource_svr(m_db_stub_infos, space_server_name);
	}
	if(!result_svr.empty())
	{
		cur_game_iter->second.resource_svrs[resource_svr_type] = result_svr;
	}
	return result_svr;
}

space_server接收到redis_server的分配结果后,就会使用connect_to_server发起一个网络连接到这个redis_server进程:

void space_server::on_reply_allocate_resource(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> dest, const json& msg)
{
	stub_info cur_resource_svr;
	std::string resource_server_type;
	std::string cur_err;
	try
	{
		msg.at("resource_server_type").get_to(resource_server_type);
		msg.at("errcode").get_to(cur_err);
		if(!cur_err.empty())
		{
			m_logger->warn("on_reply_allocate_resource errcode {}", cur_err);
			add_timer_with_gap(std::chrono::milliseconds(2 * 1000), [resource_server_type, this]() {
				request_allocate_resource_server(resource_server_type);
			});
			return;
		}
		msg.at("resource_svr").get_to(cur_resource_svr);
	}
	catch (std::exception& e)
	{
		m_logger->warn("on_reply_allocate_resource msg invalid {} error {}", msg.dump(4), e.what());
		return;
	}

	m_named_servers[cur_resource_svr.name] = cur_resource_svr;
	connect_to_server(cur_resource_svr.name);

}

当与这个redis_server进程建立好连接后,space_server就会使用link_anchor_to_connection将这个redis_server的连接绑定到redis_server通信地址上,这样后续的push_msg时使用redis_server这个名字就会自动的找到这个连接:

void space_server::on_connect(std::shared_ptr<network::net_connection> connection)
{
	json_stub::on_connect(connection);
	auto connection_name_ptr = get_connection_name(connection.get());
	if (*connection_name_ptr == m_upstream_server.name)
	{
		// 省略连接到mgr_server的逻辑
	}
	else
	{
		auto cur_server_iter = m_named_servers.find(*connection_name_ptr);
		if (cur_server_iter == m_named_servers.end())
		{
			return;
		}
		if (cur_server_iter->second.type == "db_server")
		{
			m_router->link_anchor_to_connection("db_server", connection.get());
			m_logger->info("m_connected_resource_servers add {}", *connection_name_ptr);
		}
		else if(cur_server_iter->second.type == "redis_server")
		{
			m_router->link_anchor_to_connection("redis_server", connection.get());
			m_logger->info("m_connected_resource_servers add {}", *connection_name_ptr);
		}
		else
		{
			return;
		}
		if(m_router->has_anchor("redis_server") && m_router->has_anchor("db_server"))
		{
			json report_ready_info, temp_param;
			report_ready_info["cmd"] = "report_server_ready";
			temp_param["server_name"] = *m_local_name_ptr;
			temp_param["server_type"] = m_local_server.type;
			report_ready_info["param"] = temp_param;
			m_router->push_msg( m_local_name_ptr,std::make_shared<std::string>(m_upstream_server.name), std::make_shared<std::string>(report_ready_info.dump()), enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
		}
		
	}

}

当本地存在redis_server对应的anchor之后,当前的space_server才有可能通知mgr_server本进行已经ready,可以处理业务逻辑了。所以任意Entity在调用Redis接口的时候,可以保证redis_server这个anchor有绑定的NetConnection,从而可以将请求发送到redis_server进程。

redis_server进程的on_server_control_msg里会解析出space_server发出的redis_server请求相关参数,得到redis_task_desccallback_id之后,就会构造出一个redis_task对象,并投递到当前的任务队列redis_task_channels里处理,当这个任务处理完成之后,对应的回调lambda就会被调用,在这个回调里负责将执行结果进行封装并以reply_redis_request这个control_msg通知回redis_request的发起方:

auto cur_lambda = [=](const std::vector<redis::redis_reply> &redis_replys)
{
	json reply;
	reply["cmd"] = "reply_redis_request";
	json reply_param;
	reply_param["callback_id"] = callback_id;
	json::array_t array_result;
	for (const auto &one_reply : redis_replys)
	{
		// 省略每个redis_reply的打包逻辑
	}
	reply_param["result"] = array_result;

	reply["param"] = reply_param;
	auto reply_str = std::make_shared<const std::string>(reply.dump());
	add_task_to_main_loop([=, reply_str=std::move(reply_str)]()
	{
		m_router->push_msg(m_local_name_ptr, from, reply_str, enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
	});
	
};
auto cur_task = std::make_shared<redis::redis_task>(cur_task_desc, cur_lambda);
m_logger->info("add redis_task {}", json(cur_task_desc).dump());
redis_task_channels.add_task(cur_task);

space_server接收到redis_server发回的reply_redis_request消息之后,就会调用on_reply_async_request来处理这个请求结果,这个函数会根据callback_id将之前注册的回调函数取出来并调用,从而完成整个redis异步请求的流程:

void space_server::on_reply_async_request(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> from, const json& msg)
{
	std::uint64_t temp_callback_id;
	json result;

	try
	{
		msg.at("callback_id").get_to(temp_callback_id);
		msg.at("result").get_to(result);


	}
	catch (std::exception& e)
	{
		m_logger->error("fail to handle on_reply_async_request from {} msg {} error {}", *from, msg.dump(), e.what());
		return;
	}
	m_callback_manager.invoke_callback(m_callback_manager.reconstruct_handler(temp_callback_id), result);
}

基于MongoDB的数据库

Entity与数据库的交互模式与之前介绍的Redis交互模式非常类似,也是通过一个call_db接口来间接调用数据库的:



void server_entity::call_db(const json& db_task, const utility::mixed_callback_manager::callback_handler entity_callback_id)
{
	get_server()->call_db(db_task, this, entity_callback_id);
}

void space_server::call_db(const json& db_task, entity::server_entity* cur_entity, const utility::mixed_callback_manager::callback_handler entity_callback_id)
{
	json db_request_info, db_request_param;
	auto cur_callback_id = create_entity_callback(cur_entity, entity_callback_id );
	db_request_param["callback_id"] = cur_callback_id;
	db_request_param["request_detail"] = db_task;
	db_request_info["cmd"] = "db_request";
	db_request_info["param"] = std::move(db_request_param);
	m_router->push_msg(*m_local_name_ptr, "db_server", db_request_info.dump(), enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
	m_logger->info("call_db for entity {} entity_cb {} global_cb {}", cur_entity->entity_id(), entity_callback_id.value(), cur_callback_id);
}

这里会往db_server这个anchor发送最终的数据库请求。与redis_server这个anchor一样,db_server进程角色支持同时存在多个实例。每次space_server在启动的时候都会向mgr_server请求一个可用的db_server实例去执行连接与绑定。当一个space_server绑定好了redis_serverdb_server之后,才能向mgr_server汇报ready:

void space_server::on_connect(std::shared_ptr<network::net_connection> connection)
{
	// 省略很多代码
	if (cur_server_iter->second.type == "db_server")
	{
		m_router->link_anchor_to_connection("db_server", connection.get());
		m_logger->info("m_connected_resource_servers add {}", *connection_name_ptr);
	}
	else if(cur_server_iter->second.type == "redis_server")
	{
		m_router->link_anchor_to_connection("redis_server", connection.get());
		m_logger->info("m_connected_resource_servers add {}", *connection_name_ptr);
	}
	else
	{
		return;
	}
	if(m_router->has_anchor("redis_server") && m_router->has_anchor("db_server"))
	{
		json report_ready_info, temp_param;
		report_ready_info["cmd"] = "report_server_ready";
		temp_param["server_name"] = *m_local_name_ptr;
		temp_param["server_type"] = m_local_server.type;
		report_ready_info["param"] = temp_param;
		m_router->push_msg( m_local_name_ptr,std::make_shared<std::string>(m_upstream_server.name), std::make_shared<std::string>(report_ready_info.dump()), enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
	}
}

db_serveron_server_control_msg接口接收到db_request请求之后,就会与redis类似的形式去构造一个db_task,绑定好db_task执行完成之后的通知回调之后,就会将这个db_task加入到内部的工作队列里去执行:

bool db_server::on_server_control_msg(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> from, const std::string& cmd, const json& msg)
{
	// 省略很多代码
	auto cur_lambda = [=](const db_task_desc::task_reply& db_reply)
	{
		json reply;
		reply["cmd"] = "reply_db_request";
		json reply_param;
		reply_param["errcode"] = std::string();
		reply_param["callback_id"] = callback_id;
		reply_param["result"] = db_reply.to_json();
		reply["param"] = reply_param;
		auto reply_str = std::make_shared<const std::string>(reply.dump());
		add_task_to_main_loop([=,reply_str=std::move(reply_str)]()
		{
			m_router->push_msg(m_local_name_ptr, from, reply_str, enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
		});
	};
	auto cur_db_task = std::make_shared<mongo_task>(cur_task, cur_lambda, m_logger);
	m_logger->info("add db_task {}", db_request_detail.dump(4));
	mongo_task_channels.add_task(cur_db_task);
	return true;
}

这个reply_db_request回调RPC也会被space_server利用on_reply_async_request接口来处理,这里跟redis_server的处理逻辑是一致的:

bool space_server::on_server_control_msg(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> from, const std::string& cmd, const json& msg)
{
	if(json_stub::on_server_control_msg(con, from, cmd, msg))
	{
		return true;
	}
	if (cmd == "request_create_account")
	{
		on_request_create_account(con, from, msg);
		return true;
	}
	else if(cmd == "reply_db_request")
	{
		on_reply_async_request(con, from, msg);
		return true;
	}
	else if(cmd == "reply_redis_request")
	{
		on_reply_async_request(con, from, msg);
		return true;
	}
	else if(cmd == "reply_service_request")
	{
		on_reply_async_request(con, from, msg);
		return true;
	}
	// 省略其他分支
}
	

Redis的接口是比较有限的,所以可以很方便的封装常见的Redis操作。但是由于mosaic_game选择的是MongoDB这个NOSQL作为数据库,其数据库操作接口非常灵活,所以无法简单对其进行封装,因此在调用的地方不得不直接使用MongoDB的一些语法。例如下面的玩家定时存库接口,其数据更新部分db_doc里使用了$set这个MongoDB专用语法,代表使用query_doc作为查询条件查到对应的玩家doc之后,将m_prop_data里的存库数据对应的json整体更新到原来的doc上:

void player_entity::save_db()
{
	cancel_timer(m_save_db_timer);

	auto cur_db_calback = [this](const json& db_reply)
	{
		this->save_db_back(db_reply);
	};
	auto cur_db_callback_id = m_callback_mgr.add_callback(cur_db_calback);
	tasks::db_task_desc::base_task cur_task_base(tasks::db_task_desc::task_op::update_one, std::string{}, std::to_string(cur_db_callback_id.value()), "Player");
	json query_doc, db_doc;
	query_doc["base.entity_id"] = entity_id();
	db_doc["$set"] = m_prop_data.encode_with_flag(spiritsaway::property::property_flags{spiritsaway::mosaic_game::property::property_flags::save_db}, true, false);
	auto cur_update_task = tasks::db_task_desc::update_task::update_one(cur_task_base, query_doc, db_doc, false);

	call_db(cur_update_task->to_json(), cur_db_callback_id);
}

虽然简单而又完美的对MongoDB的数据库操作进行封装是不可能的,但是在mosaic_game里还是尽量的对常用操作进行了封装, 封装代码在common/db_logic/include/db_task_desc.h里,下面就是被封装的所有操作的枚举类型定义:

enum class task_op
{
	invalid = 0,
	find_one, // 查找符合条件的一个文档
	find_multi, // 查找符合条件的多个文档 
	count, // 统计符合条件的文档数量
	insert_one, // 插入一个文档
	insert_multi, // 插入多个文档
	update_one, // 更新符合条件的一个文档
	update_multi, // 更新符合条件的多个文档
	delete_one, // 删除符合条件的一个文档
	delete_multi, // 删除符合条件的多个文档
	modify_update, // 修改符合条件的文档,仅更新指定字段 如果不存在则创建新文档
	modify_delete // 修改符合条件的文档,仅删除指定字段
};

对于上面的每个枚举值都会有一个静态函数来构造std::shared_ptr<db_task_desc::base_task>, 例如上面样例代码里的update_one就会构造出一个继承自db_task_desc::base_taskdb_task_desc::update_task类型的智能指针:

class update_task: public base_task
{
protected:
	bool m_multi = false;
	bool m_upset = false;
	json m_doc;
	json m_query;

public:
	json to_json() const override;
	std::string from_json(const json& data) override;

	update_task(const base_task& in_base,
		const json& in_doc,
		bool in_multi,
		bool in_upset);
	bool is_multi() const;
	bool is_upset() const;
	const json& doc() const;
	const json& query() const;
	update_task(const base_task& in_base);
	static std::shared_ptr<update_task> update_one(
		const base_task& base,
		const json& query,
		const json& doc,
		bool upset
	);
	static std::shared_ptr<update_task> update_multi(
		const base_task& base,
		const json& query,
		const json& doc
	);
};

构造出来的std::shared_ptr<db_task_desc::base_task>会通过定义的to_json接口转换为json字符串,然后通过call_db函数发送到db_server,这部分的逻辑与之前的redis_server指令封装是一致的。

Redis与MongoDB的配合使用

Redis作为一个缓存服务器,在其性能上要远远快于MongoDB,所以在mosaic_game里一般会将一些频繁访问的玩家数据缓存到Redis里,这样客户端就直接从Redis里读取数据,而不是从MongoDB里读取,非常大的减少了数据库的访问压力。但是由于Redis是一个内存数据库,其容量是有限的,所以在mosaic_game里一般不会将所有的玩家数据都缓存到Redis里,而是只缓存那些经常访问的玩家数据。同时为了避免Redis占据内存的无限膨胀,存储在Redis的那些玩家数据一般都会设置一个TTL来执行长期未访问数据的清除。这两个设计的影响下,客户端从Redis读取玩家数据的时候可能会出现某些玩家的数据并不在Redis里的情况。此时需要从MongoDB中拉取这些玩家的存库属性数据,并过滤出Redis所需要的相关字段,然后更新到Redis之中。由于数据库的操作并不对客户端暴露,所以在player_redis_component上提供了这个通过服务端批量查询多个玩家Redis信息的接口redis_query_player_infos_from_client:

void player_redis_component::redis_query_player_infos_from_client(const utility::rpc_msg& msg, const std::vector<std::string>& player_ids, std::uint64_t callback_id)
{
	utility::rpc_msg forward_msg;
	forward_msg.cmd = "redis_query_player_infos";
	std::vector<json> callback_args;
	callback_args.push_back(true);
	callback_args.push_back(callback_id);
	auto new_callback_id = m_owner->add_callback("reply_redis_query_players", callback_args);
	forward_msg.set_args(player_ids);
	m_owner->call_service_with_cb("redis_service", forward_msg, new_callback_id);
}

这个接口会将请求发送到redis_service,处理的时候会再次去redis_server里执行一次批量查询:

void redis_service::redis_query_player_infos(const utility::rpc_msg& msg, const std::vector<std::string>& player_ids, std::uint64_t callback_id)
{
	redis::redis_task_desc temp_redis_task;
	temp_redis_task.cmds.push_back(redis::command::hash::get(player_collection_name(), player_ids));
	auto cur_redis_calback = [from = msg.from, player_ids,callback_id,  this](const json& redis_reply)
	{
		this->on_redis_query_player_back(from, player_ids,callback_id,  redis_reply);
	};
	auto cur_redis_callback_id = m_callback_mgr.add_callback(cur_redis_calback);
	get_server()->call_redis(json(temp_redis_task), this, cur_redis_callback_id);
}

void player_redis_component::reply_redis_query_players(const utility::rpc_msg& msg, bool from_client, std::uint64_t callback_id, const json& reply)
{
	if(from_client)
	{
		utility::rpc_msg reply_msg;
		reply_msg.cmd = "reply_redis_query_players";
		reply_msg.set_args(callback_id, reply);
		m_player->call_client(reply_msg);
	}
	else
	{
		m_owner->invoke_callback(utility::mixed_callback_manager::reconstruct_handler(callback_id), reply);
	}
	
}

当查询完成之后,会计算出那些玩家的数据并不在redis_server里,这些玩家的id组成数组empty_player_ids。如果这个数组不为空,则直接通过send_callback_reply将查询出来的Redis数据发送到服务端的player_entity让其执行下发操作。如果这个数组不为空,那么需要从MongoDB里拉取empty_player_ids对应的玩家数据,并更新到Redis里:

void redis_service::on_redis_query_player_back(const std::string& from, const std::vector<std::string>& player_ids, std::uint64_t callback_id, const json& redis_reply)
{
	
	std::string cur_err;
	std::vector<std::string> cur_player_infos;
	json result_player_infos;
	std::vector<std::string> empty_player_ids;
	do
	{
		redis_reply.at(0).at("error").get_to(cur_err);
		redis_reply.at(0).at("content").get_to(cur_player_infos);
		if(cur_player_infos.size() != player_ids.size())
		{
			cur_err = "result sz not match";
			break;
		}
		for(std::size_t i = 0; i< player_ids.size(); i++)
		{
			if(cur_player_infos[i].empty())
			{
				empty_player_ids.push_back(player_ids[i]);
			}
			else
			{
				try
				{
					result_player_infos[player_ids[i]] = json::parse(utility::base64_decode(cur_player_infos[i])).get<json::object_t>();
				}
				catch(const std::exception& e)
				{
					m_logger->error("on_redis_query_back fail for player {} with reply {} error is {}", player_ids[i], redis_reply.dump(), e.what());
					cur_err = "fail to parse";
					break;
				}
				
				
			}
		}
	} while(0);
	if(!cur_err.empty() || empty_player_ids.empty())
	{
		send_callback_reply(from, callback_id, cur_err, result_player_infos);
		return;
	}
	auto cur_db_calback = [from, callback_id, result_player_infos, empty_player_ids,this ](const json& db_reply) mutable
	{
		this->on_db_query_players_back(from, callback_id, result_player_infos, empty_player_ids, db_reply);
	};
	auto cur_db_callback_id = m_callback_mgr.add_callback(cur_db_calback);
	
	tasks::db_task_desc::base_task cur_task_base(tasks::db_task_desc::task_op::find_multi, std::string{}, std::to_string(cur_db_callback_id.value()), player_collection_name());
	json::object_t query;
	query["base.entity_id"]["$in"] = empty_player_ids;

	auto  cur_find_task = tasks::db_task_desc::find_task::find_multi(cur_task_base, query, std::uint32_t(empty_player_ids.size()), player_redis_fields());
	get_server()->call_db(cur_find_task->to_json(), this, cur_db_callback_id);

}

当数据库查询回来之后,会将查询到的玩家数据更新到result_player_infos中,然后通过send_callback_replyresult_player_infos发送到服务端的player_entity让其执行下发操作:

void redis_service::on_db_query_players_back(const std::string& from, std::uint64_t callback_id, json& result_player_infos, const std::vector<std::string>& empty_player_ids, const json& db_reply)
{
	std::string cur_err;
	tasks::db_task_desc::task_reply cur_reply;
	std::vector<json::object_t> raw_player_infos;
	std::vector<std::string> raw_player_ids;
	std::vector<std::pair<std::string, std::string>> redis_info_to_set;
	do
	{
		cur_err = cur_reply.from_json(db_reply);
		if(!cur_err.empty())
		{
			break;
		}
		if(!cur_reply.error.empty())
		{
			cur_err = cur_reply.error;
			break;
		}
		if(cur_reply.content.empty())
		{
			cur_err = "db query empty";
			break;
		}
		raw_player_infos.reserve(cur_reply.content.size());
		raw_player_ids.reserve(cur_reply.content.size());

		for(std::size_t i = 0; i < cur_reply.content.size(); i++)
		{
			try
			{
				json::object_t temp_player_info;
				json::parse(cur_reply.content[i]).get_to(temp_player_info);
				std::string temp_player_id;
				temp_player_info.at("base").at("entity_id").get_to(temp_player_id);
				result_player_infos[temp_player_id] = temp_player_info;
				redis_info_to_set.push_back(std::make_pair(temp_player_id, utility::base64_encode(temp_player_info["base"].dump())));
			}
			catch(std::exception& e)
			{
				cur_err = "parse db result fail";
				m_logger->error("on_db_query_players_back fail to parse {} error {}", cur_reply.content[i], e.what());
				break;
			}
			
		}
	} while(0);
	send_callback_reply(from, callback_id, cur_err, result_player_infos);

	for(const auto& one_empty_player_id: empty_player_ids)
	{
		if(result_player_infos.find(one_empty_player_id) == result_player_infos.end())
		{
			redis_info_to_set.push_back(std::make_pair(one_empty_player_id, utility::base64_encode("{}")));
		}
	}
	if(redis_info_to_set.empty())
	{
		return;
	}
	redis::redis_task_desc cur_redis_task;
	for(const auto& [one_empty_player_id, one_player_info]: redis_info_to_set)
	{
		// 使用 setnx,避免把在 DB 查询期间已被事件写入的 Redis 值覆盖
		cur_redis_task.cmds.push_back(redis::command::hash::setnx(player_collection_name(), one_empty_player_id, one_player_info));
	}
	auto cur_redis_calback = [this](const json& redis_reply)
		{
			this->on_set_redis_back(redis_reply);
		};
	auto cur_redis_callback_id = m_callback_mgr.add_callback(cur_redis_calback);
	get_server()->call_redis(json(cur_redis_task), this, cur_redis_callback_id);
}

在执行数据下发的同时,还会将这些从数据库里获取的数据更新到Redis里,确保Redis里的数据与MongoDB里的数据保持一致。注意这里使用的是setnx命令,这个命令只有在对应的key不存在时才会设置成功。使用setnx而不是set的理由是玩家数据的存库并不是实时的,而是采取计时器的方式来做定期存库。所以如果这个玩家在线的话,DB里的数据可能会比当前内存中的玩家数据老,如果在DB查询期间这个在线的玩家通过update_player_redis_info执行了最新的redis数据的推送,那么DB查询回来之后的数据可能比当前Redis里已经存在的数据更老,所以要使用setnx来避免把在DB查询期间已经写入的Redis数据覆盖。

熟悉互联网后端业务的应该都知道缓存同步里的延迟双删这个概念,即在更新数据库之前先删除Redis里的数据,然后再更新数据库, 更新数据库完成之后再添加一个短时间的计时器来再次从Redis里删除对应的数据。这样做的目的是为了避免在更新数据库的过程中,如果有查询操作过来,查询操作会发现Redis里没有数据,就会去查询数据库,查询到的数据是旧数据,这样就会导致数据不一致的问题。假如不去做这个延迟的删除,只做开始的第一次Redis删除,就可能会出现一致性的问题,下面是单删流程里出现一致性问题的一个示例时序图:

时间线程1线程2
t1删除缓存
t2查询数据库,得到旧值(缓存中没值,准备添加缓存)
t3更新数据库
t4添加缓存
t5更新旧值到缓存

由于在游戏服务器里,所有的数据都是以内存里的最新数据为准的,数据库里的数据只是作为一个持久化的备份。所以当前游戏服务器的设计里不会考虑这么复杂的机制去实现缓存一致性的问题,而是简单的采取只要是所关注的数据发生变更就立即更新Redis的方式,如果使用数据库里的数据去更新Redis则必须采用setnx的形式。此外在业务逻辑层再保证玩家上线时和下线存库之后立即更新Redis,就基本可以在Redis里维护好最新的玩家数据了。

这里还对不存在的玩家数据也进行了处理,即如果DB查询回来的玩家数据中不存在某个玩家,那么就会在Redis里设置一个空的json字符串作为这个玩家的redis数据。这样就避免了不断的对这个玩家执行数据库的查询操作,从而避免缓存击穿。

外围系统的任务调度

HiRedisMongoCxxDriver里提供的编程模型非常相似,都需要新建一个Context对象来作为连接上下文,这个对象在使用驱动里提供的连接接口来创建,连接成功之后外围业务就可以不断的使用Context上提供的读写接口来与后端的Redis/MongoDB来执行交互。但是Context每次只能发起一个任务,只有当后端系统处理完这个任务并发回结果之后才能发起下一个任务,即任务的执行是串行的。下面的代码就是HiRedis官方提供的最小样例代码,包含了前述描述的相关处理流程:

#include <stdio.h>
#include <stdlib.h>

#include <hiredis/hiredis.h>

int main() {
    // The `redisContext` type represents the connection
    // to the Redis server. Here, we connect to the
    // default host and port.
    redisContext *c = redisConnect("127.0.0.1", 6379);

    // Check if the context is null or if a specific
    // error occurred.
    if (c == NULL || c->err) {
        if (c != NULL) {
            printf("Error: %s\n", c->errstr);
            // handle error
        } else {
            printf("Can't allocate redis context\n");
        }

        exit(1);
    }

    // Set a string key.
    redisReply *reply = redisCommand(c, "SET foo bar");
    printf("Reply: %s\n", reply->str); // >>> Reply: OK
    freeReplyObject(reply);

    // Get the key we have just stored.
    reply = redisCommand(c, "GET foo");
    printf("Reply: %s\n", reply->str); // >>> Reply: bar
    freeReplyObject(reply);

    // Close the connection.
    redisFree(c);
}

至于MongoCxxDriver会比HiRedis复杂一下,需要首先建立一个全进程唯一的instance对象,然后再创建client对象,在client对象的构造函数里负责发起到后端的网络连接,连接成功了之后再选择一下要使用的数据库Collection,最后在这个数据库Collection对象上执行具体的数据库操作:

#include <mongocxx/instance.hpp>
#include <mongocxx/client.hpp>
#include <mongocxx/uri.hpp>
#include <bsoncxx/json.hpp>
#include <mongocxx/exception/exception.hpp>
using bsoncxx::builder::basic::kvp;
using bsoncxx::builder::basic::make_document;
int main()
{
    mongocxx::instance instance;
    // Replace the placeholder with your Atlas connection string
    mongocxx::uri uri("<connection string>");
    // Create a mongocxx::client with a mongocxx::options::client object to set the Stable API version
    mongocxx::options::client client_options;
    mongocxx::options::server_api server_api_options(mongocxx::options::server_api::version::k_version_1);
    client_options.server_api_opts(server_api_options);
    mongocxx::client client(uri, client_options);
    try
    {
        // Ping the server to verify that the connection works
        auto admin = client["admin"];
        auto command = make_document(kvp("ping", 1));
        auto result = admin.run_command(command.view());
        std::cout << bsoncxx::to_json(result) << "\n";
        std::cout << "Pinged your deployment. You successfully connected to MongoDB!\n";
    }
    catch (const mongocxx::exception &e)
    {
        std::cerr << "An exception occurred: " << e.what() << "\n";
        return EXIT_FAILURE;
    }
}

从上面的两个最小样例代码可以看出,HiRedisMongoCxxDriver的编程模型都是基于同步阻塞的接口,且默认不带多线程的支持。由于redis_serverdb_server角色的进程都需要处理大量的请求任务,单个的redisContext/mongocxx::client很容易导致请求排队,所以需要构造多个这样的Context对象来处理请求。但是由于同步阻塞编程模型的限制,要想多个Context对象一起执行任务就必须要给每个Context安排一个专属的线程去执行。所以在redis_server/db_server进程里都使用了多线程的任务队列来处理这些请求任务,同时将每个Context对象封装成一个redis_worker/mongo_worker,每个worker对象专属一个线程。在进程启动的时候就会在do_start函数里将这些线程与worker创建好:

void db_server::do_start(const mongo_config& mongo_servers, std::uint8_t worker_num)
{
	auto cur_task_logger = utility::get_logger("mongo_worker");
	for (std::uint8_t i = 0; i < worker_num; i++)
	{
		
		workers.push_back(std::make_shared<mongo_worker>(mongo_servers, mongo_task_channels, cur_task_logger));
	}
	for (auto& one_worker : workers)
	{
		work_threads.emplace_back([=]()
			{
				one_worker->run();
			});
	}
	json_stub::start();
}

void redis_server::do_start(const redis_config &redis_servers, std::uint8_t worker_num)
{
	
	for (std::uint8_t i = 0; i < worker_num; i++)
	{
		auto cur_task_logger = utility::get_logger("redis_worker_" + std::to_string(i) );
		workers.push_back(std::make_shared<redis_worker>(redis_servers, redis_task_channels, cur_task_logger));
	}
	for (auto &one_worker : workers)
	{
		work_threads.emplace_back([=]()
									{ one_worker->run(); });
	}
	json_stub::start();
}

由于任务都是主线程通过RPC接收的,而所有的worker都拥有自己的线程,如何安全的将任务从主线程传递到worker线程就是一个容易出错的问题,这里mosaic_game采用了一种比较简单的解决方案:主线程在收到RPC请求之后,直接将请求参数和回调函数打包成一个task对象,然后投递到一个线程安全队列里:

// redis 的task封装
class redis_task
{
	public:
	using channel_type = std::string;
	using callback_t = std::function<void(const std::vector<redis_reply>&)>;
protected:
	const redis_task_desc m_desc;
	callback_t m_callback;
public:
	redis_task(const redis_task_desc cur_desc,
		callback_t in_callback)
		: m_desc(cur_desc)
		, m_callback(in_callback)
	{

	}
	redis_task(const redis_task& other) = delete;
	redis_task& operator=(const redis_task& other) = delete;
	const redis_task_desc& desc() const
	{
		return m_desc;
	}
	void finish(const std::vector<redis_reply>& replys)
	{
		if (m_callback)
		{
			m_callback(replys);
		}

	}
	const std::string& channel_id() const
	{
		return m_desc.channel;
	}
};
// 主线程redis_server::on_server_control_msg将redis_task投递到redis_task_channels
auto cur_task = std::make_shared<redis::redis_task>(cur_task_desc, cur_lambda);
m_logger->info("add redis_task {}", json(cur_task_desc).dump());
redis_task_channels.add_task(cur_task);
// mongo 的task封装
class mongo_task
{
	std::shared_ptr<const db_task_desc::base_task> _db_task_desc;
	db_task_desc::reply_callback_t _callback;
	db_task_desc::task_reply _reply;
	const cost_time_t begin_time;
	cost_time_t run_begin_time;
	cost_time_t run_end_time;
	
	logger_t logger;

public:
	using channel_type = std::string;
	using callback_t = db_task_desc::reply_callback_t;
	const std::string& channel_id() const;
	std::shared_ptr<const db_task_desc::base_task> db_task_desc() const;
	mongo_task(std::shared_ptr<const db_task_desc::base_task> in_db_task_desc,
		db_task_desc::reply_callback_t in_callback, logger_t in_logger);
	mongo_task(const mongo_task& other) = delete;
	void run(mongocxx::database& db);
	mongocxx::read_preference::read_mode read_mode(db_task_desc::read_prefer_mode in_read_mode) const;
	void finish(const std::string& error);
protected:
	void run_find_task(mongocxx::database& db);
	void run_count_task(mongocxx::database& db);
	void run_insert_task(mongocxx::database& db);
	void run_update_task(mongocxx::database& db);
	void run_modify_task(mongocxx::database& db);
	void run_delete_task(mongocxx::database& db);

	void run_impl(mongocxx::database& db);

};
// 主线程db_server::on_server_control_msg将db_task投递到mongo_task_channels
auto cur_db_task = std::make_shared<mongo_task>(cur_task, cur_lambda, m_logger);
m_logger->info("add db_task {}", db_request_detail.dump(4));
mongo_task_channels.add_task(cur_db_task);

这里mongo_task的实现比较复杂,主要是因为mongoCRUD操作比较复杂,需要根据不同的task类型来调用不同的API,而redis_task只需要对接执行字符串即可。在这两个task上都提供了一个run函数和finish函数,这两个函数都会在worker线程中调用。 run函数负责执行业务请求,finish函数负责将执行结果通知回主线程。由于将数据从worker线程传递到主线程是一个多线程并行操作,所以这里为了安全起见使用了之前介绍过的add_task_to_main_loop将一个lambda投递到主线程的线程安全回调通知队列m_main_loop_tasks里去:

void basic_stub::add_task_to_main_loop(std::function<void()>&& cur_task)
{
	m_main_loop_tasks.push_msg(std::forward<std::function<void()>>(cur_task));
}
// mongo_task 的finish回调
auto cur_lambda = [=](const db_task_desc::task_reply& db_reply)
{
	json reply;
	reply["cmd"] = "reply_db_request";
	json reply_param;
	reply_param["errcode"] = std::string();
	reply_param["callback_id"] = callback_id;
	reply_param["result"] = db_reply.to_json();
	reply["param"] = reply_param;
	auto reply_str = std::make_shared<const std::string>(reply.dump());
	add_task_to_main_loop([=,reply_str=std::move(reply_str)]()
	{
		m_router->push_msg(m_local_name_ptr, from, reply_str, enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
	});
};

// redis_task 的finish回调

auto cur_lambda = [=](const std::vector<redis::redis_reply> &redis_replys)
{
	json reply;
	reply["cmd"] = "reply_redis_request";
	json reply_param;
	reply_param["callback_id"] = callback_id;
	json::array_t array_result;
	for (const auto &one_reply : redis_replys)
	{
		json one_result;
		one_result["error"] = one_reply.error;
		// 省略解析错误逻辑
		array_result.push_back(one_result);
	}
	reply_param["result"] = array_result;

	reply["param"] = reply_param;
	auto reply_str = std::make_shared<const std::string>(reply.dump());
	add_task_to_main_loop([=, reply_str=std::move(reply_str)]()
	{
		m_router->push_msg(m_local_name_ptr, from, reply_str, enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
	});
	
};

然后在主线程的main loop里调用poll_mainloop_tasks就可以将这些finish任务安全取出并依次执行:

std::size_t basic_stub::poll_mainloop_tasks()
{
	const static std::uint32_t batch_task_num  =10;
	std::array<std::function<void()>, batch_task_num> temp_tasks;
	std::uint64_t pop_get_num = 0;
	std::uint64_t total_pop_num = 0;
	while((pop_get_num = m_main_loop_tasks.pop_bulk_msg(temp_tasks.data(), batch_task_num)))
	{
		total_pop_num += pop_get_num;
		for(std::uint32_t i = 0; i< pop_get_num; i++)
		{
			temp_tasks[i]();
		}
	}
	return total_pop_num;
}

在业务逻辑中,我们期望在同一个Entity发起的多个db_task/redis_task可以按照发起顺序依次执行,而不是乱序执行,这样可以避免在业务逻辑中处理复杂的状态机。但是不同的Entity发起的db_task/redis_task之间不会存在先后关系,可以乱序执行。所以在设计db_task/redis_task的线程队列时,mosaic_game需要考虑上述的有序和乱序需求,为此在db_task/redis_task上都添加了一个std::string channel字段:

// mongo task的描述信息 有个m_channel字段代表有序通道
class base_task
{
protected:
	std::string m_collection = "";
	std::string m_channel = "";
	std::string m_request_id = "";
	task_op m_op_type = task_op::invalid;
	json::object_t m_extra;
}
// redis task的描述信息 有个channel字段代表有序通道
struct redis_task_desc
{
	std::string channel;
	std::vector<std::string> cmds;
	NLOHMANN_DEFINE_TYPE_INTRUSIVE(redis_task_desc, channel, cmds)
};

如果db_task/redis_task上携带的channel字段为默认值空字符串,说明这些task没有任何先后顺序要求,可以任意顺序执行。如果db_task/redis_task上携带的channel字段不为空,那么要求相同channeltask按照到达redis_server/db_server的顺序去执行。

由于这样的考虑了channel的线程安全任务队列是一个比较公用的需求,因此这个功能的实现独立出来了一个小项目task_channel。项目的代码其实比较简单,主要是利用了std::array<task_queue, HASH_BUCKET_COUNT> m_task_buckets来存储不同channeltask队列,利用std::mutex来保护对这些队列的访问:

template <typename T, bool threading = true>
class task_channels
{
public:
	using channel_type = typename T::channel_type;
	using task_ptr = std::shared_ptr<T>;
	struct task_queue
	{
		std::uint32_t executor_id = 0;
		std::deque<task_ptr> queue;
	};

	static constexpr std::size_t HASH_BUCKET_COUNT = 32;
	static constexpr std::size_t HASH_MASK = HASH_BUCKET_COUNT - 1;

protected:
	const channel_type m_default_channel_id;
	std::array<task_queue, HASH_BUCKET_COUNT> m_task_buckets;
	task_queue m_tasks_without_channel;
	mutable std::mutex m_task_mutex;

	std::atomic<std::size_t> m_add_task_count = 0;
	std::atomic<std::size_t> m_run_task_count = 0;
	std::atomic<std::size_t> m_finish_task_count = 0;
	// 省略很多字段
};

在执行任务的添加的时候,根据taskchannel字段来判断将task添加到哪个channel的队列中。如果taskchannel字段是default_channel,说明这个task没有任何先后顺序要求,可以任意顺序执行,因此将这个task添加到m_tasks_without_channel队列中。如果taskchannel字段不等于default_channel,那么要求相同channeltask按照到达redis_server/db_server的顺序去执行,所以将这个task添加到m_tasks_by_channel[hash(task.channel)]队列中:

void add_task_impl(task_ptr task)
{
	auto cur_channel_id = task->channel_id();

	if (!is_default_channel(cur_channel_id))
	{
		// 计算channel的哈希值,确定要使用的bucket
		std::size_t bucket_index = hash_channel(cur_channel_id);
		auto &task_queue = m_task_buckets[bucket_index];
		task_queue.queue.push_back(task);
	}
	else
	{
		m_tasks_without_channel.queue.push_back(task);
	}
	m_add_task_count++;
}

由于channel的数量会很多,而worker的数量是有限的,所以无法将channelworker执行一一对应。所以这里将channel进行hash来分配到m_task_buckets这个array里。只要保证m_task_buckets里的一个元素同时只能被一个worker消费,那么相同task_queuetask总是被同一个worker处理,这样就可以保证channel内任务的有序性。此外m_task_buckets的数量是固定的,而worker的数量不确定,所以需要对每个task_queue做一个标记executor_id,代表这个task_queue目前正在被哪个worker执行。如果一个非默认task_queueexecutor_id0,说明这个task_queue目前没有被任何worker执行。了解了这些设计之后,worker获取任务的代码就好理解了:

task_ptr poll_one_task_impl(channel_type prefer_channel, std::uint32_t cur_executor_id)
{
	auto result_queue = select_task_queue(prefer_channel);
	if (!result_queue)
	{
		return {};
	}
	result_queue->executor_id = cur_executor_id;
	auto cur_task = result_queue->queue.front();
	result_queue->queue.pop_front();
	m_run_task_count++;
	return cur_task;
}

worker在请求一个新的task的时候,会使用poll_one_task_impl这个接口,第一个参数是当前worker执行的上一个taskchannel, 第二个参数是当前workerexecutor_id。这个函数会调用select_task_queue来获取最终选取的channel任务队列,并更新这个channel任务队列的executor_id字段,标记这个任务队列目前被当前worker独占了。注意这里会将task从队列里pop出去,所以一个task_queue为空并不代表这个队列的任务为空,可能还有一个正在被某个worker占据正在执行,因此只能用executor_id是否为0来判定当前task_queue是否还在被独占。

获取任务队列的接口select_task_queue逻辑也非常简洁: 如果当前worker执行的上一个taskchannel不是默认channel,那么说明当前worker之前执行过需要保证channel内顺序的task,所以需要优先选择这个channel所属task_queuetask。但是如果上一个channel所属task_queue已经空了,则先清空这个task_queueexecutor_id,代表目前没有worker在执行这个task_queue的任务,再尝试去默认队列里拿取一个任务。如果默认队列里也拿不到,则遍历整个m_tasks_by_channel去获取一个当前没有worker占用的非空channel:

task_queue *select_task_queue(channel_type prefer_channel, std::uint32_t cur_executor_id)
{

	// 1. 如果指定了非默认channel,优先检查对应的bucket
	if (!is_default_channel(prefer_channel))
	{
		std::size_t prefer_bucket = hash_channel(prefer_channel);
		auto &queue = m_task_buckets[prefer_bucket];
		if (!queue.queue.empty() && (queue.executor_id == 0 || queue.executor_id == cur_executor_id))
		{
			return &queue;
		}
	}

	// 2. 检查是否有默认channel的任务
	if (!m_tasks_without_channel.queue.empty())
	{
		return &m_tasks_without_channel;
	}

	// 3. 遍历所有bucket,找到一个非空且未被其他worker占用的队列
	// 每个worker会处理一个队列直到为空
	// 使用随机起点避免每次都从第一个bucket开始 减少饥饿概率
	auto random_start = m_run_task_count % HASH_BUCKET_COUNT;
	for (std::size_t i = 0; i < HASH_BUCKET_COUNT; ++i)
	{
		auto &backet = m_task_buckets[(random_start + i) % HASH_BUCKET_COUNT];
		if (!backet.queue.empty() && backet.executor_id == 0)
		{
			return &backet;
		}
	}

	return nullptr;
}

当一个任务被执行完成之后,会通过finish_task_impl接口来通知task_channel,这个函数会判断当前task_queue是否为空,如果为空,则将executor_id设置为0,代表当前task_queue没有被任何worker执行了,后续添加的任务可以被其他worker来执行:

void finish_task_impl(task_ptr cur_task)
{
	m_finish_task_count++;
	// 如果队列现在为空,重置executor_id
	if (!is_default_channel(cur_task->channel_id()))
	{
		std::size_t bucket_index = hash_channel(cur_task->channel_id());
		auto &task_queue = m_task_buckets[bucket_index];
		if (task_queue.queue.empty())
		{
			task_queue.executor_id = 0;
		}
	}
}