聊天系统

聊天功能也是联网游戏必不可少的功能,这个系统的初衷是为了团队协作中的信息交流,以及玩家之间的休闲交流。但是MMO游戏在生涯后期常见日常玩法基本都变成了一键挂机,游戏的日活完全就由用户间的聊天交互支撑,逐渐蜕变成为了大型在线聊天游戏。聊天系统作为社交游戏的基石,需要一个简单高效且容易扩展的设计来对接后续的任意系统的接入。任何有网络编程经验的开发者应该都尝试过写一个非常简单的聊天系统,但是这种简单的聊天系统一般就只实现了点对点或者全广播的在线聊天功能。网上能看到的功能更加齐全的聊天系统设计基本都牵涉到了使用互联网中常见的中间件,这些设计对于游戏来说有一些参考价,不过在游戏内用互联网架构去做一个聊天系统就有点杀鸡有牛刀了。因为游戏一般都使用分区分服架构,同服务器的玩家数量不会超过10w,所以单点服务就基本可以满足要求,不需要考虑各种分布式的问题,也不需要考虑大规模数据存储的问题。本文就来介绍一下mosaic_game中的聊天系统设计,介绍一下其实现单人聊天、组队聊天、群组聊天以及广播聊天的各种细节。

聊天消息格式

聊天系统中最基础的就是单人聊天,只需要明确聊天消息格式以及对应的收发RPC即可。一般的聊天系统中定义的消息格式一般都包括如下几个字段:

  1. 发送者 发送者一些身份信息,包括全局唯一id以及一些客户端显示头像所需要的门派等级等信息。所以一般会拆分为两个字段,一个player_id字段和一个player_info字段,这个player_info字段一般都设计成为了json::object_t形式来支持各种不定类型的字段
  2. 接收者 如果是一对一聊天的话接收者就是消息发送对象的唯一id,如果是群组聊天、组队聊天、全服聊天等聊天目标的话,一般会保持类型一致的同时,添加一个易于区分的前缀,作为聊天的频道,组成group_xxx,team_xxx,world_xxx这样的形式。这里的xxx都是在对应系统里的唯一标识符,例如群组id,队伍id
  3. 时间戳,这个是聊天消息的发送时间戳,不过这个发送时间并不是客户端发起消息发送rpc的时间,而是聊天服务接受到发送消息请求之后合法性判定通过后的消息投递开始时间,这样就可以维护消息在客户端显示时的有序性
  4. 基本正文 这个就是玩家输入的消息文本,或者一些系统生成的固定格式文本
  5. 消息类型 由于各个系统都可以接入到消息系统中去展示各自的内容,而这些内容一般都不是纯文本形式能够表达的,会引入一些更加高级的展示形式。如向其他玩家炫耀自己的装备道具时,需要指定当前的消息类型为道具展示消息;发送红包时,需要指定当前的消息类型为红包消息。这里的消息类型为了可维护一般都会指定为整数类型,并在消息类型表中指定这个整数的意义
  6. 额外字段 对于非纯文本类型的消息,展示这个消息的细节一般会需要提供很多的额外参数。如装备系统中的各项数值,红包系统中的红包id以及总额等信息。为了对接各种消息类型的额外信息,这个字段一般都是json::object_t类型,满足最大程度的灵活性
  7. 消息计数器,对于一些需要知道未读消息个数以及一些已读确认的消息而言,需要用一个局部范围内的唯一且不减标识符来区分不同的消息,所以这个字段一般设计为uint64,这样可以避免各种可能的数值溢出问题

在客户端发送聊天时,上述字段中的发送者、时间戳、消息计数器这三个字段并不需要提供。因为这三个字段都涉及到正确性,所以相关数据由服务端填充。在mosaic_game中,使用了一个json::object_t去包装上面提到的基本正文、消息类型和额外字段,此时一个客户端请求发送聊天消息的rpc就很简单了:

void player_chat_component::chat_add_msg_request(const utility::rpc_msg& msg, std::uint8_t chat_type, const std::string& to_player_id, const json::object_t& detail);

这里的chat_type对应mosaic_game内部规定的一个枚举类型,支持了一下单聊、群组聊天、组队聊天和广播聊天:

enum class chat_type
{
	personal = 0,
	group,
	team,
	broadcast,
	max
};

聊天消息收发

上面提到的chat_add_msg_request是整个发送聊天消息的入口,除了广播聊天之外,其他几种聊天都会中转到chat_service去处理:

auto cur_chat_key = misc::chat_utils::gen_chat_key(enums::chat_type(chat_type), m_owner->entity_id(), other_id);
if(chat_type != std::uint8_t(enums::chat_type::broadcast))
{
	auto cur_chat_prop_proxy = m_player->prop_proxy().chats().get_insert(cur_chat_key);
	if(cur_chat_prop_proxy.other_id().get().empty())
	{
		cur_chat_prop_proxy.other_id().set(other_id);
		cur_chat_prop_proxy.chat_type().set(chat_type);
	}
	utility::rpc_msg request_msg;
	request_msg.cmd = "request_add_chat";
	request_msg.set_args(m_owner->entity_id(), cur_chat_key, detail);
	m_owner->call_service("chat_service", request_msg);
}
else
{
	// 广播消息处理暂时省略
}

这里的misc::chat_utils::gen_chat_key内部由于根据传入的聊天类型拼接出一个聊天消息投递标识符,主要处理单人聊天的标识符归一化,这样A-B两个人私聊的时候会使用同一个投递标识符:

std::string chat_utils::gen_chat_key(enums::chat_type chat_type, const std::string& self_id, const std::string& other_id)
{
	switch (chat_type)
	{
	case enums::chat_type::personal:
	{
		if(self_id < other_id)
		{
			return std::to_string(std::uint8_t(chat_type)) + "_" + self_id + "_" + other_id;
		}
		else
		{
			return std::to_string(std::uint8_t(chat_type)) + "_" + other_id + "_" + self_id;
		}
	}
	default:
		return std::to_string(std::uint8_t(chat_type)) + "_" + other_id;
	}
}

chat_service::request_add_chat中通过内部的m_chat_data_mgr将这个聊天消息持久化之后,再通过on_add_msg来执行聊天消息的推送:

void chat_service::request_add_chat(const utility::rpc_msg& msg, const std::string& from_player_id, const std::string& chat_key, const json::object_t& chat_info)
{
	// 时间戳用 ms
	auto cur_chat_ts = std::chrono::steady_clock::now().time_since_epoch().count()/(1000*1000);
	m_chat_data_mgr->add_msg(chat_key, from_player_id, chat_info, cur_chat_ts, [from_player_id, chat_key, this, chat_info, cur_chat_ts](uint32_t new_msg_seq)
	{
		on_add_msg(from_player_id, chat_key, cur_chat_ts, new_msg_seq, chat_info);
	});
}

on_add_msg按照这个投递标识符的生成规则,使用decode_chat_key重新解析原始的参数:

std::pair<enums::chat_type, std::string_view>  chat_utils::decode_chat_key(std::string_view chat_key, std::string_view self_id)
void chat_service::on_add_msg(const std::string& from_player_id, const std::string& chat_key, std::uint64_t chat_ts, uint64_t new_msg_seq, const json::object_t& chat_info)
{
	auto cur_decode_result = misc::chat_utils::decode_chat_key(chat_key, from_player_id);
	if(cur_decode_result.second.empty())
	{
		return;
	}
	std::vector<json> notify_args;
	notify_args.push_back(chat_key);
	notify_args.push_back(chat_ts);
	notify_args.push_back(new_msg_seq);
	notify_args.push_back(chat_info);
	json::object_t notify_msg;
	switch (cur_decode_result.first)
	{
	case enums::chat_type::personal:
	{
		notify_msg["cmd"] = "chat_add_msg_notify";
		notify_msg["args"] = notify_args;
		server::offline_msg_manager::instance().add_msg(
			std::string(cur_decode_result.second), notify_msg);
		break;
	}
	case enums::chat_type::group:
	{
		utility::rpc_msg service_msg;
		service_msg.cmd = "group_add_chat";
		service_msg.set_args(std::stol(std::string(cur_decode_result.second)),chat_ts, new_msg_seq, chat_info, from_player_id);
		get_server()->call_service("group_service", service_msg);
		break;
	}
	case enums::chat_type::team:
	{
		utility::rpc_msg service_msg;
		service_msg.cmd = "team_add_chat";
		service_msg.set_args(std::string(cur_decode_result.second),chat_ts, new_msg_seq, chat_info, from_player_id);
		get_server()->call_service("team_service", service_msg);
		break;
	}
	
	default:
		break;
	}

	
	notify_msg["cmd"] = "chat_add_msg_reply";
	notify_msg["args"] = notify_args;
	server::offline_msg_manager::instance().add_msg(from_player_id,  notify_msg);
}

这里根据聊天频道类型,走不同的消息推送逻辑。单人聊天时的处理最简单,通过offline_msg_manager来执行聊天消息的推送,保证目标能够接收到这个消息的提示。这个chat_add_msg_notify函数里会对这个chat_key构造一个chat_item,来记录最近聊天时间以及最新聊天信息序列号:

void player_chat_component::chat_add_msg_notify(const utility::rpc_msg& msg, const std::string& chat_key, std::uint64_t chat_ts, std::uint64_t msg_seq, const json::object_t& detail)
{
	auto cur_chat_prop_proxy = m_player->prop_proxy().chats().get_insert(chat_key);
	if(cur_chat_prop_proxy.other_id().get().empty())
	{
		auto cur_decode_result = misc::chat_utils::decode_chat_key(chat_key, m_owner->entity_id());
		cur_chat_prop_proxy.other_id().set(std::string(cur_decode_result.second));
		cur_chat_prop_proxy.chat_type().set(std::uint8_t(cur_decode_result.first));
	}
	if(msg_seq >= cur_chat_prop_proxy.next_msg_seq().get())
	{
		cur_chat_prop_proxy.next_msg_seq().set(msg_seq + 1);
		cur_chat_prop_proxy.last_chat_ts().set(chat_ts);
	}
	
	m_player->call_client(msg);
}

对于群组聊天频道,会将相关消息转发到group_service之后,找到对应群组并对群组数据进行修改,然后再使用group_broadcast对群组内在线人员进行推送:

void group_service::group_add_chat(const utility::rpc_msg& msg, std::uint32_t dest_group_idx, std::uint64_t chat_ts, std::uint64_t new_seq, const json& chat_info, const std::string& action_player_id)
{
	enums::group_errcode cur_err = enums::group_errcode::ok;
	group_resource* dest_group = nullptr;
	do
	{
		auto temp_group_iter = m_group_resources.find(dest_group_idx);
		if(temp_group_iter == m_group_resources.end())
		{
			cur_err = enums::group_errcode::invalid_group_id;
			break;
		}
		dest_group = temp_group_iter->second.get();
		cur_err = dest_group->m_handler.add_chat(new_seq, action_player_id);
	} while (false);

	if(cur_err == enums::group_errcode::ok)
	{

		std::vector<json> temp_args;
		
		group_sync_props(dest_group, std::uint8_t(enums::group_action::chat), {}, {});

		utility::rpc_msg new_msg;
		new_msg.cmd = "chat_add_msg_notify";
		new_msg.set_args(misc::chat_utils::gen_chat_key(enums::chat_type::group, action_player_id, std::to_string(dest_group_idx)), chat_ts, new_seq, chat_info);
		group_broadcast(dest_group, new_msg.cmd, new_msg.args, {});
		
	}
}

队伍聊天可以认为是一个专门化的群组聊天,所以两者的逻辑都是相似的:

void team_service::team_add_chat(const utility::rpc_msg& msg,  const std::string& tid, std::uint64_t chat_ts, std::uint64_t new_seq, const json& chat_info, const std::string& from_player_id)
{
	auto cur_team_iter = m_team_resources.find(tid);
	if(cur_team_iter == m_team_resources.end())
	{
		return;
	}
	auto cur_team_ptr = cur_team_iter->second.get();
	auto cur_err = cur_team_ptr->m_handler.add_chat(new_seq, from_player_id);
	if(cur_err == enums::team_errcode::ok)
	{
		std::vector<json> temp_sync_args;
		team_sync_props(cur_team_ptr, std::uint32_t(enums::team_action::add_chat), std::move(temp_sync_args), std::string{});
		utility::rpc_msg new_msg;
		new_msg.cmd = "chat_add_msg_notify";
		new_msg.set_args(misc::chat_utils::gen_chat_key(enums::chat_type::team, from_player_id, tid), chat_ts, new_seq, chat_info);
		team_broadcast(cur_team_ptr, new_msg.cmd, new_msg.args, {});
	}

}

对于广播聊天,则走我们之前介绍notify_component上的客户端群组广播接口:

auto cur_notify_component = m_player->get_component<player_notify_component>();
if(!cur_notify_component)
{
	return;
}
utility::rpc_msg request_msg;
request_msg.cmd = "chat_add_broadcast_msg_notify";
request_msg.set_args(m_owner->entity_id(), cur_chat_key, detail, utility::timer_manager::now_ts());
cur_notify_component->send_msg_to_broadcast_group(other_id, request_msg);

聊天历史记录

任何一个聊天系统都会有查看历史记录的功能,mosaic_game中也不例外。聊天历史记录相关的功能由于比较独立,这里将这部分功能构造为了一个单独的库,见于huangfeidian/chatmosaic_game上的chat_service其实就是一个对这个chat库的一个简单封装,用来做聊天历史记录功能。 聊天历史记录里最核心的一点就是需要维护同一个聊天标识符对应的聊天数据里的序列号是递增的。这里我们使用chat_data_proxy类来封装一个聊天标识符对应的聊天数据,内部使用一个std::uint64_t m_next_seq来记录这个递增标识符,最终执行添加新聊天消息的函数内部会对这个字段进行自增:

void chat_data_proxy::add_chat_impl(const std::string &from_player_id, const json::object_t &chat_info, std::uint64_t chat_ts)
{
	chat_record cur_chat_record;
	cur_chat_record.detail = chat_info;
	cur_chat_record.from = from_player_id;
	cur_chat_record.seq = m_next_seq;
	cur_chat_record.ts = chat_ts;
	m_loaded_docs.rbegin()->second.records.push_back(std::move(cur_chat_record));
	m_next_seq++;
	m_dirty_count++;
	// 暂时省略一些存库代码
}

分配好唯一递增标识符之后,这可以将这条聊天数据存库了。在数据库的聊天表里对这个seq做好唯一索引,查询聊天记录就非常简单,直接利用数据库对这个字段的区间查询支持做一个[begin_seq, end_seq]的查询即可。这个简单的方案虽然可行,但是有一个非常大的性能弊端,就是每个聊天消息都作为一个数据库的行记录的话,数据库的读写就会非常的频繁。所以游戏里为了减轻对数据库的压力,一般会将若干连续的seq对应的聊天数据聚合起来作为数据库聊天表里的读写基础单位,这样可以显著的降低对数据库的读写需求。在huangfeidian/chat中,chat_doc就是这样的聚合起来的存库消息类型:

struct chat_doc
{
	std::string chat_key;
	chat_record_seq_t doc_seq;
	std::vector<chat_record> records;
	std::uint32_t ttl; // 在缓存中的剩余有效时间
	NLOHMANN_DEFINE_TYPE_INTRUSIVE(chat_doc, chat_key, doc_seq, records)
};

这里用一个doc_seq来作为存库消息的递增序列号,这个序列号与单个聊天消息的序列号之间的关系很简单,就是设置单个chat_doc最大聊天数据量record_num_in_doc。只有一个chat_docrecords数量达到record_num_in_doc时才会新建一个chat_doc,同时对应的doc_seq会进行加一操作,这部分的逻辑对应的就是上面add_chat_impl里暂时省略的存库代码:

if (m_loaded_docs.rbegin()->second.records.size() == m_record_num_in_doc)
{
	save();
	chat_doc new_chat_doc;
	new_chat_doc.chat_key = m_chat_key;
	new_chat_doc.doc_seq = m_loaded_docs.rbegin()->second.doc_seq + 1;
	new_chat_doc.ttl = m_default_loaded_doc_ttl;
	m_loaded_docs[new_chat_doc.doc_seq] = std::move(new_chat_doc);
}

