Mosaic Game 的网络通信设计

连接管理

mosaic_game为了简化实现复杂度,在网络通信这里并没有考虑UDP,只支持了TCP的通信。同时为了支持上层逻辑中的断线重连与Service的动态迁移,构造出来了一个概念叫做anchor即锚点,作为整个mosaic_game的通信网络中的逻辑层通信通道来使用。底层的网络数据收发由对asio::ip::tcp::socket进行封装的net_connection负责,它同时也是其他连接类型(例如后文的encrypt_connection)的基类:

// Low-level TCP connection: wraps an asio TCP socket and owns the raw read/send byte buffers.
// Business code does not talk to this class directly; bytes are exchanged through the attached net_controller.
class net_connection : public std::enable_shared_from_this<net_connection>
{
	asio::io_context& io;
	asio::ip::tcp::socket remote; // the underlying TCP socket
	std::shared_ptr<net_controller> net_controller_; // bridge to the business layer; reset in close_connection
	block_t read_buffer; // raw bytes received from the socket
	block_t send_buffer; // raw bytes to be written to the socket
	std::shared_ptr<std::string> connection_name; // logical connection name; may be null (close_connection checks it)
	std::string remote_endpoint_name; // textual remote endpoint, used in log messages
};

但是业务层并不在乎消息的传递细节,消息的具体来源并不重要,所以业务收发消息的时候并不直接对接这个net_connection。桥接业务层的对象是net_controller,这个类型对接业务逻辑收发数据。

// Abstract bridge between a net_connection (raw bytes) and the business layer.
// Callbacks returning std::string report an error message; an empty string means success.
class net_controller
{
public:
	// Invoked after `length` bytes were read into `data`; returns an error message (empty on success).
	virtual std::string on_data_read(const unsigned char* data, std::size_t length) = 0;
	// Invoked after `length` bytes starting at `data` were sent; returns an error message (empty on success).
	virtual std::string on_data_send(const unsigned char* data, std::size_t length) = 0;
	// Tries to extract a complete packet; returns the minimum number of bytes wanted by the next read.
	virtual std::size_t data_should_read(std::size_t max_length) = 0;
	// Writes at most `max_length` outgoing bytes into `buffer`; returns the number of bytes written.
	virtual std::size_t data_should_send(unsigned char* buffer, std::size_t max_length) = 0;

	virtual ~net_controller() = default;
};

这个net_controller是一个纯虚类型,声明了一些接口函数,目前的实例化类型是net_channel,用来中转网络层数据。net_channel的主要实现机制是提供两个线程安全的消息队列output_channel与input_channel,分别作为发送消息队列与接收消息队列。这样业务层读写数据只需要读写这两个队列,而无需关心底层信道的实现。同时net_connection里包含net_controller的指针,数据收发时会通过这个指针来调用相关的接口函数,来读写这两个队列。


// net_controller implementation that moves packets between a connection and two thread-safe queues:
// received packets are pushed to input_channel, outgoing packets are taken from output_channel.
class net_channel :public net_controller
{
	// On-wire header preceding every packet; field order and sizes define the wire format.
	// Every field defaults to 0 so a header is never sent with indeterminate bytes.
	struct packet_header
	{
		std::uint32_t total_sz = 0; // full size of this packet (header + anchor names + payload)
		std::uint16_t packet_cmd = 0; // message type of this packet
		std::uint8_t from_name_sz = 0; // length of the sender anchor name
		std::uint8_t dest_name_sz = 0; // length of the destination anchor name
	};
	// The header is copied byte-for-byte onto the wire, so its size must not change silently.
	static_assert(sizeof(packet_header) == 8, "packet_header wire layout changed");

protected:
	// The two queues below are thread-safe.
	std::shared_ptr<channel<con_msg_task>> input_channel; // received packets land here, waiting for the business layer
	std::shared_ptr<channel<msg_task>> output_channel; // outgoing data handed over by the business layer
	input_block_buffer input_buffer; // received bytes of packets not yet fully parsed
	output_block_buffer output_buffer; // bytes of packets not yet fully sent

	std::vector<msg_task> sending_tasks; // packets currently being sent
	std::shared_ptr<net_connection> connection;
	std::uint64_t m_temp_data_send_length = 0;
public:
	net_channel(std::shared_ptr<net_connection> in_connection, std::shared_ptr<channel<con_msg_task>> in_input_channel, std::shared_ptr<channel<msg_task>> in_output_channel);
};

此外为了管理net_channel所需的各项资源,额外在这个net_channel之外再构造了一个connection_resource对象,用来存储这个网络连接所绑定线程安全输入输出队列:

// Per-connection bookkeeping owned by network_router (built in create_connection_resource).
struct connection_resource
{
	std::shared_ptr<net_channel> connection_controller; // controller installed on the connection via set_controller
	std::unordered_set<std::string> anchors; // anchors (entity addresses) exposed over this physical connection
	std::shared_ptr<channel<con_msg_task>> input_channel; // receive queue; the router-wide m_input_msg_queue
	std::shared_ptr<channel<msg_task>> output_channel; // send queue owned by this connection alone

};

这里的anchors存储了在这个物理连接上暴露出来的所有实体地址的集合。在业务主线程中负责管理connection_resource的类型是network_router,每次有新连接创建时它都会创建一个对应的connection_resource,包括这个连接的输入输出队列以及对应的队列控制块net_channel:

connection_resource* network_router::create_connection_resource(std::shared_ptr<net_connection> con)
{
	// A connection gets exactly one resource: hand back the existing one if already registered.
	const auto existing = m_connection_resources.find(con.get());
	if (existing != m_connection_resources.end())
	{
		return existing->second.get();
	}

	auto resource = std::make_unique<connection_resource>();
	// Every connection shares the router-wide input queue but gets its own output queue.
	resource->input_channel = m_input_msg_queue;
	resource->output_channel = std::make_shared<mutex_channel<msg_task>>();
	resource->connection_controller = std::make_shared<net_channel>(con, resource->input_channel, resource->output_channel);

	connection_resource* const raw_resource = resource.get();
	m_connection_resources.emplace(con.get(), std::move(resource));
	// Inbound connections are additionally indexed by their accept counter.
	if (!con->is_outbound_connection())
	{
		m_inbound_connections[con->inbound_connection_idx] = con.get();
	}
	return raw_resource;
}

为了在逻辑层维护这个net_channel,basic_stub需要感知到物理连接的建立和断开,因此要求这些事件发生时推送一些连接控制消息到basic_stub中处理:

// Kind of connection lifecycle event delivered to the main thread.
enum class connection_ctrl_msg_type
{
	on_connect = 0,    // an outbound (actively initiated) connection was established
	on_disconnect = 1, // a connection was closed
	on_accepted = 2,   // an inbound connection was accepted by the listener
};
// A lifecycle event together with the connection it concerns.
struct connection_ctrl_msg
{
	connection_ctrl_msg_type type;
	std::shared_ptr<net_connection> connection;
};

void basic_stub::on_conn_ctrl_msg(const network::connection_ctrl_msg& msg)
{
	// Route each lifecycle event to its dedicated handler; unknown event types are ignored.
	using ctrl_type = network::connection_ctrl_msg_type;
	if (msg.type == ctrl_type::on_connect)
	{
		on_connect(msg.connection);
	}
	else if (msg.type == ctrl_type::on_disconnect)
	{
		on_disconnected(msg.connection);
	}
	else if (msg.type == ctrl_type::on_accepted)
	{
		on_accepted(msg.connection);
	}
}

连接控制消息的添加与读取都通过mutex_channel<connection_ctrl_msg>这个线程安全队列来实现,避免可能出现的多线程问题:


// Thread-safe queue of connection lifecycle events: filled by push_ctrl_msg, drained by poll_ctrl_msg.
mutex_channel<connection_ctrl_msg> m_conn_ctrl_msgs;

void network_router::push_ctrl_msg(std::shared_ptr<net_connection> conn, connection_ctrl_msg_type msg_type)
{
	// Package the event and enqueue it; poll_ctrl_msg hands it to the main-loop handler later.
	m_conn_ctrl_msgs.push_msg(connection_ctrl_msg{ msg_type, std::move(conn) });
}

void network_router::poll_ctrl_msg(const conn_ctrl_msg_callback_t& msg_handler)
{
	std::array<connection_ctrl_msg, 10> temp_tasks;
	while (true)
	{
		auto cur_poll_size = m_conn_ctrl_msgs.pop_bulk_msg(temp_tasks.data(), temp_tasks.size());
		if (cur_poll_size == 0)
		{
			return;
		}
		for (std::size_t i = 0; i < cur_poll_size; i++)
		{
			msg_handler(temp_tasks[i]);
		}
	}
}

当物理连接被建立时,会添加on_connect或者on_accepted消息,具体的消息类型取决于自己是不是监听方:

void net_connection::on_connected()
{
	router->push_ctrl_msg(shared_from_this(), connection_ctrl_msg_type::on_connect);
	request_send_data();
	this->async_read_data(true);

}
void basic_stub::do_accept()
{
	// Arms one asynchronous accept; the completion handler re-arms it after every accepted socket.
	m_asio_wrapper->m_acceptor.async_accept([this](const asio_error_code& error, asio::ip::tcp::socket socket)
		{
			// Acceptor closed (presumably during shutdown): stop the accept loop.
			if (!m_asio_wrapper->m_acceptor.is_open())
			{
				return;
			}
			if(error)
			{
				// NOTE(review): returning here also stops accepting any further connections.
				m_logger->error("async_accept with error {}", error.message());
				return;
			}
			// The peer address is deliberately not queried here: it was never used, and the throwing
			// socket.remote_endpoint() overload raises if the peer has already reset the connection,
			// letting an exception escape this completion handler.

			auto cur_connection_idx = ++m_inbound_connection_counter;
			std::shared_ptr<network::net_connection> connection;
			// A configured RSA key upgrades every inbound connection to an encrypted one.
			if (m_local_server.rsa_key.empty())
			{
				connection = network::net_connection::create(m_io_context, std::move(socket), cur_connection_idx, m_logger, m_connection_timeout, m_router.get());
			}
			else
			{
				connection = network::encrypt_connection::create(m_io_context, std::move(socket), cur_connection_idx, m_logger, m_connection_timeout, m_router.get(), m_local_server.rsa_key);
			}

			// start_as_server / accept_endpoint run later on the main thread (basic_stub::on_accepted).
			this->m_router->push_ctrl_msg(connection, network::connection_ctrl_msg_type::on_accepted);
			do_accept();
		});
}

这里的on_accepted对应的是当前进程作为监听服务器收到了一个入站连接,其逻辑很简单,开启对消息的接收:

void basic_stub::on_accepted(std::shared_ptr<network::net_connection> connection)
{
	// Begin server-side processing on the accepted socket, then register it with the router
	// so it gets its own send queue and net_channel controller.
	network::net_connection& accepted = *connection;
	accepted.start_as_server();
	m_router->accept_endpoint(std::move(connection));
}

这里的accept_endpoint会为这个连接构造一个代理对象connection_resource,维护这个连接对应的收发队列input_channel与output_channel:

bool network_router::accept_endpoint(std::shared_ptr<net_connection> in_connection)
{
	// Create (or reuse) the per-connection resource and install its net_channel as the controller.
	connection_resource* const resource = create_connection_resource(in_connection);
	in_connection->set_controller(resource->connection_controller);
	return true;
}

on_connect刚好与on_accepted相反,代表本进程发起的一个出站连接已被对端成功接受,这里的处理逻辑是开启这个连接的保活,定期发送一个心跳包:

void basic_stub::on_connect(std::shared_ptr<network::net_connection> connection)
{
	// Track the outbound connection by name so keep_alive_callback sends it heartbeats.
	const auto& name_ptr = connection->get_connection_name();
	const std::string& server_name = *name_ptr;
	m_logger->info("on_connect for {}", server_name);
	m_keep_alive_servers[server_name] = connection;
}

void basic_stub::keep_alive_callback()
{
	std::string temp_keep_alive_anchors;
	for (const auto& one_server : m_keep_alive_servers)
	{
		if (one_server.second)
		{
			send_keep_alive(one_server.first);
			temp_keep_alive_anchors += one_server.first + " ";
		}
	}
	if(!temp_keep_alive_anchors.empty())
	{
		m_logger->debug("keep_alive_callback servers {}", temp_keep_alive_anchors);
	}
	
	add_timer_with_gap(std::chrono::milliseconds(m_connection_timeout / 2), [this]()
		{
			this->keep_alive_callback();
		});
}

当物理连接断开的时候,会往network_router里推送连接断开消息:

void net_connection::close_connection()
{
	if (stopped)
	{
		return;
	}
	stopped = true;
	this->cancel_all_timers();

	if (connection_name)
	{
		logger->error("close connection for {} {}", *connection_name, remote_endpoint_name);
	}
	else
	{
		logger->error("close connection for {} ", remote_endpoint_name);
	}
	asio_error_code ec;
	if (this->remote.is_open())
	{
		this->remote.shutdown(asio::ip::tcp::socket::shutdown_both, ec);
		this->remote.close(ec);
	}
	if (net_controller_)
	{
		net_controller_.reset();
	}
	router->push_ctrl_msg(shared_from_this(), connection_ctrl_msg_type::on_disconnect);
	return;
}

对应的回调处理on_disconnected则复杂一些,需要考虑一下异常断线还是正常断线,是否需要重新连接。

void basic_stub::on_disconnected(std::shared_ptr<network::net_connection> connection)
{
	// Detach the connection from the router before deciding anything else.
	m_router->disconnect(connection);

	const auto& name_ptr = connection->get_connection_name();
	const bool has_name = name_ptr && !name_ptr->empty();
	if (!has_name)
	{
		// Without a name there is no server to reconnect to.
		m_logger->info("empty conn name for {}", connection->get_remote_endpoint_name());
		return;
	}
	if (!should_reconnect(connection))
	{
		return;
	}

	// Schedule a reconnect attempt; the lambda keeps its own copy of the name pointer.
	m_logger->info("reconnect to server {} after {} ms", *name_ptr, m_timer_check_gap_ms);
	const auto retry_delay = std::chrono::milliseconds(m_timer_check_gap_ms);
	add_timer_with_gap(retry_delay, [name_ptr, this]()
		{
			connect_to_server(*name_ptr);
		});
}

数据发送

net_channel中用一个packet_header的结构体来进行封包,成员变量packet_seq代表包的流水号,total_sz代表当前包的整体长度,packet_cmd代表当前包里业务数据的类型:

// On-wire packet header. Field order and sizes define the wire format.
// Every field defaults to 0: packet_seq is only assigned for client <-> server traffic,
// and an unassigned field would otherwise be sent as indeterminate bytes.
struct packet_header
{
	std::uint64_t packet_seq = 0; // packet sequence number
	std::uint32_t total_sz = 0; // full size of the packet
	std::uint16_t packet_cmd = 0; // message type of the packet
	std::uint8_t from_name_sz = 0; // length of the sender anchor name
	std::uint8_t dest_name_sz = 0;// length of the destination anchor name
};
// The header is copied byte-for-byte onto the wire; lock its size (16 bytes, no padding).
static_assert(sizeof(packet_header) == 16, "packet_header wire layout changed");

这里的packet_seq只有在客户端与服务器之间的数据包才会赋值,其他情况下默认为0,发送数据的时候network_router提供如下几个接口:

// Outbound send API of network_router; `from`/`dest` are anchor names and `cmd` is the packet command.
bool push_msg(const std::string& from, const std::string& dest, const std::string& data, std::uint16_t cmd);
// Presumably sends the same payload to every anchor listed in `ids`.
void broadcast_msg(const std::string& from, const std::vector<std::string>& ids, const std::string& data, std::uint16_t cmd);

bool push_msg(const std::string& from, const std::string& dest, std::shared_ptr<const std::string> data, std::uint16_t cmd);
// `msg_seq` is expected to be non-zero only for client <-> server traffic.
bool push_msg(std::shared_ptr<const std::string> from, std::shared_ptr<const std::string> dest, std::shared_ptr<const std::string> data , std::uint16_t cmd, msg_seq_t msg_seq = 0);
; // NOTE(review): stray empty declaration, harmless
// Sends over one specific connection.
bool push_msg(const net_connection* connection, std::shared_ptr<const std::string> from, std::shared_ptr<const std::string> dest, std::shared_ptr<const std::string> data, std::uint16_t cmd, msg_seq_t msg_seq = 0);
// Sends over the inbound connection identified by its accept index (see m_inbound_connections).
bool push_msg(std::uint64_t inbound_con_idx, std::shared_ptr<const std::string> from, std::shared_ptr<const std::string> dest, std::shared_ptr<const std::string> data, std::uint16_t cmd);

这里的from与dest都是anchor,每个anchor最多绑定一个net_connection:

// Routing state of one anchor: the connection it is currently bound to (if any) and a private
// output queue that holds messages while the anchor has no bound connection.
class anchor_resource
{
	friend class anchor_collection;
public:
	const std::string name; // anchor name
private:
	const net_connection* connection; // bound connection; presumably null while unbound

	channel<msg_task> output_channel; // messages buffered while no connection is bound
};

往一个anchor发送消息执行逻辑分为两种情况:

  1. 如果有绑定的net_connection,就往对应的net_connection的output_channel里添加队尾数据。
  2. 如果没有绑定的net_connection,则往自身的output_channel里添加队尾数据

当一个anchor绑定到net_connection时,将anchor自身output_channel里的数据拼接到net_connection的output_channel中。当一个net_connection断开时,按序取出其output_channel中的所有数据,投递到对应的anchor_resource内部的output_channel中。在这样的设计下,物理连接的断开不至于导致发送消息的丢失,同时也可以保证一个anchor切换连接时,其数据的发送仍然保持可靠且有序。

// Routes one message: to this process's own input queue when `dest` is a local anchor, otherwise
// through the connection bound to `dest`'s anchor, or into the anchor's own buffer while it is unbound.
// Returns false only when no anchor resource exists for `dest`.
bool network_router::push_msg(std::shared_ptr<const std::string> from, std::shared_ptr<const std::string> dest, std::shared_ptr<const std::string> data, std::uint16_t cmd)
{
	// Missing endpoints become the shared empty anchor so the code below can dereference them.
	if (!dest)
	{
		dest = m_empty_dest;
	}
	if(!from)
	{
		from = m_empty_dest;
	}
	// rfind(prefix, 0) == 0 is a "starts with" test: the destination lives in this process.
	if(dest->rfind(m_local_anchor_name, 0) == 0)
	{
		// Loop the message straight back into the input queue, with no connection attached.
		network::con_msg_task local_msg_task;
		local_msg_task.first = {};
		local_msg_task.second = msg_task::construct(from, dest, data, cmd);
		m_input_msg_queue->push_msg(std::move(local_msg_task));
		// Delivered. Falling through would also route the message via an anchor proxy (a duplicate
		// delivery) or log a spurious "cant find anchor_resources" error and return false.
		return true;
	}
	auto cur_proxy_resource = m_anchor_collection.find_proxy_for_anchor(*dest);
	if(!cur_proxy_resource)
	{
		m_logger->error("push_msg cant find anchor_resources from {} dest {}  data {}", *from, *dest, *data);

		return false;
	}
	// Prefer the anchor's bound connection; otherwise (or if that push fails) buffer in the anchor.
	auto cur_proxy_con = cur_proxy_resource->get_connection();
	if (cur_proxy_con)
	{
		if (push_msg(cur_proxy_con, from, dest, data, cmd))
		{
			return true;
		}
	}
	return cur_proxy_resource->try_push(from, dest, data, cmd);

}

上述的消息发送接口并不直接把发送数据打包为packet_header,而是封装为一个中间类型msg_task,添加到对应的发送队列中:

// bool network_router::push_msg(const net_connection* connection, std::shared_ptr<const std::string> from, std::shared_ptr<const std::string> dest, std::shared_ptr<const std::string> data, std::uint16_t cmd)
// Excerpt from the overload named above: the message is wrapped in a msg_task and appended to the connection's output queue.
cur_connection_resource_iter->second->output_channel->push_msg(msg_task::construct(from, dest, data, cmd));


using msg_seq_t = std::uint64_t;
// One logical message travelling through the send/receive queues.
// Payload and anchor names are held through shared_ptr<const ...>, so copying a task does not copy the strings.
template <typename T, typename U>
struct channel_task
{
	std::shared_ptr<const T> data; // payload
	std::shared_ptr<const U> dest; // destination anchor
	std::shared_ptr<const U> from; // sender anchor
	std::uint16_t cmd = 0; // packet command
	msg_seq_t msg_seq = 0; // packet sequence number; 0 when the message carries none

	// Builds a task from its parts, in the argument order used by network_router::push_msg.
	static channel_task construct(const std::shared_ptr<const U>& in_from, const std::shared_ptr<const U>& in_dest, const std::shared_ptr<const T>& in_data, std::uint16_t in_cmd, msg_seq_t in_msg_seq = 0)
	{
		channel_task result;
		result.data = in_data;
		result.dest = in_dest;
		result.from = in_from;
		result.cmd = in_cmd;
		result.msg_seq = in_msg_seq;
		return result;
	}

	// Drops every shared reference and resets the scalar fields (poll_msg calls this after handling a task).
	void clear()
	{
		data.reset();
		dest.reset();
		from.reset();
		cmd = 0;
		msg_seq = 0;
	}
};
using msg_task = channel_task<std::string, std::string>;

这个类型的存在可以更好地维护发送与接收数据的生命周期,同时构造出逻辑层的消息包概念。但是真正执行消息发送的时候我们需要将这个msg_task转换到之前定义好的packet_header规定的格式。由于TCP是一个数据流协议,所以每次发送的时候可以发送多个包数据,为此我们提供了一个缓冲区结构output_block_buffer来保存正在发送的多个包数据。底层网络连接在被建立时,会立即调用request_send_data来尝试填充缓冲区并发送:

void net_connection::on_connected()
{
	router->push_ctrl_msg(shared_from_this(), connection_ctrl_msg_type::on_connect);
	request_send_data();
	this->async_read_data(true);

}

void net_connection::request_send_data()
{
	// Ask the controller to fill send_buffer; if nothing is pending, poll again in 5 ms.
	std::size_t pending_sz = 0;
	if (net_controller_)
	{
		pending_sz = net_controller_->data_should_send(send_buffer.data(), send_buffer.size());
	}
	if (pending_sz != 0)
	{
		async_send_data(pending_sz);
		return;
	}
	cancel_timer(timer_type::check_send);
	set_timer(timer_type::check_send, std::chrono::milliseconds(5));
}

这里的net_channel::data_should_send会不断地从发送队列中获取msg_task来填充output_buffer对应的数据缓冲区,然后将相关数据复制到真正的发送缓冲区buffer中,直到发送队列为空或者缓冲区已经满了。此函数内部会进行packet_header封包,并按照buffer的最大长度max_length对数据进行拆分。

// Fills `buffer` (at most max_length bytes) with outgoing bytes: first whatever remains in output_buffer
// from earlier packets, then newly framed packets popped from output_channel, until the buffer is full or
// the queue is empty. Returns the number of bytes written into `buffer`.
std::size_t net_channel::data_should_send(unsigned char* buffer, std::size_t max_length)
{
	// from_name_sz / dest_name_sz are single bytes on the wire.
	constexpr std::size_t max_anchor_name_sz = 0xFF;

	std::size_t consume_sz = 0;
	// Flush leftover bytes of partially sent packets first so packet order is preserved.
	consume_sz += output_buffer.consume(buffer + consume_sz, max_length - consume_sz);
	channel_task<std::string, std::string> temp_task;
	while(consume_sz < max_length)
	{
		// Buffer not full yet: try to take the next task from the queue.
		if (!output_channel->pop_msg(temp_task))
		{
			break;
		}
		if (temp_task.from->size() > max_anchor_name_sz || temp_task.dest->size() > max_anchor_name_sz)
		{
			// A uint8 size field cannot describe such a name: casting would truncate the length while the
			// full name is still written, desynchronizing framing for every later packet on this
			// connection. Drop this task instead of corrupting the stream.
			// TODO(review): report the dropped task (no logger is visible on net_channel here).
			continue;
		}
		// Value-initialize so every header field is defined, including any not assigned below.
		packet_header header{};
		header.dest_name_sz = std::uint8_t(temp_task.dest->size());
		header.from_name_sz = std::uint8_t(temp_task.from->size());
		header.packet_cmd = temp_task.cmd;
		header.total_sz = sizeof(packet_header) + header.from_name_sz + header.dest_name_sz + static_cast<std::uint32_t>(temp_task.data->size());

		// Wire order: header, sender anchor, destination anchor, payload.
		auto header_ptr = std::make_shared<std::string>(reinterpret_cast<const char*>(&header), sizeof(packet_header));
		output_buffer.add(header_ptr);
		output_buffer.add(temp_task.from);
		output_buffer.add(temp_task.dest);
		output_buffer.add(temp_task.data);
		// Copy as much of the freshly framed packet as still fits into `buffer`.
		consume_sz += output_buffer.consume(buffer + consume_sz, max_length - consume_sz);
		// Remember the task while its bytes are in flight (kept for the reconnection logic).
		m_sending_tasks.push_back(temp_task);
		m_packet_send_counter++;
		m_data_send_sz += header.total_sz;
	}
	return consume_sz;
}

这里的m_sending_tasks记录了当前仍在output_buffer缓冲区内的packet,这个数组的存在是为了支持后续的断线重连操作。

void net_connection::async_send_data(std::size_t size)
{
	// Start writing send_buffer[0, size); async_send_data_impl keeps going until all bytes are out.
	constexpr std::size_t start_offset = 0;
	logger->trace("async_send_data with data size {}",  size);
	async_send_data_impl(start_offset, size);
}
void net_connection::async_send_data_impl(std::size_t offset,  std::size_t total_size)
{
	auto self(this->shared_from_this());
	this->set_timer(timer_type::send, timeout);

	this->remote.async_write_some(asio::buffer(send_buffer.data() + offset, total_size - offset),
		asio::bind_executor(this->strand, [this, self, offset, total_size](const asio_error_code& error, std::size_t bytes_transferred)
	{
		if (this->cancel_timer(timer_type::send))
		{
			if (!error)
			{
				on_data_send(bytes_transferred);
				if (bytes_transferred + offset  < total_size)
				{
					logger->trace("send with bytes transferred {}", bytes_transferred);
					this->async_send_data_impl(offset + bytes_transferred, total_size);
				}
				else
				{
					on_buffer_all_send(total_size);
				}
			}
			else
			{
				logger->warn("report error at {}", "async_send_data_impl");
				this->on_error(error);
			}
		}
	})
	);
}
void net_connection::on_data_send(std::size_t bytes_transferred)
{
	// Forward a completed write to the controller; a non-empty reply is reported as an error.
	// NOTE(review): the pointer passed is always the start of send_buffer, even for a write that began at
	// a non-zero offset; confirm the controller only relies on the length.
	if (!net_controller_)
	{
		return;
	}
	const auto control_msg = net_controller_->on_data_send(send_buffer.data(), bytes_transferred);
	if (!control_msg.empty())
	{
		report_error(fmt::format("on_data_send error with msg {}", control_msg));
	}
}

这里的net_connection::async_send_data_impl负责逐步地发送缓冲区的数据,每次发送若干字节后都会调用net_connection::on_data_send来通知net_channel::on_data_send;当缓冲区里的所有数据都发送完成之后,再通过net_connection::on_buffer_all_send从net_channel中获取新数据来填充net_connection的buffer:

void net_connection::on_buffer_all_send(std::size_t total_size)
{
	logger->trace("on_buffer_all_send  with size {}", total_size);
	request_send_data();
}

void net_connection::request_send_data()
{
	// Ask the controller to fill send_buffer; if nothing is pending, poll again in 5 ms.
	std::size_t pending_sz = 0;
	if (net_controller_)
	{
		pending_sz = net_controller_->data_should_send(send_buffer.data(), send_buffer.size());
	}
	if (pending_sz != 0)
	{
		async_send_data(pending_sz);
		return;
	}
	cancel_timer(timer_type::check_send);
	set_timer(timer_type::check_send, std::chrono::milliseconds(5));
}

void net_connection::on_timeout(timer_type cur_timer)
{
	// The two polling timers simply re-run their poll; any other expiry is treated as a connection error.
	switch (cur_timer)
	{
	case timer_type::check_send:
		return request_send_data();
	case timer_type::check_read:
		return request_read_data();
	default:
		break;
	}
	logger->error("on_timeout for timer {}", timer_type_to_string(cur_timer));
	report_error( fmt::format("on_timeout for timer {}", timer_type_to_string(cur_timer)));
}

如果request_send_data里发现net_channel中的发送队列已经空了,则会开启一个计时器来定期检查发送队列里是否有新数据的到来。这里开启timer来检查新数据是一个偷懒的方法,正确的方式应该是给这个net_channel设置一个队列增加新数据的回调,但是由于这里是多线程操作,实现一个正确的多线程通知接口可能有点复杂,所以加一个计时器最省事,代价就是新数据的发送可能有5ms左右的延迟。