这里的save负责调用m_save_func函数存储两个聊天表行数据到数据库中,这个函数的具体实现需要外部传递过来,这样可以对接各种数据库:

using chat_data_save_func = std::function<void(const std::string&, const json::object_t&, const json&)>;
chat_data_save_func m_save_func;

bool chat_data_proxy::save()
{
	if (!m_dirty_count)
	{
		return false;
	}
	json::object_t temp_query;
	temp_query["chat_key"] = m_chat_key;
	temp_query["doc_seq"] = m_loaded_docs.rbegin()->second.doc_seq;
	json cur_doc_json = m_loaded_docs.rbegin()->second;
	m_save_func(m_chat_key, temp_query, cur_doc_json);
	temp_query["doc_seq"] = std::numeric_limits<chat_record_seq_t>::max();
	json cur_meta_doc = temp_query;
	cur_meta_doc["next_seq"] = m_next_seq;
	m_save_func(m_chat_key, temp_query, cur_meta_doc);
	m_dirty_count = 0;
	return true;
}

第一个数据是当前chat_doc对应的数据,第二个数据是记录当前chat_key对应的聊天历史记录元数据,这个元数据负责记录下一个可以使用的聊天消息序列号。这里的元数据行用的索引是doc_seq == std::numeric_limits<chat_record_seq_t>::max(),这里存储的信息压根就不是chat_doc格式,但是也放在聊天表里,其实就是在偷懒取巧,因为正常情况下这个doc_seq应该不可能达到到uint64_t::max这个值。

这里的save触发时机就是当一个chat_doc里存储的chat_record数量达到设定的m_record_num_in_doc时触发,此外还有一个定时自动存库的机制来触发save,入口在chat_manager::tick_save中,这里使用了std::vector<std::shared_ptr< chat_data_proxy>> m_dirty_chat_datas作为存储队列,每次添加成功一个聊天消息之后都将这个chat_data_proxy放到这个队列的末尾,这里使用dirty_count作为已经在这个队列中的标记位来使用:

void chat_manager::add_msg_cb(std::shared_ptr<chat_data_proxy> cur_data, chat_record_seq_t msg_seq, std::function<void(chat_record_seq_t)> seq_cb)
{
	if (cur_data->dirty_count() == 1)
	{
		m_dirty_chat_datas.push_back(cur_data);
	}
	seq_cb(msg_seq);
}

std::vector<std::string> chat_manager::tick_save(chat_record_seq_t max_num)
{
	std::vector<std::string> result;
	std::reverse(m_dirty_chat_datas.begin(), m_dirty_chat_datas.end());
	chat_record_seq_t result_num = 0;
	for (int i = 0; i < max_num; i++)
	{
		if (m_dirty_chat_datas.empty())
		{
			break;
		}
		auto cur_back = m_dirty_chat_datas.back();
		m_dirty_chat_datas.pop_back();
		if (cur_back->save())
		{
			result.push_back(cur_back->m_chat_key);
			result_num++;
		}
		if (result_num >= max_num)
		{
			break;
		}
	}
	std::reverse(m_dirty_chat_datas.begin(), m_dirty_chat_datas.end());
	return result;
}

chat_service启动的时候不可能加载聊天表中的所有chat_data_proxy,运行时采取类似于LRU的形式去控制内存中的chat_data_proxy的加载与卸载。当一个chat_data_proxy被初始化之后,先从聊天数据表中读取存储了元数据的那个chat_doc,然后用这个元数据chat_doc去加载最新的chat_doc。为了尽可能的增加聊天服务的吞吐量,这里对数据库的读写都是异步的,避免卡住主线程,所以整个chat_data_proxy的构造过程被异步读取数据库切分为了三个阶段:

chat_data_proxy::chat_data_proxy(const std::string chat_key, chat_data_load_meta_func load_meta_func, chat_data_load_normal_func load_normal_func, chat_data_save_func save_func, chat_record_seq_t record_num_in_doc, chat_record_seq_t fetch_record_max_num)
	: m_chat_key(chat_key)
	, m_chat_key_hash(std::hash<std::string>{}(chat_key))
	, m_load_meta_func(load_meta_func), m_load_normal_func(load_normal_func), m_save_func(save_func)
	, m_record_num_in_doc(record_num_in_doc)
	, m_create_ts(utility::timer_mgr::now_ts())
	, m_fetch_record_max_num(fetch_record_max_num)
{
	json::object_t temp_query;
	temp_query["chat_key"] = m_chat_key;
	temp_query["doc_seq"] = std::numeric_limits<chat_record_seq_t>::max();
	json::object_t temp_doc;
	temp_doc["chat_key"] = m_chat_key;
	temp_doc["doc_seq"] = std::numeric_limits<chat_record_seq_t>::max();
	temp_doc["next_seq"] = 0;
	m_load_meta_func(m_chat_key, temp_query, temp_doc);
}

当这个m_load_meta_func执行数据库查询获取元数据chat_doc回来之后,需要调用chat_manager::on_meta_doc_loaded去调用chat_data_proxy上的元数据初始化接口:

void chat_manager::on_meta_doc_loaded(const std::string& chat_key, const json::object_t& doc)
{
	auto cur_iter = m_chat_datas.find(chat_key);
	if (cur_iter == m_chat_datas.end())
	{
		return;
	}
	cur_iter->second->on_meta_doc_loaded(doc);
}

bool chat_data_proxy::on_meta_doc_loaded(const json::object_t &meta_doc)
{
	try
	{
		meta_doc.at("next_seq").get_to(m_next_seq);
	}
	catch (const std::exception &e)
	{
		return false;
	}
	if (m_next_seq % m_record_num_in_doc == 0)
	{
		m_is_ready = true;
		chat_doc temp_doc;
		auto cur_doc_seq = m_next_seq / m_record_num_in_doc;
		temp_doc.doc_seq = cur_doc_seq;
		temp_doc.chat_key = m_chat_key;
		temp_doc.ttl = m_default_loaded_doc_ttl;
		m_loaded_docs[cur_doc_seq] = std::move(temp_doc);
		on_ready();
		return true;
	}
	json::object_t temp_query;
	temp_query["chat_key"] = m_chat_key;
	temp_query["doc_seq"] = m_next_seq / m_record_num_in_doc;
	m_load_normal_func(m_chat_key, temp_query);
	return true;
}

这个元数据初始化接口在被调用到时,会检查最新的一个chat_doc是否已经满了:

  1. 满了就立即创建一个新的chat_doc,同时m_is_ready设置为true,并使用on_ready来通知chat_data_proxy已经初始化好
  2. 没有满则再次去数据库中加载这个最新chat_doc,加载完成后通过chat_manager::on_load通知对应的chat_data_proxy里执行on_ready,这里也需要将m_is_ready设置为true
void chat_manager::on_load(const std::string& chat_key, const json::object_t& doc)
{
	auto cur_iter = m_chat_datas.find(chat_key);
	if (cur_iter == m_chat_datas.end())
	{
		return;
	}
	cur_iter->second->on_normal_doc_loaded(doc);
}

bool chat_data_proxy::on_normal_doc_loaded(const json::object_t&cur_doc)
{
	chat_doc temp_doc;
	try
	{
		json(cur_doc).get_to(temp_doc);
	}
	catch (const std::exception &e)
	{
		return false;
	}
	auto cur_temp_doc_seq = temp_doc.doc_seq;
	m_pending_load_docs.erase(cur_temp_doc_seq);
	temp_doc.ttl = m_default_loaded_doc_ttl;
	m_loaded_docs[cur_temp_doc_seq] = std::move(temp_doc);
	if (cur_temp_doc_seq == m_next_seq / m_record_num_in_doc)
	{
		m_is_ready = true;
		on_ready();
		return true;
	}
	// 暂时省略一些其他逻辑代码
}

在聊天消息的添加和历史记录的读取时,对应的chat_data_proxy可能都没有从数据库加载出来,所以相关接口也要支持异步模式,都需要考虑如果有多个人同时操作的情况,因此异步操作的代码相对于同步操作来说复杂很多。

在添加一个新的聊天消息时,需要判断当前最新的chat_doc是否已经加载,然后走同步处理或者异步处理:

void chat_data_proxy::add_chat(const std::string &from_player_id, const json::object_t &chat_info, std::uint64_t chat_ts, std::function<void(chat_record_seq_t)> add_cb)
{
	if (ready())
	{
		if (m_next_seq != std::numeric_limits<chat_record_seq_t>::max())
		{
			auto cur_record_seq = m_next_seq;
			add_chat_impl(from_player_id, chat_info, chat_ts);
			add_cb(cur_record_seq);
			return;
		}
		else
		{
			// 消息编号达到了 uint64::max 基本不可能
			add_cb(std::numeric_limits<chat_record_seq_t>::max());
		}
	}
	else
	{
		chat_add_task cur_add_task;
		cur_add_task.add_cb = add_cb;
		cur_add_task.detail = chat_info;
		cur_add_task.from = from_player_id;
		cur_add_task.chat_ts = chat_ts;
		m_add_tasks.push_back(std::move(cur_add_task));
	}
}

同步处理很简单,使用add_chat_impl添加最新聊天之后,直接调用add_cb来执行回调。而异步处理则需要将这个添加消息的操作放到当前chat_data_proxy的内部队列m_add_tasks中。最新的chat_doc和会检查这个队列中是否有值,并依照添加顺序执行:

void chat_data_proxy::on_ready()
{
	for (auto& one_cb : m_on_meta_doc_loaded_cbs)
	{
		one_cb(*this);
	}
	m_on_meta_doc_loaded_cbs.clear();
	for (auto &one_add_task : m_add_tasks)
	{
		auto cur_add_seq = m_next_seq;
		add_chat_impl(one_add_task.from, one_add_task.detail, one_add_task.chat_ts);
		one_add_task.add_cb(cur_add_seq);
	}
	m_add_tasks.clear();
	// 此处省略与添加消息无关的代码
}

类似的,查询聊天历史记录的时候也需要注意chat_data_proxy没有ready的问题,首先将对应的查询任务挂载到一个队列上,如果已经ready了则检查任务是否可以完成或者可以启动所需数据的加载:

void chat_manager::fetch_history(const std::string& chat_key, chat_record_seq_t seq_begin, chat_record_seq_t seq_end, std::function<void(const std::vector<chat_record>&)> fetch_cb)
{
	auto cur_chat_proxy = get_or_create_chat_data(chat_key);
	cur_chat_proxy->fetch_records(seq_begin, seq_end, fetch_cb);
}