数据接收

对应的解包逻辑在net_channel::on_data_read里,每收到一个完整的packet都会将这个packet放到input_channel的末尾:

void net_channel::on_data_read(const unsigned char* data, std::size_t length)
{
	input_buffer.add(data, length);
	while (true)
	{
		// 如果读取到的数据小于packet_header的长度 则等待下一次填充数据
		if (input_buffer.total_size() < sizeof(packet_header))
		{
			break;
		}
		std::uint32_t total_sz = 0;
		input_buffer.consume(reinterpret_cast<std::uint8_t*>(&total_sz), sizeof(std::uint32_t), true);
		// 获取当前packet的长度
		if (total_sz <= input_buffer.total_size())
		{
			// 如果读取到的数据大小大于当前packet的总长度 则代表接收到了一个包 开始解包
			channel_task<std::string, std::string> cur_task;
			packet_header cur_header;
			input_buffer.consume(reinterpret_cast<std::uint8_t*>(&cur_header), sizeof(packet_header), false);
			std::uint32_t cur_data_sz = total_sz - sizeof(packet_header) - cur_header.from_name_sz - cur_header.dest_name_sz;
			auto from_str = std::make_shared<std::string>(cur_header.from_name_sz, '0');
			input_buffer.consume(reinterpret_cast<std::uint8_t*>(from_str->data()), cur_header.from_name_sz, false);
			auto dest_str = std::make_shared<std::string>(cur_header.dest_name_sz, '0');
			input_buffer.consume(reinterpret_cast<std::uint8_t*>(dest_str->data()), cur_header.dest_name_sz, false);
			auto data_str = std::make_shared<std::string>(cur_data_sz, '0');
			input_buffer.consume(reinterpret_cast<std::uint8_t*>(data_str->data()), cur_data_sz, false);
			cur_task.data = std::move(data_str);
			cur_task.from = std::move(from_str);
			cur_task.dest = std::move(dest_str);
			cur_task.cmd = cur_header.packet_cmd;
			m_data_read_sz += cur_header.total_sz;
			m_packet_last_read_seq = cur_task.msg_seq;
			// 接收到的包发送到input_channel等待业务层轮询
			input_channel->push_msg(std::make_pair(connection, cur_task));
			// 执行下一次while 因为可能已经接收到了多个包
		}
		else
		{
			break;
		}
	}
}

这里所有net_channel的input_channel指向的都是network_router里的m_input_msg_queue,这样就方便业务层去处理接收到的数据:


std::size_t network_router::poll_msg(const msg_handle_callback_t& msg_handler)
{
	std::array<con_msg_task, 10> temp_tasks;
	std::size_t result = 0;
	while (true)
	{
		auto cur_poll_size = m_input_msg_queue->pop_bulk_msg(temp_tasks.data(), temp_tasks.size());
		result += cur_poll_size;
		if (cur_poll_size == 0)
		{
			return result;
		}
		for (std::size_t i = 0; i < cur_poll_size; i++)
		{
			if (!msg_handler(temp_tasks[i].first, temp_tasks[i].second))
			{
				m_logger->error("fail to handler msg dest {}  cmd {} info {}", *(temp_tasks[i].second.dest),   temp_tasks[i].second.cmd, temp_tasks[i].second.data->substr(0, 50));
			}
			temp_tasks[i].second.clear();
			temp_tasks[i].first.reset();

		}
	}
}

然后在进程的主循环里会传递业务方自己的消息handler进去来处理消息:

void basic_stub::main_loop()
{
	m_logger->flush();
	auto on_network_msg = [this](std::shared_ptr<network::net_connection> con, const network::msg_task& one_msg)
	{
		return this->on_msg(con, one_msg);
	};
	auto on_http_msg = [this](const http_utils::request& req, msg_seq_t req_seq)
	{
		return this->on_http_request(req, req_seq);
	};
	auto on_ctrl_msg = [this](const network::connection_ctrl_msg& msg)
	{
		return this->on_conn_ctrl_msg(msg);
	};
	// Run one frame. Extra frames follow back-to-back only while polling took longer than
	// m_high_load_threshold * m_timer_check_gap_ms and the stub has not been stopped.
	do
	{
		on_new_frame();
		poll_mainloop_tasks();
		const auto frame_poll_begin = utility::timer_manager::now_ts();
		m_router->poll_msg(on_network_msg);
		http::http_request_mgr::poll_request(on_http_msg);
		poll_timers(utility::timer_manager::now_ts());
		m_router->poll_ctrl_msg(on_ctrl_msg);
		const auto frame_poll_end = utility::timer_manager::now_ts();

		const bool high_load = frame_poll_end - frame_poll_begin > m_high_load_threshold * m_timer_check_gap_ms;
		if (!high_load)
		{
			break;
		}
	} while (!m_stopped);
}

以这样的形式来处理已经完整接收的消息数据,通过线程安全队列保证所有消息都在主线程中处理,可以大大降低逻辑编程的难度。

basic_stub的类型定义中,on_msg函数被声明为了一个纯虚类,因为在basic_stub中并没有提供进程间通信的消息编码格式。真正的业务消息分发是在basic_stub的直接子类json_stub以及json_stub的后续子类之中。在json_stub中,给出了一个非常基础的消息格式定义。开头是一个uint16格式的消息包类型说明符,由两个uint8拼接而成,其中第一个uint8由全局枚举类型packet_cmd控制,第二个uint8的意义则完全依赖与第一个uint8的值。当前mosaic_gamepacket_cmd中提供了七种消息类型:

// Top-level message category; packet_cmd_helper stores it in the high byte of the 16-bit packet command.
enum class packet_cmd: std::uint8_t
{
	server_control = 0,    // process control messages
	client_to_game = 1,    // client -> server
	game_to_client = 2,    // server -> client
	server_rpc_msg = 3,    // json rpc messages between servers
	server_raw_msg = 4,    // non-json messages between servers
	entity_msg = 5,        // messages for entities inside scene processes
	actor_migrate_msg = 6, // actor migration control messages
	max = 7,               // count of categories, not a real message type
};

为了方便的对这个消息包类型两个uint8的拼接与解析,提供了两个辅助函数:

struct packet_cmd_helper
{
	static std::uint16_t encode(packet_cmd in_packet_cmd, std::uint8_t in_cmd_detail)
	{
		std::uint16_t result = std::uint16_t(in_packet_cmd);
		result <<= 8;
		result += in_cmd_detail;
		return result;
	}

	static std::pair<packet_cmd, std::uint8_t> decode(std::uint16_t in_combine_cmd)
	{
		std::pair<packet_cmd, std::uint8_t> result;
		result.second = in_combine_cmd % 256;
		result.first = packet_cmd(in_combine_cmd / 256);
		return result;
	}
};

basic_stub的直接基类json_stub中,只提供了最基础的server_control类型消息的分发,用来处理进程之间的相互注册:

bool json_stub::on_msg(std::shared_ptr<network::net_connection> con, const network::msg_task& one_msg)
{
	// Only server_control messages are handled at this level; other categories report false
	// (presumably handled by subclasses).
	const auto decoded_cmd = enums::packet_cmd_helper::decode(one_msg.cmd);
	if (decoded_cmd.first != enums::packet_cmd::server_control)
	{
		return false;
	}
	parse_and_dispatch_server_control_msg(con, one_msg);
	return true;
}

这里的parse_and_dispatch_server_control_msg就是将msg_task里的data字符串解析为一个json::object,内部包含一个类型为字符串的cmd字段和一个类型为json::objectparam字段。解析完成之后再调用on_server_control_msg来处理进程角色的增删改查指令:

// Handles one parsed server_control command (`cmd` plus its json payload `msg`) from anchor `from` on `con`.
// Appears to return true when the command was recognized; subclasses such as redis_server call it first and extend it.
bool json_stub::on_server_control_msg(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> from,const std::string& cmd, const json& msg)

至于其他类型的消息处理,将放到RPC相关章节进行介绍。

在这个on_server_control_msg函数中处理了如下六种进程管理指令:

  1. set_stub_info,将当前网络连接关联一个进程角色,这个指令都是在连接发起者在连接建立之后的json_stub::on_connect函数中发出的
  2. query_stub_info,查询一个指定名字的进程角色信息,或者指定upstream的下游角色信息列表,对应的返回消息为reply_query_stub_info
  3. send_keep_alive, 进程之间的心跳处理,由于连接长时间没有接收到消息会认为此连接已经断开,所以连接发起者会定期的发送这个心跳包来保活,对应的返回消息为reply_send_keep_alive
  4. remove_stub_info,通知删除一个进程角色,一般是mgr_server通知一个进程的下线时对全服务器集群进行广播
  5. notify_stop,通知当前进程启动关服流程
  6. notify_clear_connection,关服流程中的一个子流程,通知所有的space_server与service_server开始断开所有的网络连接

在不同的json_stub的子类中,还可以继续拓展on_server_control_msg接受的命令类型,例如在redis_server中就额外支持了redis_request类型:

// Extends json_stub's server_control handling with the "redis_request" command.
// Returns true when the command was recognized by this class or the base class.
bool redis_server::on_server_control_msg(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> from, const std::string& cmd, const json& msg)
{
	// Base-class commands (set_stub_info, keep-alive, ...) take precedence.
	if(json_stub::on_server_control_msg(con, from, cmd, msg))
	{
		return true;
	}
	if (cmd == "redis_request")
	{
		std::string errcode;
		redis::redis_task_desc cur_task_desc;
		std::uint64_t callback_id = 0;
		// do/while(0) lets a parse failure break out to the common handling below.
		do
		{
			try
			{
				msg.at("callback_id").get_to(callback_id);
				msg.at("request_detail").get_to(cur_task_desc);
			}
			catch (const std::exception &e)
			{
				errcode = "invalid redis request msg";
				m_logger->error("on_server_control_msg fail to parse {} error {}", msg.dump(), e.what());
				break;
			}
		} while (0);
		// The actual request handling is omitted here.
		// The command was recognized (even if parsing failed and errcode is set), so report it as handled.
		return true;
	}
	return false;
}

数据加密

为了避免客户端与服务器之间的消息协议被外挂人员轻松解析,客户端与服务器之间的消息基本都会经过加密之后再进行传输。在MosaicGame中,同样提供了加密连接的功能,在basic_stub::do_accept函数中,如果发现自己配置了rsa_key,则会将传入连接升级为加密连接encrypt_connection,否则就是普通的非加密连接net_connection:

void basic_stub::do_accept()
{
	// Arms one asynchronous accept; the completion handler re-arms it after every accepted socket.
	m_asio_wrapper->m_acceptor.async_accept([this](const asio_error_code& error, asio::ip::tcp::socket socket)
		{
			// Acceptor closed (presumably during shutdown): stop the accept loop.
			if (!m_asio_wrapper->m_acceptor.is_open())
			{
				return;
			}
			if(error)
			{
				// NOTE(review): returning here also stops accepting any further connections.
				m_logger->error("async_accept with error {}", error.message());
				return;
			}
			// The peer address is deliberately not queried here: it was never used, and the throwing
			// socket.remote_endpoint() overload raises if the peer has already reset the connection,
			// letting an exception escape this completion handler.

			auto cur_connection_idx = ++m_inbound_connection_counter;
			std::shared_ptr<network::net_connection> connection;
			// A configured RSA key upgrades every inbound connection to an encrypted one.
			if (m_local_server.rsa_key.empty())
			{
				connection = network::net_connection::create(m_io_context, std::move(socket), cur_connection_idx, m_logger, m_connection_timeout, m_router.get());
			}
			else
			{
				connection = network::encrypt_connection::create(m_io_context, std::move(socket), cur_connection_idx, m_logger, m_connection_timeout, m_router.get(), m_local_server.rsa_key);
			}

			// start_as_server / accept_endpoint run later on the main thread (basic_stub::on_accepted).
			this->m_router->push_ctrl_msg(connection, network::connection_ctrl_msg_type::on_accepted);
			do_accept();
		});
}

对应的在主动发起连接的一方会根据目标服务器是否配置了rsa_key来选择使用加密连接还是普通连接,这样就能保证连接的双方同时启用加密连接或者普通连接:

// Starts an outbound connection named `name` to `endpoint`: an encrypt_connection when `rsa_key` is
// non-empty, otherwise a plain net_connection. Returns false if a connection with this name already exists.
bool network_router::connect_endpoint(const std::string& name, const anchor_endpoint& endpoint, const std::string& rsa_key)
{
	std::string log_key = fmt::format("{} {}:{}", name, endpoint.host, int(endpoint.port));
	// NOTE(review): the per-connection logger is obtained before the duplicate-name check below.
	auto connection_logger = utility::get_logger(log_key);
	if (m_named_connection.find(name) != m_named_connection.end())
	{
		m_logger->error("connection for {} already exist", name);
		return false;
	}
	m_logger->info("get logger for {}", log_key);
	connection_logger->info("begin connect_endpoint");
	std::shared_ptr<network::net_connection> cur_connection;
	// The inbound index argument is 0 here (presumably marking an outbound connection).
	if (rsa_key.empty())
	{
		cur_connection = net_connection::create(m_io_context, asio::ip::tcp::socket(m_io_context), 0, connection_logger, m_timeout, this);
	}
	else
	{
		// encrypt_connection::create returns an empty pointer if init_cipher fails; the elided code should check it.
		cur_connection = encrypt_connection::create(m_io_context, asio::ip::tcp::socket(m_io_context), 0, connection_logger, m_timeout, this, rsa_key);
	}
	// Remaining code omitted in this excerpt
}

由于只有客户端与服务器之间的通信有这个加密的需求,同时gate_server开启的监听端口只有客户端才会连接过来,所以只需要在gate_server的配置文件里写入这个rsa_key即可开启客户端与服务器之间的通信加密,同时保持服务器之间的通信仍然是明文的,避免加密带来的一些性能损耗。

在前文中我们曾经提到:基于RSA的非对称加密是非常消耗性能的,而基于AES的对称加密在性能上有非常大的优势,实际的加密系统中都是先用RSA加密的数据来执行握手并初始化两者之间的AES加密密钥,初始化完成之后后续的数据将只使用AES加密。在MosaicGame中也是这样设计的,客户端在创建encrypt_connection的时候,就会选择一种AES加密算法并生成对应的密钥:

std::shared_ptr<encrypt_connection> encrypt_connection::create(asio::io_context& in_io, asio::ip::tcp::socket&& _in_remote_socket, std::uint64_t inbound_con_idx, std::shared_ptr<spdlog::logger> logger, std::chrono::milliseconds _in_timeout, network_router* in_router, const std::string& in_ras_key)
{
	auto created = std::make_shared< encrypt_connection>(in_io, std::move(_in_remote_socket), inbound_con_idx, logger, _in_timeout, in_router, in_ras_key);
	// Only the connecting (outbound) side picks the AES cipher; the accepting side receives it in the handshake.
	// Returns an empty pointer when the cipher cannot be initialized.
	if (!created->is_outbound_connection())
	{
		return created;
	}
	if (!created->init_cipher("aes-256-cfb"))
	{
		logger->error("fail to init cipher");
		return {};
	}
	return created;
}


在encrypt_connection中我们提供了十几种基于openssl的AES加密接口,这里偷懒就直接选择了aes-256-cfb这个加密方法。这里的init_cipher会根据指定的加密算法来初始化一个基础的加密密钥,并加上一个公用的前缀cipher_prefix,经过RSA加密之后存储到encrypted_cipher_info中:

// Fixed plaintext marker placed at the front of the RSA-encrypted handshake payload.
std::string encrypt_connection::cipher_prefix()
{
	static const char prefix_text[] = "mosaic_game_cipher";
	return std::string(prefix_text);
}
// Initiator side of the handshake: generates the AES cipher `cipher_name` (encryptor + decryptor), packs
// cipher_prefix() + cipher code + IV + key into a fixed 128-byte block, RSA-encrypts it (OAEP padding)
// with the configured key and stores the result in encrypted_cipher_info. Returns false on any failure.
bool encrypt_connection::init_cipher(const std::string& cipher_name)
{
	std::string cipher_info_raw;

	cipher_info_raw += cipher_prefix();
	char cipher_code = 0;
	std::vector<unsigned char> ivec(16);
	std::vector<unsigned char> key_vec;
	// Only "aes..." cipher names are supported; any other name leaves encryptor/decryptor unset.
	if (cipher_name.size() > 7 && std::equal(cipher_name.begin(), cipher_name.begin() + 3, "aes"))
	{
		// aes
		encrypt::aes_generator::generate(cipher_name, cipher_code, ivec, key_vec, encryptor, decryptor);
	}

	if (!encryptor || !decryptor)
	{
		logger->error("failt to encrypt::aes_generator::generate");
		return false;
	}

	// Layout after the prefix: 1-byte cipher code, IV bytes, key bytes.
	cipher_info_raw.append(1, static_cast<char>(cipher_code));
	cipher_info_raw.append(reinterpret_cast<char*>(ivec.data()), ivec.size());
	cipher_info_raw.append(reinterpret_cast<char*>(key_vec.data()), key_vec.size());

	// Zero-initialized so the bytes after cipher_info_raw are deterministic instead of leftover stack
	// contents, which would otherwise be encrypted and sent to the peer.
	std::array<unsigned char, 128> cipher_info_packed{};
	if (cipher_info_raw.size() >= cipher_info_packed.size())
	{
		logger->error("cipher_info_raw.size() >= 128");
		return false;
	}

	std::copy(cipher_info_raw.begin(), cipher_info_raw.end(), cipher_info_packed.data());
	// NOTE(review): OAEP padding adds overhead (42 bytes if SHA-1 is used), so encrypting 128 bytes needs a
	// modulus larger than 128 bytes; a 1024-bit key passes this check but would presumably fail in encrypt().
	if (rsa_key.modulus_size() < 128)
	{
		logger->warn("invalid rsa public key");
		return false;
	}

	encrypted_cipher_info.resize(rsa_key.modulus_size());
	if (int(encrypted_cipher_info.size()) != rsa_key.encrypt(static_cast<std::uint32_t>(cipher_info_packed.size()), reinterpret_cast<unsigned char*>(cipher_info_packed.data()), encrypted_cipher_info.data(), encrypt::rsa_padding::pkcs1_oaep_padding))
	{
		logger->warn("invalid rsa encrypt size");
		return false;
	}
	return true;
}

随后在成功的与远端建立连接之后,在on_connected回调中会首先将这个encrypted_cipher_info发送到对端,同时开启对指定的密钥确认数据的等待:

void encrypt_connection::on_connected()
{
	// Initiator side of the handshake: send the RSA-encrypted cipher info, then wait for exactly as many
	// bytes as the acceptance reply (AES keeps the byte count unchanged).
	const auto handshake_sz = this->encrypted_cipher_info.size();
	std::copy(encrypted_cipher_info.begin(), encrypted_cipher_info.end(), send_buffer.data());
	this->async_send_data(handshake_sz);
	const auto reply_sz = cipher_accept_reply_info().size();
	this->async_read_data(true, reply_sz);
}

encrypt_connection的接收端在启动的时候会首先设置当前连接的状态为等待RSA握手状态,然后启动对握手密钥的读取,由于我们在客户端连接中已经将握手数据扩张为了rsa_key.modulus_size()大小,所以这里会等待指定字节的数据到达:

void encrypt_connection::start_as_server()
{
	// Accepting side: mark the handshake as not yet accepted and wait for the initiator's RSA-encrypted
	// cipher info, which is exactly one RSA modulus in size.
	encrypt_key_accepted = false;
	encrypt_key_send = true;
	const auto handshake_sz = rsa_key.modulus_size();
	async_read_data(true, handshake_sz);
}

当数据到达之后,如果自己是加密连接的接收端,会检查握手是否已经完成,如果没有完成则检查传入的数据是否是合法的握手数据,如果是则使用指定的数据来初始化AES加密密钥,同时将密钥确认信息发送回远端:

// Plaintext of the acknowledgement the accepting side sends (AES-encrypted) once the handshake succeeds.
std::string encrypt_connection::cipher_accept_reply_info()
{
	const std::string reply_text = "entity mesh cipher accepted";
	return reply_text;
}
// Read completion for encrypted connections (accepting-side part shown). Until an encryptor exists, the
// first read is the initiator's RSA-encrypted cipher info: accept_cipher validates it, then an AES-encrypted
// acknowledgement is sent back and normal reading starts.
void encrypt_connection::on_data_read(std::size_t bytes_transferred)
{
	logger->trace("encrypt_connection::on_data_read bytes_transferred {}", bytes_transferred);
	if (!is_outbound_connection())
	{
		// No encryptor yet: the handshake has not completed, so this read must be the handshake payload.
		if (!this->encryptor)
		{
			if (!accept_cipher(read_buffer.data(), bytes_transferred))
			{
				report_error("accept_cipher");
				return;
			}
			// NOTE(review): the throwing remote_endpoint() overload is used here; it throws if the peer already disconnected.
			logger->info("accept_cipher suc for client {}:{}", remote.remote_endpoint().address().to_string(), remote.remote_endpoint().port());
			// The acknowledgement is already encrypted with the freshly initialized AES encryptor.
			auto cipher_ack_msg = cipher_accept_reply_info();
			encryptor->encrypt(reinterpret_cast<unsigned char*>(cipher_ack_msg.data()), send_buffer.data(), cipher_ack_msg.size());
			async_send_data(cipher_ack_msg.size());
			async_read_data();
			return;
		}
	}
	// Remaining code omitted for now

}

由于accept_cipher会初始化encryptor,开启后续数据的加密,所以这里的cipher_ack_msg也会经过这样的加密,然后再发送回发起端。由于发起端在创建初始密钥的时候已经用这个密钥创建了一个解密用的decryptor,所以接收到的消息都需要经过decryptor进行解密。由于我们采用的是AES的流式加密模式(CFB),加密前后的字节数量永远相等,所以发起端的on_data_read第一次回调的时候,基本可以保证收到的是握手成功的消息:

void encrypt_connection::on_data_read(std::size_t bytes_transferred)
{
	logger->trace("encrypt_connection::on_data_read bytes_transferred {}", bytes_transferred);
	// 省略前面介绍了的接收端握手处理
	decryptor->decrypt(read_buffer.data(), decrypt_buffer.data(), bytes_transferred);
	std::copy(decrypt_buffer.data(), decrypt_buffer.data() + bytes_transferred, read_buffer.data());
	if (!encrypt_key_accepted)
	{
		encrypt_key_accepted = true;
		auto accept_str = cipher_accept_reply_info();
		if (bytes_transferred != accept_str.size())
		{
			logger->error("expect accept_str {} while size {} mot match ", accept_str, bytes_transferred);
			return;
		}
		for (std::size_t i = 0; i < bytes_transferred; i++)
		{
			if (read_buffer[i] != accept_str[i])
			{
				logger->error("expect accept_str {} while received {}", accept_str, std::string(read_buffer.data(), read_buffer.data() + bytes_transferred));
				return;
			}
		}
		router->push_ctrl_msg(shared_from_this(), connection_ctrl_msg_type::on_connect);
		request_send_data();
		async_read_data();

		return;
	}
	net_connection::on_data_read(bytes_transferred);

}

当确认了收到的消息是握手成功消息之后,连接建立的真正回调才会推送到业务主线程,同时通过request_send_data开始读取业务消息推送队列,不过这里填充了发送数据的buffer之后,还需要通过encryptor->encrypt执行一次加密:

// Pull the next batch of outgoing plaintext from the controller into
// encrypt_buffer, encrypt it into send_buffer, and send the same byte count.
// When nothing is pending, or no controller is attached, a check_send timer
// polls again after 5 ms.
void encrypt_connection::request_send_data()
{
	if (!net_controller_)
	{
		logger->warn("encrypt_connection::request_send_data net_controller not set");
	}
	else
	{
		const auto plain_size = net_controller_->data_should_send(encrypt_buffer.data(), encrypt_buffer.size());
		if (plain_size != 0)
		{
			encryptor->encrypt(encrypt_buffer.data(), send_buffer.data(), plain_size);
			async_send_data(plain_size);
			return;
		}
	}
	set_timer(timer_type::check_send, std::chrono::milliseconds(5));
}

此时客户端与服务器的连接握手过程完整结束,后续数据的读取与发送都会通知到对应的net_controller,两端发送数据的时候都会执行AES加密,两端接收数据的时候都会执行AES解密。由于加密和解密这两个过程都能保证前后的字节数量一模一样,一个完整的packet所占用的字节数量在加密之后保持不变,packet_header里记录的total_sz仍然是正确的,因此断线重连的相关流程也不需要修改。

断线重连

为了避免客户端由于弱网和切换网络导致的断线引发游戏重新登录这种不良体验,我们在gate_server层做了客户端断线重连的逻辑。断线重连的核心就是维护一个session会话,在同一个session内的gate_server发往client的所有数据包的packet_seq是递增的,客户端记录自己业务层已经接收到的最大packet_seq,在断线重连的时候告知gate_server这个最大已接收packet_seq,然后gate_server将对序列号大于此packet_seq的消息包进行重新传输,这样就保证了重连之后客户端接收到的消息包packet_seq永远是连续递增的,不会出现丢失与重复。至于client发往gate_server的数据则不需要做这样的可靠性保证,因为客户端发送到服务器的请求总是可以重试的。

当物理链路断开时,net_channel需要将未发送完成的packet重新插入到output_channel的头部,以保证消息不丢失以及有序。为此我们需要记录那些还没有发送成功的消息。之前我们在net_channel::data_should_send函数中会将业务层推送过来的消息按顺序填充到发送缓冲区output_buffer,然后再由net_connection::async_send来将这个发送缓冲区里的数据进行发送。由于TCP是流式协议,填充到发送缓冲区的数据并不保证一次性全量发送完成,如果这个缓冲区数据有多个逻辑packet,可能会出现远端已经接收了开头的若干packet的情况。为了避免这些已经被接收的packet在断线重连之后被重新发送,在一个业务层消息被填充到发送缓冲区的时候我们将这个消息填充到正在发送队列sending_tasks的末尾。同时在网络连接这边需要在每次发送若干字节后都调用这里提供的on_data_send方法,这个函数负责检查一个或多个packet是否已经发送成功,如果发送成功就从sending_tasks队列的头部弹出。这里还有个特别需要注意的地方,async_send发送成功并不代表TCP连接的对端接收成功,只是代表数据已经转移到了操作系统内核的发送缓冲区。当真正的断线发生的时候,这个残留在发送缓冲区里的数据仍然是没有被对端收到的,因此如果要做一个正确的断线重连,我们还需要保留一些async_send发送成功的消息包到发送完成队列m_already_send_tasks中,这个保留的任务最大数量由m_already_send_task_max_num变量来控制,其实更好的方法是通过时间戳来控制:


// Called by the connection after `length` more bytes were handed to the socket.
// The byte count accumulates in m_temp_data_send_length. Every packet at the
// head of sending_tasks that is now fully covered is retired. When reconnect
// support is on (m_already_send_task_max_num != 0), retired packets are kept in
// m_already_send_tasks, capped at that many entries, so they can be resent.
// Returns an empty string on success, or an error description if more bytes
// were reported than the queued packets account for.
std::string net_channel::on_data_send(const unsigned char* data, std::size_t length)
{
	(void)data;
	m_temp_data_send_length += length;
	// Retire completed packets by index and erase the whole finished prefix once.
	// Erasing the front of a vector per packet costs O(n) each time.
	std::size_t finished_count = 0;
	while (m_temp_data_send_length)
	{
		if (finished_count == sending_tasks.size())
		{
			sending_tasks.erase(sending_tasks.begin(), sending_tasks.begin() + static_cast<std::ptrdiff_t>(finished_count));
			return "wtf sending_tasks.empty()";
		}
		const auto& temp_task = sending_tasks[finished_count];
		// serialized packet size: header followed by the from/dest anchors and the payload
		const std::size_t total_sz = sizeof(packet_header) + temp_task.from->size() + temp_task.dest->size() + temp_task.data->size();
		if (total_sz > m_temp_data_send_length)
		{
			// The head packet is only partially sent. Its bytes stay counted for the next call.
			break;
		}
		// With reconnect support on, keep the sent packet in a bounded history queue.
		if (m_already_send_task_max_num)
		{
			m_already_send_tasks.push_back(temp_task);
			if (m_already_send_tasks.size() > m_already_send_task_max_num)
			{
				m_already_send_tasks.pop_front();
			}
		}
		m_temp_data_send_length -= total_sz;
		++finished_count;
	}
	sending_tasks.erase(sending_tasks.begin(), sending_tasks.begin() + static_cast<std::ptrdiff_t>(finished_count));
	return std::string();
}