void chat_data_proxy::fetch_records(chat_record_seq_t seq_begin, chat_record_seq_t seq_end, std::function<void(const std::vector<chat_record> &)> fetch_cb)
{
	std::vector<chat_record> temp_result;
	if (seq_end < seq_begin)
	{
		return fetch_cb(temp_result);
	}
	if (seq_end - seq_begin >= m_fetch_record_max_num)
	{
		return fetch_cb(temp_result);
	}
	if (ready()) // 如果已经ready了
	{
		if (seq_end >= m_next_seq) // 如果最大序列号大于最新序列号 说明请求非法
		{
			return fetch_cb(temp_result);
		}
		if (fetch_record_impl(seq_begin, seq_end, temp_result)) // 如果现有数据满足要求 立即执行
		{
			fetch_cb(temp_result);
			return;
		}
	}
	// 添加聊天记录获取任务
	chat_fetch_task cur_fetch_task;
	cur_fetch_task.chat_seq_begin = seq_begin;
	cur_fetch_task.chat_seq_end = seq_end;
	cur_fetch_task.fetch_cb = fetch_cb;
	m_fetch_tasks.push_back(std::move(cur_fetch_task));
	if (!m_is_ready) // 没有初始化的情况下 先暂存请求
	{
		return;
	}

	// 计算好要加载哪些doc_seq 
	auto cur_fetch_doc_begin = seq_begin / m_record_num_in_doc;
	auto cur_fetch_doc_end = seq_end / m_record_num_in_doc + 1;
	for (auto i = cur_fetch_doc_begin; i < cur_fetch_doc_end; i++)
	{
		auto cur_iter = m_loaded_docs.find(i);
		if (cur_iter == m_loaded_docs.end())
		{
			m_pending_load_docs.insert(i);
		}
	}
	
	if (m_fetch_tasks.size() != 1)
	{
		return;// 已经在执行加载任务了 等待之前的加载任务执行完
	}
	json::object_t temp_query;
	temp_query["chat_key"] = m_chat_key;
	temp_query["doc_seq"] = *m_pending_load_docs.rbegin();
	m_load_normal_func(m_chat_key, temp_query);
	return;
}

由于chat_data_proxyready之后也会检查这个m_fetch_tasks队列,所以在没有ready的情况下就不会发起最后的m_load_normal_func去查询数据库。同时如果m_fetch_tasks的大小不是1,说明现在已经在处理这个队列了,所以也不需要发起数据库查询任务。只有在ready下且自己是第一个添加查询任务的调用时才会发起数据查询。当一个chat_doc被加载之后,末尾会检查一下这个数据的加载是否会导致某个聊天历史查询任务所需数据都得到了满足:

bool chat_data_proxy::on_normal_doc_loaded(const json::object_t&cur_doc)
{
	// 这里省略了之前贴出的相关代码 新增加检查聊天记录查询的代码
	check_fetch_complete(cur_temp_doc_seq);
	if (!m_pending_load_docs.empty())
	{
		json::object_t temp_query;
		temp_query["chat_key"] = m_chat_key;
		temp_query["doc_seq"] = *m_pending_load_docs.rbegin();
		m_load_normal_func(m_chat_key, temp_query);
	}
	return true;
}

这里的check_fetch_complete实现的比较暴力,直接对所有的fetch_task进行检查,使用fetch_record_impl来判定是否满足回调执行条件:

void chat_data_proxy::check_fetch_complete(chat_record_seq_t cur_doc_seq)
{
	chat_record_seq_t cur_doc_record_seq_begin = cur_doc_seq * m_record_num_in_doc;
	chat_record_seq_t cur_doc_record_seq_end = cur_doc_record_seq_begin + m_record_num_in_doc;
	chat_record_seq_t has_cb_invoked = 0;
	std::vector<chat_record> cur_fetch_result;
	for (std::uint32_t i = 0; i < m_fetch_tasks.size(); i++)
	{
		auto &cur_cb = m_fetch_tasks[i];
		if (cur_cb.chat_seq_begin >= cur_doc_record_seq_end || cur_cb.chat_seq_end < cur_doc_record_seq_begin)
		{
			continue;
		}
		cur_fetch_result.clear();
		if (!fetch_record_impl(cur_cb.chat_seq_begin, cur_cb.chat_seq_end, cur_fetch_result))
		{
			continue;
		}
		cur_cb.fetch_cb(cur_fetch_result);
		cur_cb.chat_seq_begin = std::numeric_limits<chat_record_seq_t>::max(); // 标记已经执行的任务
		has_cb_invoked++;
	}
	// 删除已经执行了的任务
	for (std::uint32_t i = 0; i < m_fetch_tasks.size(); i++)
	{
		if (m_fetch_tasks[i].chat_seq_begin != std::numeric_limits<chat_record_seq_t>::max())
		{
			continue;
		}
		while (!m_fetch_tasks.empty() && m_fetch_tasks.back().chat_seq_begin == std::numeric_limits<chat_record_seq_t>::max())
		{
			m_fetch_tasks.pop_back();
		}
		if (i >= m_fetch_tasks.size())
		{
			break;
		}
		if (i + 1 != m_fetch_tasks.size())
		{
			std::swap(m_fetch_tasks[i], m_fetch_tasks.back());
		}
		m_fetch_tasks.pop_back();
	}
}

后面的那个for循环使用swap模式去删除已经执行完成的查询任务,同时维护队列的有序性。

如果每个聊天记录获取任务执行完成之后都在内存中释放对应的chat_doc,就会导致一些热点chat_doc被重复加载,这种情况在群组聊天中非常常见。但是获取任务执行完不去清楚这些chat_doc的话,随着聊天记录的拉取,m_loaded_docs这里的存储数据会不断的增多。为了限制这部分内存的增长,chat_doc上加入了一个ttl字段来代表其存活时间,每次一个新的chat_doc被创建时都会设置这个字段为m_default_expire_ttl,外部定期执行expire_loaded来删除不再被使用的chat_doc:

std::uint64_t chat_data_proxy::expire_loaded()
{
	std::unordered_set<chat_record_seq_t> doc_seq_needed;
	for (const auto& one_fetch_task : m_fetch_tasks)
	{
		auto cur_fetch_doc_begin = one_fetch_task.chat_seq_begin / m_record_num_in_doc;
		auto cur_fetch_doc_end = one_fetch_task.chat_seq_end / m_record_num_in_doc + 1;
		for (auto i = cur_fetch_doc_begin; i < cur_fetch_doc_end; i++)
		{
			doc_seq_needed.insert(i);
		}
	}
	auto current_doc_seq = m_next_seq / m_record_num_in_doc;
	
	const std::uint32_t always_in_loaded_doc_num = 3; // 最新的若干页面永驻
	for (std::uint32_t i = 0; i < always_in_loaded_doc_num; i++)
	{
		doc_seq_needed.insert(current_doc_seq);
		if (current_doc_seq == 0)
		{
			break;
		}
		current_doc_seq--;
	}
	std::vector<std::uint64_t> doc_seqs_to_delete;
	for (auto& one_pair : m_loaded_docs)
	{
		if (doc_seq_needed.find(one_pair.first) != doc_seq_needed.end())
		{
			one_pair.second.ttl = m_default_loaded_doc_ttl;
			continue;
		}
		one_pair.second.ttl--;
		if (one_pair.second.ttl == 0)
		{
			doc_seqs_to_delete.push_back(one_pair.first);
		}
	}
	for (auto one_doc_seq : doc_seqs_to_delete)
	{
		m_loaded_docs.erase(one_doc_seq);
	}
	return doc_seqs_to_delete.size();
}

这个expire_loaded不仅仅使用ttl字段来判定是否需要删除,如果一个chat_doc会被历史记录读取任务队列使用或者是最新的若干chat_doc,则暂时不处理其ttl,因为这些chat_doc即将被使用的概率是很大的。

同时由于chat_data_proxy也是chat_manager按需加载创建的,存储在std::unordered_map<std::string, std::shared_ptr<chat_data_proxy>> m_chat_datas中,随着请求的不断被处理这个map也是逐渐增大的。 chat_manager里不能无限制的去保留所有的chat_data_proxy,因此这里也是用一个ttl机制去剔除一定时间内不被使用的chat_data_proxy:

std::vector<std::string> chat_manager::tick_expire(chat_record_seq_t max_num)
{
	std::vector<const chat_data_proxy*> result_expire_datas;
	for (const auto& one_pair : m_chat_datas)
	{
		one_pair.second->expire_loaded();
		if (one_pair.second->dirty_count() == 0 && one_pair.second->safe_to_remove())
		{
			result_expire_datas.push_back(one_pair.second.get());
		}
	}
	std::sort(result_expire_datas.begin(), result_expire_datas.end(), [](const chat_data_proxy* a, const chat_data_proxy* b)
		{
			return a->m_create_ts > b->m_create_ts;
		});
	std::vector<std::string> result;
	chat_record_seq_t result_num = 0;
	for (int i = 0; i < max_num; i++)
	{
		if (result_expire_datas.empty())
		{
			break;
		}
		auto cur_back = result_expire_datas.back();
		result_expire_datas.pop_back();
		result.push_back(cur_back->m_chat_key);
		m_chat_datas.erase(cur_back->m_chat_key);
		result_num++;
	}
	return result;
}

这里优先剔除创建时间最早的max_num个可以被剔除的chat_data_proxy

聊天未读管理

未读消息管理也是聊天系统中的必要部分,所有的聊天系统中都会以红点加数字的形式来提示使用者指定的聊天会话中有多少条未读消息。在chat的属性设置中提供了一个字段m_next_msg_seq来存储当前会话的最新消息序列号,同时也有一个字段m_readed_msg_seq来存储客户端汇报上来的最大已读序列号,这两个序列号之间的差值就是就是未读消息的数量:

class Meta(property) chat_item: public spiritsaway::property::property_bag_item<std::string>
{
public:
	Meta(property(sync_clients, save_db)) std::uint64_t m_last_chat_ts = 0;
	Meta(property(sync_clients, save_db)) std::uint64_t m_next_msg_seq = 0;
	Meta(property(sync_clients, save_db)) std::uint64_t m_readed_msg_seq = 0;
	// 第一条可以拉到历史记录的消息编号
	Meta(property(sync_clients, save_db)) std::uint64_t m_visible_msg_seq = 0;
	Meta(property(sync_clients, save_db)) std::uint8_t m_chat_type = 0;
	Meta(property(sync_clients, save_db)) std::string m_other_id;
	#ifndef __meta_parse__
	#include "player/chat_item.generated.inch"
	#endif
};

玩家每次接收到服务端推送的聊天消息时,都会修改这个m_next_msg_seq:

// 被动接收消息成功
void player_chat_component::chat_add_msg_notify(const utility::rpc_msg& msg, const std::string& chat_key, std::uint64_t chat_ts, std::uint64_t msg_seq, const json::object_t& detail)
{
	auto cur_chat_prop_proxy = m_player->prop_proxy().chats().get_insert(chat_key);
	if(cur_chat_prop_proxy.other_id().get().empty())
	{
		auto cur_decode_result = misc::chat_utils::decode_chat_key(chat_key, m_owner->entity_id());
		cur_chat_prop_proxy.other_id().set(std::string(cur_decode_result.second));
		cur_chat_prop_proxy.chat_type().set(std::uint8_t(cur_decode_result.first));
	}
	if(msg_seq >= cur_chat_prop_proxy.next_msg_seq().get())
	{
		cur_chat_prop_proxy.next_msg_seq().set(msg_seq + 1);
		cur_chat_prop_proxy.last_chat_ts().set(chat_ts);
	}
	
	m_player->call_client(msg);
}