所以在断线时,不仅要将正在发送队列sending_tasks里的消息包重新放回output_channel,还要把已发送数据m_already_send_tasks队列也重新放回output_channel:

void net_channel::on_disconnected()
{
	std::vector<msg_task> temp_all_sending_tasks;
	temp_all_sending_tasks.reserve(m_sending_tasks.size() + m_already_send_tasks.size());
	while(!m_already_send_tasks.empty())
	{
		temp_all_sending_tasks.push_back(m_already_send_tasks.front());
		m_already_send_tasks.pop_front();
	}
	while(!m_sending_tasks.empty())
	{
		temp_all_sending_tasks.push_back(m_sending_tasks.front());
		m_sending_tasks.pop_front();
	}
	output_channel->push_front_msg_bulk(temp_all_sending_tasks);
}

同时在network_router处理断线的函数里,会将该连接output_channel里剩余的所有数据都重新推送回对应anchor_resource内部的数据队列output_channel中:

bool network_router::disconnect(const net_connection* connection)
{
	auto connection_iter = m_connection_resources.find(connection);
	if(connection_iter == m_connection_resources.end())
	{
		m_logger->warn("disconnect {} fail", connection->get_remote_endpoint_name());

		return false;
	}
	m_inbound_connections.erase(connection->inbound_connection_idx);
	const auto& cur_con_name = connection->get_connection_name();
	if (cur_con_name)
	{
		m_named_connection.erase(*cur_con_name);
		m_logger->info("erase conn {} {}", *cur_con_name, connection->get_remote_endpoint_name());
	}
	auto& cur_connection_resource = connection_iter->second;
	for(auto one_node: cur_connection_resource->anchors)
	{
		m_anchor_collection.disconnect(one_node, connection);
		
	}
	cur_connection_resource->anchors.clear();
	cur_connection_resource->connection_controller->on_disconnected();
	std::array<network_channel_task<std::string, std::string>, 10> temp_task;
	std::vector< network_channel_task<std::string, std::string>> total_tasks;
	std::size_t temp_count;
	while(true)
	{
		temp_count = cur_connection_resource->output_channel->pop_bulk_msg(temp_task.data(), temp_task.size());
		if(temp_count == 0)
		{
			break;
		}
		else
		{
			total_tasks.insert(total_tasks.end(), temp_task.begin(), temp_task.begin() + temp_count);
		}
	}
	on_disconnect_handle_remain_tasks(connection, total_tasks);
	
	m_connection_resources.erase(connection_iter);
	return true;
}
// Re-route packets still queued on a dead connection back to their anchors'
// output queues. anchor_collection::try_push_front only accepts them while the
// anchor has no connection. Tasks are walked back-to-front, so repeated
// push_front calls keep the original order. Tasks whose `from` equals this
// connection's name are dropped. Tasks with an empty destination are
// addressed to this connection's name.
void network_router::on_disconnect_handle_remain_tasks(const net_connection* connection, std::vector< network_channel_task<std::string, std::string>>& remain_tasks)
{
	const auto& cur_con_name = connection->get_connection_name();
	for (auto task_iter = remain_tasks.rbegin(); task_iter != remain_tasks.rend(); ++task_iter)
	{
		auto& cur_task = *task_iter;
		// The connection may be unnamed (disconnect() checks for this too), so
		// never dereference a null name.
		if (cur_con_name && *cur_task.from == *cur_con_name)
		{
			// from is local server  ignore control msg
			continue;
		}
		if (cur_task.dest->empty())
		{
			if (!cur_con_name)
			{
				// There is no name to fall back to, so there is no anchor to requeue on.
				continue;
			}
			cur_task.dest = cur_con_name;
		}
		m_anchor_collection.try_push_front(cur_task);
	}
}

这里anchor_resource的output_channel主要负责在断线期间缓存所有要发送的数据,这样就避免了无连接时的数据丢失:

// Requeue `task` at the head of the output queue of the anchor named *task.dest.
// Refused (returns false) when the anchor is unknown or currently bound to a
// connection. This stops leftovers of an old connection from being inserted
// into a new connection's traffic.
bool anchor_collection::try_push_front(msg_task& task)
{
	const auto found = m_anchor_resources.find(*task.dest);
	const bool known_anchor = found != m_anchor_resources.end();
	if (!known_anchor || found->second->connection)
	{
		return false;
	}
	found->second->output_channel.push_front_msg(task);
	return true;
}

同时在basic_stub处理断线的时候,需要使用should_reconnect检查当前连接是否需要重连,如果需要重连,立即开启下一帧的计时器来启动重连:

// Generic disconnect handling: unregister the connection from the router. For
// named connections that should_reconnect() approves, schedule
// connect_to_server(name) after m_timer_check_gap_ms milliseconds.
void basic_stub::on_disconnected(std::shared_ptr<network::net_connection> connection)
{
	const auto& cur_connection_name = connection->get_connection_name();
	const bool has_name = cur_connection_name && !cur_connection_name->empty();
	// Decide before m_router->disconnect(): that call erases the connection's
	// resource, and overrides such as basic_client::should_reconnect read it.
	const bool need_reconnect = has_name && should_reconnect(connection);

	m_router->disconnect(connection);

	if (!has_name)
	{
		m_logger->info("empty conn name for {}", connection->get_remote_endpoint_name());
		return;
	}

	if (!need_reconnect)
	{
		return;
	}

	// re connect
	m_logger->info("reconnect to server {} after {} ms", *cur_connection_name, m_timer_check_gap_ms);
	add_timer_with_gap(std::chrono::milliseconds(m_timer_check_gap_ms), [cur_connection_name, this]()
		{
			connect_to_server(*cur_connection_name);
		});
}

上面这些就是MosaicGame网络层为断线重连做的一些底层支持,但是这些代码只是做到了未发送数据和一些已发送数据的缓存。断线重连时,还需要设计额外的逻辑来做逻辑层的接收确认,整个接收确认以及重传的逻辑链路还是比较长的,需要详细的跟踪这期间的流程:

  1. 客户端第一次连接到gate_server时,会向gate_server发送一个request_create_session的请求:

// After the base bookkeeping, a connection named after the configured upstream
// server becomes the gate connection. Without a player yet, the client requests
// a new session; otherwise it asks to resume the previous one.
void basic_client::on_connect(std::shared_ptr<network::net_connection> connection)
{
	basic_stub::on_connect(connection);

	const auto con_name = get_connection_name(connection.get());
	const bool is_gate = con_name && *con_name == m_upstream_server.name;
	if (!is_gate)
	{
		return;
	}
	m_gate_connection = connection;
	m_logger->info("set gate connection with name {}", *con_name);
	if (m_main_player)
	{
		request_reconnect_session(connection);
	}
	else
	{
		request_create_session(connection);
	}
}

// Ask the gate for a brand-new session. The request is sent as a
// server_control packet whose "param" is an empty object.
void basic_client::request_create_session(std::shared_ptr<network::net_connection> connection)
{
	json request;
	request["cmd"] = "request_create_session";
	request["param"] = json::object_t{};
	auto payload = std::make_shared<const std::string>(request.dump(4));
	const auto packet_cmd = enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0);
	m_router->push_msg(connection.get(), m_local_name_ptr, get_connection_name(connection.get()), std::move(payload), packet_cmd);
}
  1. gate_server分配一个唯一的session_str作为两者session的唯一标识符,以及一个account_id,一并发送到客户端:
// Handle a client's request_create_session. The request fails if this inbound
// connection already owns a session or no game server can be chosen.
// Otherwise a session string is generated and on_session_created() allocates
// the account id. The reply (errcode / account_id / session) goes straight back
// on `con`.
void gate_server::on_request_create_session(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> dest, const json& msg)
{
	if (m_stopped)
	{
		return;
	}
	std::string error_info;
	std::string cur_session_str;
	std::shared_ptr<network::net_connection> outbound_con;
	const bool has_session = m_connection_sessions.find(con->inbound_connection_idx) != m_connection_sessions.end();
	if (has_session)
	{
		error_info = "already has session";
	}
	else
	{
		outbound_con = choose_space_server();
		if (!outbound_con)
		{
			error_info = "no game server available";
		}
		else
		{
			cur_session_str = generate_session_str();
		}
	}

	json reply_msg;
	json reply_param;
	reply_msg["cmd"] = "reply_create_session";
	reply_param["errcode"] = error_info;
	reply_param["account_id"] = error_info.empty() ? on_session_created(con, outbound_con, cur_session_str) : std::string{};
	reply_param["session"] = cur_session_str;
	reply_msg["param"] = reply_param;

	const auto packet_cmd = enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0);
	m_router->push_msg(con.get(), m_local_name_ptr, {}, std::make_shared<const std::string>(reply_msg.dump()), packet_cmd);
}
  1. gate_server以分配出来的account_id(即entity_id)作为anchor名字分配一个anchor_resource并关联当前连接,这样所有服务端发向这个客户端的数据都会通过这个anchor_resource内的队列进行中转,同时将这个entity_id对应的数据包编号初始化为0:
// Register a new client session, bind the client to its anchor, and ask the
// chosen game server to create the account.
//  - The entity id comes from generate_account_id() and is the anchor name the
//    inbound connection is linked to (and renamed to).
//  - The session is indexed by connection index, session key and entity id.
//  - A request_create_account control message goes out on `outbound_con`.
// Returns the entity id, which is sent to the client as "account_id".
std::string gate_server::on_session_created(std::shared_ptr<network::net_connection> inbound_con, std::shared_ptr<network::net_connection> outbound_con, const std::string& session_key)
{
	auto cur_inbound_con_idx = inbound_con->inbound_connection_idx;
	session_info cur_session_info;
	cur_session_info.session = session_key;
	cur_session_info.inbound_con = inbound_con;
	cur_session_info.outbound_con = outbound_con;
	cur_session_info.entity_id = generate_account_id(cur_inbound_con_idx);
	cur_session_info.shared_eid = std::make_shared<std::string>(cur_session_info.entity_id);
	m_connection_sessions[cur_inbound_con_idx] = cur_session_info;
	m_session_to_conn_id[session_key] = cur_inbound_con_idx;
	m_eid_to_conn_id[cur_session_info.entity_id] = cur_inbound_con_idx;
	// route everything addressed to this entity id through this connection
	m_router->link_anchor_to_connection(cur_session_info.entity_id, inbound_con.get());
	inbound_con->set_connection_name(cur_session_info.entity_id, m_logger);
	m_logger->info("link_anchor_to_connection eid {} con_id {}", cur_session_info.entity_id, inbound_con->inbound_connection_idx);
	json create_account_info, create_param, init_info;
	create_account_info["cmd"] = "request_create_account";
	create_param["entity_id"] = cur_session_info.entity_id;
	init_info["connection_idx"] = cur_inbound_con_idx;
	create_param["init_info"] = init_info;
	create_account_info["param"] = create_param;
	m_router->push_msg(outbound_con.get(), m_local_name_ptr, outbound_con->get_connection_name(), std::make_shared<std::string>(create_account_info.dump(4)), enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
	// New session: the send sequence starts at 0. on_call_client pre-increments,
	// so the first packet to the client is numbered 1.
	m_entity_send_last_seq[cur_session_info.entity_id] = 0;
	return cur_session_info.entity_id;
}
  1. gate_server往客户端发送消息时,会对当前数据包的msg_seq做自增:
// game 发往client
// game -> client forwarding: stamp the message with the next per-entity
// sequence number and push it by anchor name (the destination entity id)
// rather than by connection. During a reconnect the message then waits in
// the anchor's queue. Messages for entities without a sequence entry are
// dropped.
void gate_server::on_call_client(const network::msg_task& one_msg)
{

	m_logger->debug("call client {} with msg {}", *one_msg.dest, *one_msg.data);
	const auto seq_iter = m_entity_send_last_seq.find(*one_msg.dest);
	if (seq_iter == m_entity_send_last_seq.end())
	{
		return;
	}
	// unique, strictly increasing sequence number per entity
	const auto cur_msg_seq = ++seq_iter->second;
	m_router->push_msg({}, one_msg.dest, one_msg.data, one_msg.cmd, cur_msg_seq);
}
  1. 一个客户端连接断线时,net_channel会把所有未发送数据和一些已发送数据有序地保存到anchor_resource的output_channel中,同时保留这个anchor_resource一段时间作为重连时间窗口,这样服务端发往此客户端的消息会暂存到这个anchor_resource内部的消息队列output_channel中,不至于丢失消息:
// A client link dropped. The session entry stays for the reconnect window; only
// its inbound connection is released. The disconnect time is recorded for
// check_remove_session, and the game side gets notify_client_disconnected.
void gate_server::on_client_disconnected(std::shared_ptr<network::net_connection> connection)
{
	const auto con_idx = connection->inbound_connection_idx;
	const auto session_iter = m_connection_sessions.find(con_idx);
	if (session_iter == m_connection_sessions.end())
	{
		return;
	}
	auto& session = session_iter->second;
	session.inbound_con.reset();
	m_session_disconnected_ts[con_idx] = std::chrono::system_clock::now();

	json notify_client_disconnected_info;
	json param;
	notify_client_disconnected_info["cmd"] = "notify_client_disconnected";
	param["entity_id"] = session.entity_id;
	notify_client_disconnected_info["param"] = param;
	const auto packet_cmd = enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0);
	m_router->push_msg(session.outbound_con.get(), m_local_name_ptr, {}, std::make_shared<std::string>(notify_client_disconnected_info.dump(4)), packet_cmd);
}
// Dispatch to the gate-specific handler first: inbound links go to
// on_client_disconnected, outbound links to on_lose_game. The generic
// json_stub handling runs afterwards.
void gate_server::on_disconnected(std::shared_ptr<network::net_connection> connection)
{
	if (connection->is_outbound_connection())
	{
		on_lose_game(connection);
	}
	else
	{
		on_client_disconnected(connection);
	}
	json_stub::on_disconnected(connection);
}

服务端对应的account_entity会收到客户端断线的通知,不过这里并不会做特殊的处理,只是打印一下日志:

// Gate reports that this account's client dropped. Nothing is torn down here;
// the event is only logged.
void account_entity::on_notify_client_disconnected()
{
	m_logger->info("{} on_notify_client_disconnected ", m_base_desc.m_persist_entity_id);
}
  1. 这里还记录了一个断线时间戳数据到m_session_disconnected_ts中,gate_server会每秒扫描一次,把其中掉线时间超过m_lost_client_gap秒的session释放掉,同时向对应的服务端account_entity发送客户端已销毁的通知notify_client_destroy:
void gate_server::check_remove_session()
{
	std::vector<std::uint64_t> temp_con_to_remove;
	auto now_ts = std::chrono::system_clock::now();
	for(auto one_pair: m_session_disconnected_ts)
	{
		auto temp_duration = std::chrono::duration_cast<std::chrono::seconds>(now_ts - one_pair.second);
		if(temp_duration.count() > m_lost_client_gap)
		{
			temp_con_to_remove.push_back(one_pair.first);
		}
	}
	for(auto one_con: temp_con_to_remove)
	{
		m_session_disconnected_ts.erase(one_con);
		auto cur_iter = m_connection_sessions.find(one_con);
		if(cur_iter == m_connection_sessions.end())
		{
			continue;
		}
		const auto& cur_session = cur_iter->second.session;
		m_logger->info("remove expired session {} with entity id {}", cur_session, cur_iter->second.entity_id);
		json notify_client_destroy_info, param;
		notify_client_destroy_info["cmd"] = "notify_client_destroy";
		param["entity_id"] = cur_iter->second.entity_id;
		notify_client_destroy_info["param"] = param;
		m_router->push_msg(cur_iter->second.outbound_con.get(), m_local_name_ptr, {}, std::make_shared<std::string>(notify_client_destroy_info.dump(4)), enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
		for(auto& one_group: m_clients_for_group)
		{
			one_group.second.erase(std::string_view(cur_iter->second.entity_id));
		}
		m_session_to_conn_id.erase(cur_session);
		m_eid_to_conn_id.erase(cur_iter->second.entity_id);
		m_router->remove_anchor(cur_iter->second.entity_id);
		
		m_connection_sessions.erase(cur_iter);

	}
	m_session_remove_timer = add_timer_with_gap(std::chrono::seconds(1), [this]() 
	{
		this->check_remove_session();
	});
}
  1. 客户端发现自己断线后,如果发现当前已经登陆完成并创建好了玩家角色,则会重新发起到原始上游gate_server的连接,同时记录当前已经接收到的最大消息包序列号到m_reconnect_msg_read_seq,如果没有创建玩家角色,则彻底断线:
// Reconnect policy. Non-gate connections follow the base policy. The gate
// connection is resumed only after the player was created and at least one
// sequenced packet was read; otherwise the client goes through a fresh login.
bool basic_client::should_reconnect(std::shared_ptr<network::net_connection> connection)
{
	if (!basic_stub::should_reconnect(connection))
	{
		return false;
	}
	if (connection != m_gate_connection)
	{
		return true;
	}
	// For the gate connection, take the reconnect path only after the role is created; otherwise log in again.
	if (m_main_player == nullptr)
	{
		return false;
	}
	const auto& gate_resource = m_router->get_connection_resource(m_gate_connection);
	if (!gate_resource)
	{
		// The router no longer tracks this connection, e.g. network_router::disconnect
		// already erased it. Its read sequence is unknown, so fall back to a fresh
		// login instead of dereferencing null.
		m_logger->warn("should_reconnect gate connection resource missing");
		return false;
	}
	// A read_seq of 0 means no business packet has arrived yet, or the latest packet was a control packet.
	return gate_resource->connection_controller->get_packet_read_seq() != 0;
}
// Generic disconnect handling: unregister the connection from the router. For
// named connections that should_reconnect() approves, schedule
// connect_to_server(name) after m_timer_check_gap_ms milliseconds.
void basic_stub::on_disconnected(std::shared_ptr<network::net_connection> connection)
{
	const auto& cur_connection_name = connection->get_connection_name();
	const bool has_name = cur_connection_name && !cur_connection_name->empty();
	// Decide before m_router->disconnect(): that call erases the connection's
	// resource, and overrides such as basic_client::should_reconnect read it.
	const bool need_reconnect = has_name && should_reconnect(connection);

	m_router->disconnect(connection);

	if (!has_name)
	{
		m_logger->info("empty conn name for {}", connection->get_remote_endpoint_name());
		return;
	}

	if (!need_reconnect)
	{
		return;
	}

	// re connect
	m_logger->info("reconnect to server {} after {} ms", *cur_connection_name, m_timer_check_gap_ms);
	add_timer_with_gap(std::chrono::milliseconds(m_timer_check_gap_ms), [cur_connection_name, this]()
		{
			connect_to_server(*cur_connection_name);
		});
}
// Client-side disconnect. After the generic handling, losing the gate
// connection notifies the player if one exists. Otherwise the account entity
// (if any) is notified and destroyed. Finally m_gate_connection is cleared.
void basic_client::on_disconnected(std::shared_ptr<network::net_connection> connection)
{
	// captured before the generic handler runs
	const bool is_lose_server = connection == m_gate_connection;
	json_stub::on_disconnected(connection);
	if (!is_lose_server)
	{
		return;
	}
	if (m_main_player)
	{
		m_main_player->on_lose_server();
	}
	else if (m_main_account)
	{
		m_main_account->on_lose_server();
		entity::entity_manager::instance().destroy_entity(m_main_account);
		m_main_account = nullptr;
	}
	m_gate_connection = nullptr;
}
  1. 当连接到gate_server成功后,会触发on_connect回调,内部判断角色已经创建的情况下会发送一个重连消息包,附上之前商定好的session_str以及本地接收到的最大包序列号msg_read_seq:
// After the base bookkeeping, a connection named after the configured upstream
// server becomes the gate connection. Without a player yet, the client requests
// a new session; otherwise it asks to resume the previous one.
void basic_client::on_connect(std::shared_ptr<network::net_connection> connection)
{
	basic_stub::on_connect(connection);

	const auto con_name = get_connection_name(connection.get());
	const bool is_gate = con_name && *con_name == m_upstream_server.name;
	if (!is_gate)
	{
		return;
	}
	m_gate_connection = connection;
	m_logger->info("set gate connection with name {}", *con_name);
	if (m_main_player)
	{
		request_reconnect_session(connection);
	}
	else
	{
		request_create_session(connection);
	}
}

// Ask the gate to resume the previous session. Sends m_session_key as
// "pre_session" and, as "msg_read_seq", the packet read sequence reported by
// the controller of m_gate_connection's router resource.
// NOTE(review): on_connect sets m_gate_connection to the *new* connection
// before calling this, so the sequence is read from the fresh connection's
// controller, not from the lost one. The accompanying text says it is saved in
// m_reconnect_msg_read_seq on disconnect; confirm which value is intended.
void basic_client::request_reconnect_session(std::shared_ptr<network::net_connection> connection)
{
	json message;
	message["cmd"] = "request_reconnect_session";
	json::object_t params;
	params["pre_session"] = m_session_key;
	params["msg_read_seq"] = m_router->get_connection_resource(m_gate_connection)->connection_controller->get_packet_read_seq();
	message["param"] = params;
	m_router->push_msg(connection.get(), m_local_name_ptr, get_connection_name(connection.get()), std::make_shared<const std::string>(message.dump(4)), enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
}
  1. gate_server收到一个连接的重连消息包之后,检查session_str是否仍然有效,如果有效则将对应的anchor_resource绑定到新的连接,否则通知客户端重连失败,退回到等待登录状态:
// Handle a client's request_reconnect_session (fields "pre_session" and
// "msg_read_seq"). The request is rejected when:
//  - it is malformed;
//  - this inbound connection already owns a session;
//  - the session is unknown, mismatched, or still online;
//  - the anchor's queue cannot be trimmed to msg_read_seq without a gap.
// The reply_reconnect_session result goes back on `con`. On success the session
// is rebound to `con` via on_session_reconnected().
void gate_server::on_request_reconnect_session(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> dest, const json& msg)
{
	if(m_stopped)
	{
		return;
	}
	std::string pre_session;
	std::uint64_t last_read_msg_seq = 0;
	std::uint64_t pre_connection_idx = 0;
	std::string error_info = std::string();

	try
	{
		msg.at("pre_session").get_to(pre_session);
		msg.at("msg_read_seq").get_to(last_read_msg_seq);
	}
	catch (std::exception& e)
	{
		m_logger->error("on_request_reconnect_session fail to parse {} error {}", msg.dump(), e.what());
		error_info = "invalid msg format";
	}

	if (error_info.empty())
	{
		do
		{
			if (m_connection_sessions.find(con->inbound_connection_idx) != m_connection_sessions.end())
			{
				error_info = "already has session";
				break;
			}
			auto cur_con_id_iter = m_session_to_conn_id.find(pre_session);
			if(cur_con_id_iter == m_session_to_conn_id.end())
			{
				error_info = "invalid session key";
				break;
			}
			pre_connection_idx = cur_con_id_iter->second;
			auto cur_session_iter = m_connection_sessions.find(pre_connection_idx);
			if (cur_session_iter == m_connection_sessions.end())
			{
				error_info = "invalid pre_connection";
				break;
			}
			if (cur_session_iter->second.session != pre_session)
			{
				error_info = "session not match";
				break;
			}
			if (cur_session_iter->second.inbound_con)
			{
				error_info = "session still online";
				break;
			}
			// drops already-confirmed packets from the anchor queue; fails if some were lost
			if(!m_router->remove_readed_msgs(cur_session_iter->second.entity_id, last_read_msg_seq))
			{
				error_info = "invalid last_read_msg_seq";
				break;
			}

		} while (0);
	}
	
	json reply_msg, reply_param;
	reply_msg["cmd"] = "reply_reconnect_session";
	reply_param["errcode"] = error_info;
	reply_msg["param"] = reply_param;
	m_router->push_msg(con.get(), m_local_name_ptr, {}, std::make_shared<const std::string>(reply_msg.dump()), enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
	if (error_info.empty())
	{
		// on_session_reconnected takes (pre_con_idx, new_con). The read sequence
		// has already been applied by remove_readed_msgs above.
		on_session_reconnected(pre_connection_idx, con);
	}
	
	return;
}
  1. 这里的remove_readed_msgs会将anchor_resource的输出队列中所有packet_seq小于等于last_read_msg_seq的数据都删除,因为这些数据已经被客户端确认过了。如果队列中剩余消息包的最小编号大于last_read_msg_seq+1,则代表已确认数据与剩余数据之间有空窗,出现消息丢失,导致无法重连。
// Before an anchor is rebound to a reconnected client, drop the packets the
// client has already confirmed from the head of its output queue.
// Tasks are popped in batches. Everything with msg_seq <= last_read_msg_seq is
// discarded. At the first task with a larger sequence, that task and the rest
// of its batch are pushed back to the front, and trimming stops.
// Assumes the queue is ordered by msg_seq; verify against producers.
// anchor_name: anchor (entity id) whose queue is trimmed.
// last_read_msg_seq: highest packet sequence the client reports as received.
// Returns false if the anchor is unknown or still bound to a connection, or if
// the smallest sequence in the queue exceeds last_read_msg_seq + 1 (a gap:
// packets in between are lost and the session cannot be resumed).
bool anchor_collection::remove_readed_msgs(const std::string& anchor_name, std::uint64_t last_read_msg_seq)
{
	auto cur_iter = m_anchor_resources.find(anchor_name);
	anchor_resource* cur_resource = nullptr;
	if (cur_iter != m_anchor_resources.end())
	{
		cur_resource = cur_iter->second.get();
	}
	if (!cur_resource)
	{
		return false;
	}
	if(cur_resource->get_connection())
	{
		return false;
	}
	std::array<network_channel_task<std::string, std::string>, 10> temp_tasks;
	std::uint64_t min_msg_seq_in_queue = 0; // smallest sequence seen in the queue (0 = none yet)
	bool reached_unread = false;
	// Stop once unread packets have been put back. Popping again would take those
	// same packets straight out and push them back forever.
	while (!reached_unread)
	{
		const std::size_t cur_count = cur_resource->output_channel.pop_bulk_msg(temp_tasks.data(), temp_tasks.size());
		if (cur_count == 0)
		{
			break;
		}
		if(min_msg_seq_in_queue == 0)
		{
			min_msg_seq_in_queue = temp_tasks[0].msg_seq;
		}
		for(std::size_t i = 0; i < cur_count; i++)
		{
			if(temp_tasks[i].msg_seq > last_read_msg_seq)
			{
				std::vector<network_channel_task<std::string, std::string>> remain_tasks(temp_tasks.data() + i, temp_tasks.data() + cur_count);
				cur_resource->output_channel.push_front_msg_bulk(remain_tasks);
				reached_unread = true;
				break;
			}
		}
	}
	// A smallest sequence above last_read_msg_seq + 1 means a gap: resuming would lose packets.
	return min_msg_seq_in_queue <= last_read_msg_seq + 1;
}
  1. 如果传递过来的session_str是一个有效的已经断线且还没有完全过期的session,同时传递过来的last_read_msg_seq+1大于等于发送缓冲区里最小的数据编号,则将这个session之前绑定的客户端连接替换为当前的新连接:
void gate_server::on_session_reconnected(std::uint64_t pre_con_idx, std::shared_ptr< network::net_connection> new_con)
{
	session_info cur_session_info;
	auto pre_session_iter = m_connection_sessions.find(pre_con_idx);
	cur_session_info.session = pre_session_iter->second.session;
	cur_session_info.inbound_con = new_con;
	cur_session_info.outbound_con = pre_session_iter->second.outbound_con;
	cur_session_info.entity_id = pre_session_iter->second.entity_id; // 这里维持原来的entity_id 因为game上使用最开始的entity_id创建的account
	cur_session_info.shared_eid = std::make_shared<std::string>(cur_session_info.entity_id);
	m_connection_sessions.erase(pre_session_iter);
	m_connection_sessions[new_con->inbound_connection_idx] = cur_session_info;
	m_session_to_conn_id[cur_session_info.session] = new_con->inbound_connection_idx;
	m_eid_to_conn_id[cur_session_info.entity_id] = new_con->inbound_connection_idx;
	new_con->set_connection_name(cur_session_info.entity_id, m_logger);
	m_router->link_anchor_to_connection(cur_session_info.entity_id, new_con.get());

	m_session_disconnected_ts.erase(pre_con_idx);
	json notify_client_reconnected_info, param;
	notify_client_reconnected_info["cmd"] = "notify_client_reconnected";
	param["entity_id"] = cur_session_info.entity_id;
	notify_client_reconnected_info["param"] = param;
	m_router->push_msg(cur_session_info.outbound_con.get(), m_local_name_ptr, {}, std::make_shared<std::string>(notify_client_reconnected_info.dump(4)), enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
}

如果断线重连成功,account_entity会收到notify_client_reconnected通知,这里同样也只是日志记录一下,因此完整的一次断线重连对于player_entity来说是完全无感知的:

// Gate reports that this account's client resumed its session. Nothing else
// changes here; the event is only logged.
void account_entity::on_notify_client_reconnected()
{
	m_logger->info("{} on_notify_client_reconnected ", m_base_desc.m_persist_entity_id);
}

如果在gate_server指定的时间窗口内都没有断线重连回来,则对应的account_entity会收到一个notify_client_destroy的消息,这里会开启一个计时器,如果这个计时器超时前没有新的客户端登录这个账号,则开启自动下线流程:

// Gate gave up on this account's client (reconnect window expired). Arm a timer
// that calls request_logout_account after m_auto_logout_second_when_client_destroy
// seconds.
// NOTE(review): m_destroy_client_timer is overwritten without cancelling a
// previous timer. Confirm this cannot be called twice, and that the timer is
// cancelled when a new client logs in.
void account_entity::on_notify_client_destroy()
{
	m_destroy_client_timer = add_timer_with_gap(std::chrono::seconds(m_auto_logout_second_when_client_destroy), [this]()
	{
		request_logout_account(utility::rpc_msg());
	});
}

迁移保序

在我们目前的服务器架构中,space_server可以有多个实例,如果一个actor_entity有迁移能力,那么这个actor_entity就可能会在不同的进程中进行迁移。由于迁移前后同一个actor_entity的通信地址会发生变化,所以如果直接使用actor_entity的当前地址作为投递地址的话很可能出现迁移后消息丢失的情况。为了避免出现迁移引发的地址失效问题,对于可以迁移的actor_entity,我们会在创建这个actor_entity的同时,创建一个不参与迁移的relay_entity。这个relay_entity的作用就是作为这个actor_entity的消息中转站来使用,实现方式是将这个不会迁移的relay_entity的call_anchor(代码中为call_proxy)设置到这个actor_entity上。player_entity创建时会直接在初始数据里带上call_proxy参数,而其他普通的actor_entity则是在创建之后手动调用set_call_proxy来修改内部记录的call_anchor:

// Create the non-migrating relay_entity for `player_id` first, then the
// player_entity itself with the relay's call proxy in its init data. Messages
// addressed to the player then go through the relay. (Abridged: parts of the
// original body are omitted, as marked.)
player_entity* account_entity::create_player_entity(const std::string& player_id, const json& player_doc)
{
	std::string cur_err;
	auto cur_relay_entity_id = std::to_string(get_server()->gen_unique_uint64());
	json::object_t relay_init_info;
	relay_init_info["dest_eid"] = player_id;
	relay_init_info["dest_game"] = get_server()->local_stub_info().name;
	auto cur_relay_entity = get_server()->create_entity("relay_entity", cur_relay_entity_id, gen_online_entity_id(),relay_init_info, cur_err);
	if(!cur_relay_entity)
	{
		m_logger->error("fail to create relay_entity");
		return nullptr;
	}
	m_relay_entity = dynamic_cast<relay_entity*>(cur_relay_entity);
	if(!m_relay_entity)
	{
		// dynamic_cast yields nullptr if the created entity is not a relay_entity.
		// Don't dereference it below; clean up as space_entity::create_entity does.
		m_logger->error("fail to create relay_entity with entity_type {}", cur_relay_entity->type_name());
		get_server()->destroy_entity(cur_relay_entity);
		return nullptr;
	}
	m_relay_entity->setup_client_info(m_gate_id, get_call_proxy());
	json::object_t player_init_info;
	// some code omitted
	player_init_info["call_proxy"] = *cur_relay_entity->get_call_proxy();
	auto cur_entity = get_server()->create_entity("player_entity", player_id, gen_online_entity_id(),player_init_info, cur_err);
	// remaining code omitted
}

// Create an actor inside this space. A zero online_entity_id is replaced by a
// freshly generated one. Non-global, non-ghost actors in a cell space also get
// a dedicated relay_entity (kept in m_relay_entities). The actor's call proxy
// is pointed at that relay, so traffic survives migration. On any failure the
// already-created entity is destroyed and nullptr is returned.
// (Abridged: the rest of the original body is omitted, as marked.)
actor_entity* space_entity::create_entity(const std::string& entity_type, const std::string& entity_id, json::object_t& init_info, const json::object_t& enter_info, std::uint64_t online_entity_id)
{
	utility::entity_load_stat_recorder temp_recorder(entity_load_stat());
	if(online_entity_id == 0)
	{
		online_entity_id = gen_online_entity_id();
	}

	std::string create_entity_error;
	auto cur_entity = get_server()->create_entity(entity_type, entity_id, online_entity_id, init_info, create_entity_error);
	if(!cur_entity)
	{
		m_logger->error("fail to create_entity type {} id {} with error {}", entity_type, entity_id, create_entity_error);
		return nullptr;
	}
	auto cur_actor_entity = dynamic_cast<actor_entity*>(cur_entity);
	if(!cur_actor_entity)
	{
		m_logger->error("fail to create actor_entity with entity_type {}", cur_entity->type_name());
		get_server()->destroy_entity(cur_entity);
		return nullptr;
	}
	if(!cur_actor_entity->is_global_actor() && is_cell_space() && !cur_actor_entity->is_ghost())
	{
		// every non-global actor needs its own relay entity
		json::object_t relay_init_info;
		relay_init_info["dest_game"] = get_server()->local_stub_info().name;
		relay_init_info["dest_eid"] = entity_id;
		auto cur_relay_entity_id = get_server()->gen_unique_str();
		auto cur_relay_entity = get_server()->create_entity("relay_entity", cur_relay_entity_id, gen_online_entity_id(),  relay_init_info, create_entity_error);
		if(!cur_relay_entity)
		{
			m_logger->error("fail to create relay_entity id {} with error {}", entity_id, create_entity_error);
			get_server()->destroy_entity(cur_entity);
			return nullptr;
		}
		
		m_relay_entities[entity_id] = cur_relay_entity;
		// the actor is addressed through the relay from now on
		cur_actor_entity->set_call_proxy(*cur_relay_entity->get_call_proxy());
		m_logger->info("create relay entity {} for entity  {}", cur_relay_entity_id, entity_id);
	}
	// some code omitted
}

当其他模块需要往这个actor_entity发送一个远程消息时,使用的其实就是relay_entity的call_anchor,此时relay_entity处理rpc的时候如果发现自身没有相关rpc的定义,就会将这个数据转发到相应的actor_entity的最新通信地址上:

// RPCs the relay itself defines are handled locally. Anything it does not know
// (dest_not_found) is forwarded to the actor and reported as handled.
utility::rpc_msg::call_result relay_entity::on_rpc_msg(const utility::rpc_msg& msg)
{
	const auto local_result = rpc_owner_on_rpc(msg);
	if (local_result != utility::rpc_msg::call_result::dest_not_found)
	{
		return local_result;
	}
	forward_to_actor(msg);
	return utility::rpc_msg::call_result::suc;
}

// Send `msg` to the player actor through its relay. Ignored while no relay exists.
void account_entity::call_player(const utility::rpc_msg& msg)
{
	if (m_relay_entity)
	{
		m_relay_entity->forward_to_actor(msg);
	}
}

为了在relay_entity上获得对应actor_entity的最新通信地址,需要actor_entity在迁移之前先获得当前relay_entity的允许,此时relay_entity将目标地址设置为空,在目标地址为空的时候,forward_to_actor的消息会先缓存起来,避免迁移中间的消息丢失:

// Grant an actor's migration request. The target (game, space, union space,
// enter info) is echoed back to the current actor address as
// reply_migrate_begin. The relay then clears its destination and records the
// target game, so forward_to_actor caches messages until notify_migrate_finish.
void relay_entity::request_migrate_begin(const utility::rpc_msg& msg, const std::string& game_id, const std::string& space_id, const std::string& union_space_id, const json::object_t& enter_info)
{
	if (!m_dest_actor)
	{
		m_logger->error("request_migrate_begin while dest_anchor empty dest_game {} dest_eid {}", m_dest_game, m_dest_eid);
		return;
	}
	utility::rpc_msg grant_msg;
	grant_msg.cmd = "reply_migrate_begin";
	grant_msg.args.push_back(game_id);
	grant_msg.args.push_back(space_id);
	grant_msg.args.push_back(union_space_id);
	grant_msg.args.push_back(enter_info);
	call_server(m_dest_actor, grant_msg);
	m_dest_game = game_id;
	m_dest_actor.reset();
}

// Deliver a message to the actor's current address. While a migration is in
// progress (no destination set), the message is cached instead, so it is
// neither lost nor reordered.
void relay_entity::forward_to_actor(const network::msg_task& cur_msg_task)
{
	if (m_dest_actor)
	{
		const auto cmd_parts = enums::packet_cmd_helper::decode(cur_msg_task.cmd);
		call_server(m_dest_actor, cur_msg_task.data, cmd_parts.first, cmd_parts.second);
		return;
	}
	m_cached_msgs.push_back(cur_msg_task);
}

actor_entity只有在接收到reply_migrate_begin之后才会开始真正的迁移,当收到这个消息时,之前通过relay_entity发送到当前actor_entity的消息肯定已经全都收到了,因为底层对应的是同一个物理连接。等到actor_entity迁移完成之后,会通过notify_migrate_finish将最新地址发送过来, 此时再将迁移期间缓存的数据按序发出,并清空这个m_cached_msgs数组:

// Migration finished on `game_id`. If it matches the recorded target, rebuild
// the actor address from (m_dest_game, m_dest_eid), replay every cached message
// in order, then clear the cache.
void relay_entity::notify_migrate_finish(const utility::rpc_msg& msg, const std::string& game_id)
{
	if (m_dest_game != game_id)
	{
		m_logger->error("notify_migrate_finish while  game not match  empty dest_game {} dest_eid {} new_game_id {}", m_dest_game, m_dest_eid, game_id);
		return;
	}
	m_dest_actor = std::make_shared<std::string>(utility::rpc_anchor::concat(m_dest_game, m_dest_eid));

	for (const auto& cached : m_cached_msgs)
	{
		const auto cmd_parts = enums::packet_cmd_helper::decode(cached.cmd);
		call_server(m_dest_actor, cached.data, cmd_parts.first, cmd_parts.second);
	}

	m_cached_msgs.clear();
}

m_dest_actor被设置到值之后,后续的消息发送就直接走call_server接口,不再缓存。

不过relay_entity也有其局限性,它只能保证一个不会迁移的消息发送者发送到当前对应的actor_entity的所有数据按照发出顺序投递。如果消息发送者A在迁移前给relay_entity(B)发出了消息M,在迁移后给relay_entity(B)发出了消息N,那relay_entity(B)只能保证M、N都能投递到actor_entity(B),不能保证M在N之前到达。因为relay_entity(B)对M和N的接收顺序是不确定的,同时relay_entity(B)只能按照消息到达自身的顺序去转发到actor_entity(B)。如果实在是要求所有由A发送到B的消息必须按照发送顺序来接收,可以按照这个方案来操作:

  1. A也创建一个relay_entity
  2. A发送给B的数据先发送到relay_entity(A),此时能够保证relay_entity(A)接收数据的顺序等于A发出数据的顺序
  3. 然后relay_entity(A)将数据依次转发到relay_entity(B),由于两个relay_entity都是不会迁移的,因此这些消息使用的是同一个物理连接,可以保证接收顺序等于发送顺序
  4. relay_entity(B)将所有接收到的数据按照顺序发送到actor_entity(B),这个actor_entity(B)接收顺序也是可以保证的

所以这种强一致性的网络发送方案可以通过两层relay_entity来做到,代价就是消息的延迟增加了两个relay_entity的中转。

迁移时除了往actor_entity投递的消息可能发生乱序和丢失之外,player_entity往客户端发送的消息也可能出现乱序。因为gate_server是不动的,而player_entity则是不断的在进程间迁移的。由于不同进程之间的网络延迟不同,如果player_entity发送给客户端的数据是直接通过gate_server的通信地址直接投递的话,其接收顺序是不能保证的。如果客户端接收到的数据顺序是错乱的,那么很可能会导致各种逻辑问题,典型症状就是同一个属性修改多次的情况下,客户端最后的值与服务端最后的值不一样。

要解决这个顺序问题,就需要跟前述的actor_entity之间通信保序方案一样,在relay_entity上做转发操作,因为当前actor_entity的迁移会在对应的relay_entity之间做一次rpc同步。此时player_entity发往客户端的数据不再直接发送到gate_server,而是先发送到relay_entity,然后再由relay_entity发送到gate_server:


// Forwards a game_to_client packet to the bound gate.
// The packet's sub-command byte combines entity_packet and gate_version; packets whose
// gate_version differs from the relay's current one are dropped (they target an older client).
void relay_entity::forward_to_client(const network::msg_task& cur_msg)
{
	// No gate bound: there is nowhere to deliver the data.
	if(m_gate_id == 0)
	{
		m_logger->error("fail to forward to client due to gate_id is 0");
		return;
	}
	const auto combined_cmd = enums::packet_cmd_helper::decode(cur_msg.cmd).second;
	const auto [entity_cmd, msg_gate_version] = enums::client_entity_packet_helper::decode(combined_cmd);
	if(msg_gate_version == m_gate_version)
	{
		get_server()->call_gate(m_gate_id, m_account_anchor, cur_msg.data, entity_cmd);
		return;
	}
	m_logger->error("fail to forward to client due to gate_version not match gate_version {} msg_gate_version {}", m_gate_version, msg_gate_version);
}

// Sends an entity packet to this player's client via the relay_entity.
// The command byte is tagged with the current gate_version so the relay can drop
// data meant for a replaced client. While migrating, packets are buffered in
// m_to_client_msg_when_migrating instead of being sent.
void player_entity::call_client(enums::entity_packet entity_packet_cmd, std::shared_ptr<const std::string> data)
{
	if(!has_client())
	{
		return;
	}
	// Text payloads are logged verbatim; all other payloads only by size.
	const bool is_text_payload = entity_packet_cmd == enums::entity_packet::json_rpc || entity_packet_cmd == enums::entity_packet::sync_prop;
	if(is_text_payload)
	{
		m_logger->debug("call_client  cmd {} data {}", std::uint8_t(entity_packet_cmd), *data);
	}
	else
	{
		m_logger->debug("call_client cmd {} sz {}", std::uint8_t(entity_packet_cmd), data->size());
	}
	if(is_ghost())
	{
		m_logger->warn("ghost player_entity call_client ignored");
		return;
	}
	const auto tagged_cmd = enums::client_entity_packet_helper::encode(entity_packet_cmd, m_gate_version);
	if(!b_is_migrating)
	{
		call_relay_anchor(enums::packet_cmd::game_to_client, tagged_cmd, data);
		return;
	}
	// The relay sync is in flight: keep the packet until migration packs it.
	m_to_client_msg_when_migrating.emplace_back(tagged_cmd, data);
}

由于actor_entity与relay_entity执行rpc同步是一个异步操作,因此在这个操作期间发往客户端的数据不能直接发送到relay_entity,而是先缓存到自身的一个临时队列m_to_client_msg_when_migrating中。真正开始迁移打包数据的时候会将这个队列一起打包:

// Packs player-specific migration data on top of the actor_entity base data:
// properties, the current gate_version and every client packet buffered while migrating.
// The buffer is emptied afterwards because its contents now travel with migrate_info.
void player_entity::encode_migrate_out_data(json::object_t& migrate_info, bool enter_new_space)
{
	actor_entity::encode_migrate_out_data(migrate_info, enter_new_space);
	migrate_info["prop"] = m_prop_data.encode();
	migrate_info["gate_version"] = m_gate_version;

	std::vector<json> pending_client_msgs;
	for(const auto& [packed_cmd, payload]: m_to_client_msg_when_migrating)
	{
		// Each entry keeps its already-tagged command byte alongside the payload.
		json packed = serialize::encode_multi(packed_cmd, *payload);
		pending_client_msgs.push_back(std::move(packed));
	}
	migrate_info["cached_to_client_msgs"] = pending_client_msgs;
	m_to_client_msg_when_migrating.clear();
}

当迁移完毕之后,会先通知relay_entity迁移完成,然后再将这个缓存的客户端数据依次发出:

// Handles player-level events. On migrate_in_after_component_decode, restores the
// gate_version packed by encode_migrate_out_data and, if a client is attached,
// replays the client packets that were buffered during migration, in their original order.
void player_entity::player_event_listener(const utility::enum_type_value_pair& ev_cat, const json::object_t& detail)
{
	if(ev_cat == utility::enum_type_value_pair(enums::migrate_event::migrate_in_after_component_decode))
	{
		// Ghost entities skip gate_version restoration and cached packet replay.
		if(is_ghost())
		{
			return;
		}
		auto cur_gate_version_iter = detail.find("gate_version");
		if(cur_gate_version_iter != detail.end())
		{
			m_gate_version = cur_gate_version_iter->second.get<std::uint8_t>();
		}
		else
		{
			m_logger->error("migrate_in_after_component_decode missing gate_version set to 0");
			m_gate_version = 0;
		}
		if(!has_client())
		{
			return;
		}
		auto cached_msgs_iter = detail.find("cached_to_client_msgs");
		if(cached_msgs_iter == detail.end())
		{
			return;
		}
		const auto& cached_msgs = cached_msgs_iter->second;
		// encode_migrate_out_data stores a std::vector<json> here; reject anything that is not an array.
		if(!cached_msgs.is_array())
		{
			m_logger->error("migrate_in_after_component_decode cached_to_client_msgs is not an array");
			return;
		}
		for(const auto& one_cached_msg: cached_msgs)
		{
			// Value-initialized so a decode that does not assign cannot leave cmd indeterminate.
			// cmd already carries the gate_version it was tagged with in call_client.
			std::uint8_t cmd = 0;
			std::string msg;
			serialize::decode_multi(one_cached_msg, cmd, msg);
			call_relay_anchor(enums::packet_cmd::game_to_client, cmd, std::make_shared<const std::string>(std::move(msg)));
		}
	}
}

顶号保护

我们目前的设计里,当多个客户端登录同一个账号时,前面的客户端会被后面的客户端顶掉,这个过程也叫顶号。在顶号发生时,player_entity对应的gate将会被更新:首先player_entity会收到客户端已销毁的通知:

// RPC: the player's client was destroyed (e.g. the first step of an account replace).
// Resets gate_version to 0 so that packets produced from now on carry a version that
// does not match the relay's version for the next bound gate.
void player_entity::notify_player_client_destroyed(const utility::rpc_msg& msg)
{
	// Log text previously named notify_player_client_replaced (copy-paste); it now names this handler.
	m_logger->warn("player {} notify_player_client_destroyed", entity_id());
	m_gate_version = 0;
}

然后等新的gate绑定完成时,再被同步最新的gate信息。此时player_entity会将当前的最新数据完整的打包下去,通知客户端来同步当前的所有状态:

// RPC: a new client/gate was bound to this player's account.
// Adopts the new gate_version and, when a gate name is supplied, sends a full
// "create_player" sync to the new client and dispatches the login callbacks.
void player_entity::notify_player_client_replaced(const utility::rpc_msg& msg, const std::string& new_gate_name, std::uint8_t new_gate_version)
{
	m_logger->warn("player client replaced by new gate {} version {}", new_gate_name, new_gate_version);
	// Set first so every subsequent call_client packet is tagged with the new version.
	m_gate_version = new_gate_version;
	if(!new_gate_name.empty())
	{
		auto full_state = encode_with_flag(std::uint32_t(enums::encode_flags::self_client));
		utility::rpc_msg create_msg;
		create_msg.cmd = "create_player";
		create_msg.args.push_back(entity_id());
		create_msg.args.push_back(std::move(full_state));
		call_client(create_msg);
		m_login_dispatcher.dispatch(true);
	}
}

但是由于在顶号期间,player_entity只是被动地接收这两个rpc,而player_entity通过relay_entity往客户端发送数据是主动的。所以可能会出现一些本来应该发往老客户端的数据被发到新客户端的情况:

  1. 时刻1 player_entity向relay_entity发送一条发往客户端Client(A)的消息M
  2. 时刻2 player_entity接收到relay_entity转发过来的notify_player_client_destroyed消息,此时relay_entity上的gate信息被清空
  3. 时刻3 player_entity接收到relay_entity转发过来的notify_player_client_replaced消息,此时relay_entity上的gate信息被设置为最新的Client(B)
  4. 时刻4 消息M在relay_entity上被接收,并通过forward_to_client转发到了Client(B)

为了修正这种错误的客户端数据发送,我们可以在player_entity发送客户端数据时都带上当前的gate信息。当relay_entity接收到这个数据的时候,会将消息里的gate信息与当前relay_entity的gate信息进行比对,如果不相等则直接丢弃。

由于顶号是一个低频的操作,而为所有发往客户端的数据都额外带上完整的gate信息会导致数据包被重新打包,比较浪费CPU。因此当前在mosaic_game中,只携带了4bit的gate_version信息,这样它可以与4bit的entity_packet拼接成一个uint8,从而避免数据的重新打包:

static_assert(int(enums::entity_packet::max) <= 16, "entity_packet max exceed 16, cannot fit in 4 bits because gate_version also need 4 bits");

// Encodes/decodes entity_packet together with gate_version into a single byte:
// the high 4 bits hold entity_packet and the low 4 bits hold gate_version.
// This lets relay_entity tell which gate binding a client packet was produced for,
// so data meant for a replaced (old) client is not delivered to the new one.
struct client_entity_packet_helper
{
	// Number of low bits reserved for gate_version.
	static constexpr unsigned gate_version_bits = 4;
	// Mask selecting the gate_version bits.
	static constexpr std::uint8_t gate_version_mask = 0x0F;

	// Packs in_packet_cmd into the high nibble and in_gate_version into the low nibble.
	// in_gate_version is masked to 4 bits so an out-of-range value cannot spill into the command bits.
	static std::uint8_t encode(entity_packet in_packet_cmd, std::uint8_t in_gate_version)
	{
		const auto packet_bits = static_cast<std::uint8_t>(std::uint8_t(in_packet_cmd) << gate_version_bits);
		return static_cast<std::uint8_t>(packet_bits | (in_gate_version & gate_version_mask));
	}

	// Inverse of encode: returns {entity_packet, gate_version}.
	// Splits with mask/shift to mirror encode; dividing a uint8_t by 256 would yield the whole
	// byte as the version and always 0 as the command.
	static std::pair<entity_packet, std::uint8_t> decode(std::uint8_t in_combine_cmd)
	{
		std::pair<entity_packet, std::uint8_t> result;
		result.second = static_cast<std::uint8_t>(in_combine_cmd & gate_version_mask);
		result.first = entity_packet(in_combine_cmd >> gate_version_bits);
		return result;
	}
};

// Excerpt: the client send path tags the entity_packet with the current gate_version.
void player_entity::call_client(enums::entity_packet entity_packet_cmd, std::shared_ptr<const std::string> data)
{
	// (the has_client/logging/ghost/migration-buffering logic shown earlier is omitted here)
	const auto versioned_cmd = enums::client_entity_packet_helper::encode(entity_packet_cmd, m_gate_version);
	call_relay_anchor(enums::packet_cmd::game_to_client, versioned_cmd, data);
}

relay_entity在接收到这些向客户端转发的数据时,会将其中4bit的gate_version取出来进行比对,不匹配则忽略:

// Forwards a game_to_client packet to the bound gate, dropping packets whose
// embedded gate_version does not match the relay's current gate_version.
void relay_entity::forward_to_client(const network::msg_task& cur_msg)
{
	// No gate bound: there is nowhere to deliver the data.
	if(m_gate_id == 0)
	{
		m_logger->error("fail to forward to client due to gate_id is 0");
		return;
	}
	const auto combined_cmd = enums::packet_cmd_helper::decode(cur_msg.cmd).second;
	const auto [entity_cmd, msg_gate_version] = enums::client_entity_packet_helper::decode(combined_cmd);
	if(msg_gate_version == m_gate_version)
	{
		get_server()->call_gate(m_gate_id, m_account_anchor, cur_msg.data, entity_cmd);
		return;
	}
	m_logger->error("fail to forward to client due to gate_version not match gate_version {} msg_gate_version {}", m_gate_version, msg_gate_version);
}

所以目前只要能够正确地在relay_entity与player_entity之间同步好gate_version,就可以避免老客户端的数据被发送到新客户端。此时只需要在account_entity::set_gate绑定新gate的时候(通过relay_entity::setup_client_info)将gate_version进行自增,并发送到player_entity即可:


std::uint8_t relay_entity::gate_version() const
{
	return m_gate_version;
}

void relay_entity::setup_client_info(std::uint64_t in_gate_id, std::shared_ptr<const std::string> in_account_anchor)
{
	m_gate_id = in_gate_id;
	m_account_anchor = std::move(in_account_anchor);
	if(m_gate_id != 0)
	{
		++m_gate_version;
		if(m_gate_version >= 16)
		{
			m_gate_version = 1;
		}
	}
}

// Binds (gate_id != 0) or unbinds (gate_id == 0) the client gate of this account.
// Propagates the binding to the relay_entity, which bumps gate_version on a new binding.
// On bind, it optionally replies to a replace request, then either switches to
// "show_players" or tells the online player about the new gate and version.
void account_entity::set_gate(const std::string& gate_name, std::uint64_t gate_id, bool during_replace)
{
	m_gate_name = gate_name;
	m_gate_id = gate_id;
	// m_relay_entity is assigned in create_player_entity; set_gate may run before any player
	// (and therefore any relay) exists, e.g. on the show_players path, so guard against null.
	if(m_relay_entity)
	{
		m_relay_entity->setup_client_info(m_gate_id, get_call_proxy());
	}
	if(gate_id != 0)
	{
		m_logger->info("{} notify_rebind_gate_client_finish with new_gate {}", m_base_desc.m_persist_entity_id, gate_name);
		cancel_timer(m_destroy_client_timer);
		m_destroy_client_timer.reset();
		if(during_replace)
		{
			// A replace is in progress: tell the client the replace succeeded.
			utility::rpc_msg replace_info;
			replace_info.cmd = "reply_replace_account";
			replace_info.args.push_back(is_player_online());
			call_client(replace_info);
		}
		if(!is_player_online())
		{
			m_statem.change_to("show_players");
		}
		else
		{
			// Trigger a full resync on the player with the relay's new gate_version.
			std::uint8_t cur_gate_version = 0;
			if(m_relay_entity)
			{
				cur_gate_version = m_relay_entity->gate_version();
			}
			else
			{
				m_logger->error("{} set_gate with online player but relay_entity is null", m_base_desc.m_persist_entity_id);
			}
			utility::rpc_msg account_replace_msg;
			account_replace_msg.cmd = "notify_player_client_replaced";
			account_replace_msg.args.push_back(m_gate_name);
			account_replace_msg.args.push_back(cur_gate_version);
			call_player(account_replace_msg);
		}
	}
	// (remaining code omitted)
}

account_entity创建player_entity时,会做第一次的gate同步:

// Creates the relay_entity that fronts the player, seeds it with the current gate binding,
// then creates the player_entity and hands it the relay's initial gate_version.
// Returns the created player, or nullptr on any failure.
player_entity* account_entity::create_player_entity(const std::string& player_id, const json& player_doc)
{
	std::string cur_err;
	auto cur_relay_entity_id = std::to_string(get_server()->gen_unique_uint64());
	json::object_t relay_init_info;
	relay_init_info["dest_eid"] = player_id;
	relay_init_info["dest_game"] = get_server()->local_stub_info().name;
	auto cur_relay_entity = get_server()->create_entity("relay_entity", cur_relay_entity_id, gen_online_entity_id(),relay_init_info, cur_err);
	if(!cur_relay_entity)
	{
		m_logger->error("fail to create relay_entity");
		return nullptr;
	}
	// Check the downcast before using it; it was previously dereferenced unchecked.
	auto created_relay = dynamic_cast<relay_entity*>(cur_relay_entity);
	if(!created_relay)
	{
		m_logger->error("created entity {} is not a relay_entity", cur_relay_entity_id);
		return nullptr;
	}
	m_relay_entity = created_relay;
	m_relay_entity->setup_client_info(m_gate_id, get_call_proxy());
	json::object_t player_init_info;
	// (much code omitted)
	auto cur_entity = get_server()->create_entity("player_entity", player_id, gen_online_entity_id(),player_init_info, cur_err);
	if(!cur_entity)
	{
		m_logger->error("fail to create player {} error is {} doc is {}", player_id, cur_err, player_doc.dump());
		request_logout_account(utility::rpc_msg());
		return nullptr;
	}

	m_player_id = player_id;
	auto cur_player = dynamic_cast<player_entity*>(cur_entity);
	if(!cur_player)
	{
		m_logger->error("created entity {} is not a player_entity", player_id);
		return nullptr;
	}
	// The player starts with the same gate_version the relay will check against.
	cur_player->set_gate_version_when_create(m_relay_entity->gate_version());
	// (much code omitted)
	// Flowing off the end of a non-void function is undefined behavior; return the player explicitly.
	return cur_player;
}