// 主动发送消息成功
void player_chat_component::chat_add_msg_reply(const utility::rpc_msg& msg, const std::string& chat_key, std::uint64_t chat_ts, std::uint64_t msg_seq, const json::object_t& detail)
{
	auto cur_chat_prop_proxy = m_player->prop_proxy().chats().get_insert(chat_key);

	if(msg_seq >= cur_chat_prop_proxy.next_msg_seq().get())
	{
		cur_chat_prop_proxy.next_msg_seq().set(msg_seq + 1);
		cur_chat_prop_proxy.last_chat_ts().set(chat_ts);
	}
	
	m_player->call_client(msg);
}

而已读消息序列号则需要客户端进行上报:

void player_chat_component::chat_set_readed_request(const utility::rpc_msg& msg, const std::string& chat_key, std::uint64_t msg_seq)
{
	auto cur_chat_prop_proxy = m_player->prop_proxy().chats().get_insert(chat_key);

	if(msg_seq >= cur_chat_prop_proxy.readed_msg_seq().get())
	{
		cur_chat_prop_proxy.readed_msg_seq().set(msg_seq + 1);
	}
}

这种方式可以非常简单的管理未读消息,但是这种维护方式与主流的消息未读设计很不一样。当前的未读消息设计下,客户端每次打开一个聊天会话的时候,都需要从readed_msg_seq的位置开始拉取后续连续的若干条消息,等到玩家查看新拉下的这些消息之后,客户端上报最新的已读序列号并设置到readed_msg_seq上,然后再继续重复这个循环直到没有未读消息,即整个读取未读消息的流程就是不断的下拉聊天记录。而像微信等主流聊天软件中的设计与之完全相反,客户端每次打开一个聊天会话时,展现的是最新的若干条聊天消息,汇报已读聊天消息需要提供当前显示窗口里对应的聊天序列号区间[a, b], 读取未读消息的流程就是不断的上拉聊天记录。同时由于可能在未清除所有未读消息的情况下又会接收到新的未读消息,所以这种系统中未读消息系统不能单独的存储一个最大已读流水号,而需要存储若干个不相交的已读序列号区间:

// [begin,  end) 左闭右开区间
struct chat_readed_region
{
	std::uint64_t begin;
	std::uint64_t end;
};

class unread_msg_mgr
{
	std::vector<chat_readed_region> readed_regions; // 所有已经设置为已读的不相交上升区间 
	std::uint64_t max_msg_seq; // 最新消息的编号
	std::uint64_t unread_msg_num; // 剩下未读消息的数量
	void add_new_msg(std::uint64_t new_msg_seq) // 添加一个新的未读消息
	{
		if (new_msg_seq > max_msg_seq)
		{
			unread_msg_num += new_msg_seq - max_msg_seq;
			max_msg_seq = new_msg_seq;
		}
	}
	void mark_all_readed()
	{
		readed_regions.clear();
		readed_regions.push_back(chat_readed_region{ 0, max_msg_seq + 1 });
	}
	void mark_readed(std::uint64_t begin, std::uint64_t end); // 执行区间合并相关操作
};

这里的mark_readed需要传入一个与readed_regions里任意区间都不相交的区间,实现时需要比较小心的处理左右两侧已读区间的合并操作,具体可以分为下面的四种情况:

  1. 不与任何区间进行合并,创建一个新区间[begin, end)
  2. 与左侧区间合并,修改左侧区间的end为新的end
  3. 与右侧区间合并, 修改右侧区间的begin为新的begin
  4. 触发左右两侧区间的合并,三个连续区间合并为一个大区间

所以代码实现上有比较多的if判断,需要提前规划好每个分支需要处理的逻辑:

void mark_readed(std::uint64_t begin, std::uint64_t end)
{
	if (end <= begin || end > max_msg_seq + 1 || begin > max_msg_seq)
	{
		return;
	}
	std::uint64_t i = 0;
	for (; i < readed_regions.size(); i++)
	{
		if (readed_regions[i].begin > begin)
		{
			break;
		}
	}
	bool merge_left = false;
	bool merge_right = false;
	if (i != 0)
	{
		assert(readed_regions[i - 1].end <= begin);
		if (readed_regions[i - 1].end == begin)
		{
			readed_regions[i - 1].end = end;
			merge_left = true;
		}
	}
	if (i != readed_regions.size())
	{
		assert(readed_regions[i].begin >= end);
		if (readed_regions[i].begin == end)
		{
			readed_regions[i].begin = begin;
			merge_right = true;
		}
	}
	if (merge_left && merge_right)
	{
		readed_regions[i - 1].end = readed_regions[i].end;
		readed_regions.erase(readed_regions.begin() + i);
	}
	if (!merge_left && !merge_right)
	{
		readed_regions.insert(readed_regions.begin() + i, chat_readed_region{ begin, end });
	}
	unread_msg_num -= end - begin;
}

上面的实现代码里的for循环其实可以利用readed_regions里的有序性质来用二分搜索来快速定位,这样时间复杂度可以从线性降低为对数。