Mosaic Game 的玩家流程管理
玩家登录流程
在服务器完全准备好之后,就可以接受客户端的登录请求了。由于客户端只能与gate_server进行通信,所以客户端需要以某种方式去获取目标gate_server的ip:port。服务器可以用各种方式去分发当前可用的gate_server列表,这些列表里包含了每个gate_server对外服务的ip:port信息,运维人员会配置好这些ip:port的流量转发到gate_server的监听端口上。客户端选择了一个gate_server之后会将这个gate_server当作其上游进程来开启连接,连接建立之后开始创建client_session:
void basic_client::on_connect(std::shared_ptr<network::net_connection> connection)
{
basic_stub::on_connect(connection);
auto cur_con_name = get_connection_name(connection.get());
if (cur_con_name && *cur_con_name == m_upstream_server.name)
{
m_gate_connection = connection;
m_logger->info("set gate connection with name {}", *cur_con_name);
if (!m_main_player)
{
request_create_session(connection);
}
else
{
request_reconnect_session(connection);
}
}
}
这个client_session其实就是唯一会话id,同一个会话里保证网络包的顺序以及实现断线重连功能 这里会通过m_main_player是否已经初始化来判断是创建一个新的client_session还是复用之前的client_session:
void gate_server::on_request_create_session(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> dest, const json& msg)
{
if(m_stopped)
{
return;
}
std::string error_info = std::string();
std::string cur_session_str;
std::shared_ptr<network::net_connection> outbound_con;
do {
if (m_connection_sessions.find(con->inbound_connection_idx) != m_connection_sessions.end())
{
error_info = "already has session";
break;
}
outbound_con = choose_space_server();
if (!outbound_con)
{
error_info = "no game server available";
break;
}
cur_session_str = generate_session_str();
} while (0);
json reply_msg, reply_param;
reply_msg["cmd"] = "reply_create_session";
reply_param["errcode"] = error_info;
if (error_info.empty())
{
reply_param["account_id"] = on_session_created(con, outbound_con, cur_session_str);
}
else
{
reply_param["account_id"] = std::string{};
}
reply_param["session"] = cur_session_str;
reply_msg["param"] = reply_param;
m_router->push_msg(con.get(), m_local_name_ptr, {}, std::make_shared<const std::string>(reply_msg.dump()), enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
}
当客户端选择好一个gate_server后就会发送request_create_session消息,如果gate_server允许登录则会发送一个reply_create_session的消息,这个消息里会带上协商好的session与对应新创建的account_entity的account_id:
void basic_client::on_reply_create_session(std::shared_ptr<network::net_connection> con, const json& msg)
{
std::string errcode;
std::string account_id;
std::string session_str;
try
{
msg.at("errcode").get_to(errcode);
msg.at("account_id").get_to(account_id);
msg.at("session").get_to(session_str);
}
catch (std::exception& e)
{
m_logger->error("on_reply_create_session fail to parse {} error {}", msg.dump(), e.what());
errcode = e.what();
}
if (errcode.empty())
{
m_main_account = entity::entity_manager::instance().create_entity<entity::client_account>(account_id, 0);
m_main_account->set_basic_client(this);
m_main_account->init({});
return;
}
else
{
m_logger->error("on_reply_create_session error {}", errcode);
restore();
}
}
basic_client会以这个account_id创建一个client_account,内部会创建一个登录状态机对象client_login_statem:
class client_account final: public entity_manager::sub_class<client_account>, public utility::component_owner<account_component, client_account>
{
protected:
std::map<utility::persist_entity_id, json> m_player_datas;
statem::client_login_statem m_statem;
// 省略很多代码
};
在服务端的account_entity上也有一个登录状态机login_statem:
class Meta(rpc(owner=1)) account_entity final: public entity_manager::sub_class<account_entity>, public utility::component_owner<account_component, account_entity>
{
using account_entity_RpcSuper = utility::component_owner<account_component, account_entity>;
protected:
std::unordered_map<utility::persist_entity_id, json> m_player_datas;
statem::login_statem m_statem;
// 省略很多代码
};
由于登录流程比较冗长,期间可能出现各种逻辑错误与网络错误,所以客户端与服务端都会用状态机来维护当前的登录状态。这里的登录状态机使用了一个自己做的简单实现,每个状态都继承自下面的state:
template <typename Owner, typename... Args>
class state_machine;
template <typename Owner, typename... Args>
class state
{
public:
state_machine<Owner, Args...>& m_statem;
dispatcher<Args...> m_dispatcher;
virtual void on_create(){}
virtual void on_enter(){}
virtual void on_exit(){}
virtual std::string name() const
{
return "invalid";
}
static std::string static_name()
{
return "invalid";
}
bool change_to(const std::string& dest_state)
{
return m_statem.change_to(dest_state);
}
template <typename K, typename T>
void process_event(const K& event, const T& data)
{
m_dispatcher.dispatch(event, data);
}
template <typename K, typename T>
void notify_statem(const K& event, const T& data);
public:
state(state_machine<Owner, Args...>& in_statem)
: m_statem(in_statem)
{
}
virtual ~state()
{
}
bool active() const
{
return m_statem.active_state() == this;
}
};
这里的state作为所有状态的基类,提供了状态的标识接口与状态的创建与进出回调接口,子类继承的时候需要对这些virtual的接口进行复写。同时process_event接口与notify_statem接口负责提供状态与状态机之间的事件通信。以basic_client为例,登录状态机client_login_statem创建的时候会注册好各种状态,同时设置默认状态为wait_login:
client_login_statem::client_login_statem(entity::client_account* owner)
: utility::state_machine<entity::client_account, enums::account_statem_actions, std::string>(wait_login::static_name(), owner)
{
add_state<auth_account>();
add_state<fetch_players>();
add_state<show_players>();
add_state<wait_login>();
add_state<player_online>();
add_state<create_account>();
add_state<replace_account>();
add_state<logout_account>();
change_to(wait_login::static_name());
}
客户端的输入目前通过basic_client:on_gm_cmd的input_action来通知到client_account的dispatcher:
void basic_client::on_gm_cmd(const std::string& cmd, const json& param, msg_seq_t req_seq)
{
if (cmd == "rpc_msg")
{
// 省略很多代码
}
else if (cmd == "input_action")
{
std::string action_cmd;
std::vector<json> action_args;
try
{
param.at("cmd").get_to(action_cmd);
param.at("args").get_to(action_args);
}
catch (std::exception& e)
{
auto reply = fmt::format("fail to decode action_msg {} e {}", param.dump(), e.what());
m_logger->error("on_gm_cmd {}", reply);
on_http_reply(req_seq, reply);
return;
}
if(m_main_player)
{
m_main_player->set_http_reply("");
m_main_player->dispatcher().dispatch(action_cmd, action_args);
on_http_reply(req_seq, m_main_player->http_reply());
return;
}
if (m_main_account)
{
m_main_account->set_http_reply("");
m_main_account->dispatcher().dispatch(action_cmd, action_args);
on_http_reply(req_seq, m_main_account->http_reply());
return;
}
on_http_reply(req_seq, "no entity");
}
else
{
json_stub::on_gm_cmd(cmd, param, req_seq);
}
}
同时client_account的所有rpc消息处理都是直接发往login_statem上,通过当前状态m_cur_state的dispatcher执行分发:
utility::rpc_msg::call_result client_account::on_rpc_msg(const utility::rpc_msg& msg) override
{
return rpc_owner_on_rpc(msg);
}
utility::rpc_msg::call_result client_account::rpc_owner_on_rpc(const utility::rpc_msg& msg)
{
return m_statem.on_msg(msg.cmd, msg);
}
utility::rpc_msg::call_result client_login_statem::on_msg(const std::string& cmd, const utility::rpc_msg& detail)
{
if(!m_cur_state)
{
return utility::rpc_msg::call_result::rpc_not_found;
}
if(m_cur_state->m_dispatcher.dispatch(cmd, detail))
{
return utility::rpc_msg::call_result::suc;
}
return utility::rpc_msg::call_result::rpc_not_found;
}
每个状态创建的时候都会注册自己感兴趣的相关事件回调,这里既可以注册到状态机自身的state::m_dispatcher上也可以注册到owner()->dispatcher()上,m_dispatcher上的事件对应rpc处理,owner()->dispatcher()对应输入处理:
void wait_login::on_create()
{
owner()->dispatcher().add_listener(std::string("auth_account"), make_active_listener(&wait_login::try_auth_account, this));
owner()->dispatcher().add_listener(std::string("create_account"), make_active_listener(&wait_login::try_create_account, this));
}
void replace_account::on_create()
{
m_dispatcher.add_listener<std::string, utility::rpc_msg, replace_account>("reply_replace_account", &replace_account::on_reply_replace_account, this);
owner()->dispatcher().add_listener(std::string("replace_account"), make_active_listener(&replace_account::try_replace_account, this));
}
但是处理owner()->dispatcher()的时候需要判断自己这个state的状态是不是active的,这样才能维护好状态之间的跳转,所以这里使用了一个封装函数make_active_listener,这样就避免每次在真正的回调里去判断是否是active的:
template<typename K, typename V, typename S>
static std::function<void(const K&, const V&)> make_active_listener(void(S::*cur_callback)(const K&, const V&), S* self)
{
return [=](const K& event, const V& arg)
{
if(!self->active())
{
self->owner()->logger()->error("login state {} not active when handle cmd {}", self->name(), event);
return;
}
else
{
(self->*cur_callback)(event, arg);
}
};
}
账号的登录状态机初始状态为wait_login,在初始的空白账号情况下,wait_login需要先通过create_account来创建一个账号,当创建账号成功的时候会自动切换到展示空角色列表的状态show_players中:
void wait_login::try_create_account(const std::string& cmd, const std::vector<json>& data)
{
if(!active())
{
owner()->logger()->error("login state {} not active when handle cmd {}", name(), cmd);
return;
}
auto cur_owner = owner();
std::string account_name;
std::string account_passwd;
try
{
data.at(0).get_to(account_name);
data.at(1).get_to(account_passwd);
}
catch (const std::exception& e)
{
cur_owner->logger()->error("try_create_account fail to parse {} error {}", json(data).dump(), e.what());
return;
}
cur_owner->try_create_account(account_name, account_passwd);
if(!change_to("create_account"))
{
owner()->logger()->info("wait_login change to create_account false");
return;
}
owner()->logger()->info("wait_login change to create_account true");
}
void create_account::on_reply_create_account(const std::string& cmd, const utility::rpc_msg& data)
{
if(!active())
{
owner()->logger()->error("login state {} not active when handle cmd {}", name(), cmd);
return;
}
if (!data.err.empty())
{
change_to("wait_login");
return;
}
else
{
owner()->logger()->info("on_reply_create_account enter show_players");
change_to("show_players");
}
}
cur_owner->try_create_account会发起一个rpc,并最终转移到服务端登录状态机里的wait_login状态去处理:
void wait_login::try_create_account(const std::string& cmd, const utility::rpc_msg& data)
{
owner()->logger()->info("try_create_account with data {}", json(data).dump());
auto cur_owner = owner();
std::uint32_t cmd_err = 0;
std::string account_name;
std::string account_passwd;
try
{
data.args.at(0).get_to(account_name);
data.args.at(1).get_to(account_passwd);
}
catch (const std::exception& e)
{
cur_owner->logger()->error("try_create_account fail to parse {} error {}", json(data).dump(), e.what());
cmd_err = int(account_errcodes::invalid_msg);
}
if (cmd_err == int(account_errcodes::ok))
{
utility::rpc_msg cur_msg;
cur_msg.cmd = "request_create_account";
cur_msg.args.push_back(account_name);
cur_msg.args.push_back(account_passwd);
owner()->call_service("login_service", cur_msg);
change_to("create_account");
}
else
{
utility::rpc_msg return_info;
return_info.cmd = "reply_create_account";
return_info.err = std::to_string(cmd_err);
return_info.args.push_back(account_name);
owner()->call_client(return_info);
}
}
如果参数校验通过,则会将账号密码信息发送到登录服务login_service上去执行。如果login_service上的创建账号检查通过,会返回一个reply_create_account的rpc到服务端的account_entity,此时服务端的登录状态机的create_account状态会接收这个rpc来执行状态切换以及消息下发:
void create_account::on_create()
{
m_dispatcher.add_listener<std::string, utility::rpc_msg>("reply_create_account", &create_account::reply_create_account, this);
}
void create_account::reply_create_account(const std::string& cmd, const utility::rpc_msg& detail)
{
std::string account_name;
utility::rpc_msg return_info;
return_info.err = detail.err;
try
{
detail.args.at(0).get_to(account_name);
}
catch(std::exception& e)
{
owner()->logger()->error("on_create_account_back fail to parse with error {}", e.what());
return_info.err = "invalid server msg";
}
return_info.cmd = "reply_create_account";
owner()->call_client(return_info);
if(return_info.err.empty())
{
owner()->set_account(account_name);
owner()->dispatcher().dispatch(enums::event_category::account, std::string("auth_account_suc"));
change_to("show_players");
}
else
{
owner()->dispatcher().dispatch(enums::event_category::account,std::string("auth_account_fail"));
change_to("wait_login");
}
}
这个reply_create_account的rpc传递到客户端之后,会由客户端登录状态机的create_account状态来接力处理,这样就是客户端与服务端的登录状态机的典型交互流程:
void create_account::on_create()
{
m_dispatcher.add_listener<std::string, utility::rpc_msg, create_account>("reply_create_account", &create_account::on_reply_create_account, this);
}
void create_account::on_reply_create_account(const std::string& cmd, const utility::rpc_msg& data)
{
if (!data.err.empty())
{
change_to("wait_login");
return;
}
else
{
owner()->logger()->info("on_reply_create_account enter show_players");
change_to("show_players");
}
}
注意这里的password是明文密码,为了增强账号密码的安全性最好对密码执行一个单向变换生成一个password_hash字符串,最简单的就是password_hash=md5(password),或者password_hash=md5(account_name+password)。更优的策略是每次账号注册的时候给这个account生成一个随机的长字符串salt,同时存储到数据库之中。在账号创建时,对密码做加盐变换password_hash=md5(password+salt),将(account,password_hash,salt)这个三元组同时存储在数据库中,这样就可以有效避免数据库泄露之后的暴力破解。如果网络连接是未加密状态的话,客户端需要提前做一次password=md5(password)这样的映射,避免网络中出现明文密码。
如果登录一个已经创建的账号,则会通过try_auth_account来执行账号验证,如果账号验证通过则会切换到获取玩家列表的状态fetch_players:
void wait_login::try_auth_account(const std::string& cmd, const std::vector<json>& data)
{
if(!active())
{
owner()->logger()->error("login state {} not active when handle cmd {}", name(), cmd);
return;
}
auto cur_owner = owner();
std::string account_name;
std::string account_passwd;
try
{
data.at(0).get_to(account_name);
data.at(1).get_to(account_passwd);
}
catch (std::exception& e)
{
cur_owner->logger()->error("try_auth_account fail to parse {} error {}", json(data).dump(), e.what());
return;
}
cur_owner->try_auth_account(account_name, account_passwd);
change_to("auth_account");
}
void auth_account::on_reply_auth_account(const std::string& cmd, const utility::rpc_msg& data)
{
if(!active())
{
owner()->logger()->error("login state {} not active when handle cmd {}", name(), cmd);
return;
}
if(data.err.empty())
{
owner()->enum_dispatcher().dispatch(enums::event_category::account, std::string("auth_account_suc"));
change_to("fetch_players");
}
else
{
if(data.err == "replace account needed")
{
change_to("replace_account");
}
else
{
owner()->enum_dispatcher().dispatch(enums::event_category::account, std::string("auth_account_fail"));
change_to("wait_login");
}
}
}
在fetch_players状态下,服务端会去拉取这个账号下创建的所有角色列表,当角色列表数据返回之后,就会切换到展示角色列表的状态show_players:
void fetch_players::on_reply_fetch_players(const std::string& cmd, const utility::rpc_msg& player_data)
{
if(!active())
{
owner()->logger()->error("login state {} not active when handle cmd {}", name(), cmd);
return;
}
if (!player_data.err.empty())
{
owner()->enum_dispatcher().dispatch(enums::event_category::account, std::string("fetch_players_fail"));
return;
}
std::vector<json::object_t> temp_player_data;
try
{
player_data.args.at(0).get_to(temp_player_data);
}
catch (std::exception& e)
{
owner()->logger()->error("on_fetch_players_back fail to parse args: {} error: {}", json(player_data.args).dump(), e.what());
return;
}
owner()->set_player_datas(temp_player_data);
m_statem.change_to("show_players");
owner()->enum_dispatcher().dispatch(enums::event_category::account, std::string("fetch_players_suc"));
}
在show_players状态下,支持对角色的增删,以及选择一个角色进行become_player上线操作:
void show_players::on_create()
{
owner()->dispatcher().add_listener(std::string("create_player"),make_active_listener(&show_players::try_create_player, this));
owner()->dispatcher().add_listener(std::string("delete_player"), make_active_listener(&show_players::try_delete_player, this));
owner()->dispatcher().add_listener(std::string("become_player"),make_active_listener(&show_players::try_become_player, this));
owner()->dispatcher().add_listener(std::string("logout_account"),make_active_listener(&show_players::try_logout_account, this));
owner()->dispatcher().add_listener(std::string("show_players"),make_active_listener(&show_players::on_show_players, this));
m_dispatcher.add_listener<std::string, utility::rpc_msg, show_players>("reply_create_player", &show_players::on_reply_create_player, this);
m_dispatcher.add_listener<std::string, utility::rpc_msg, show_players>("reply_delete_player", &show_players::on_reply_delete_player, this);
m_dispatcher.add_listener<std::string, utility::rpc_msg, show_players>("reply_become_player", &show_players::on_reply_become_player, this);
}
这个become_player成功之后,服务端与客户端都会以这个player的数据执行player_entity的创建,同时切换到player_online状态:
void show_players::on_reply_become_player(const std::string& cmd, const utility::rpc_msg& detail)
{
m_pending_cmd.clear();
if(!detail.err.empty())
{
owner()->enum_dispatcher().dispatch(enums::event_category::account,std::string("become_player_fail"));
return;
}
utility::persist_entity_id player_id;
json::object_t player_init_info;
try
{
detail.args.at(0).get_to(player_id);
detail.args.at(1).get_to(player_init_info);
}
catch (const std::exception& e)
{
owner()->logger()->warn("on_reply_become_player decode fail {} error {}", json(detail.args).dump(), e.what());
return;
}
m_statem.change_to("player_online");
owner()->on_become_player(player_id, player_init_info);
}
玩家登出流程
在player_online状态,唯一支持的操作是退出当前玩家的登录:
void player_online::try_logout_player(const std::string& cmd, const std::vector<json>& data)
{
owner()->logger()->info("player_online::try_logout_player");
owner()->try_logout_player();
owner()->enum_dispatcher().dispatch(enums::event_category::account, std::string("logout"));
}
void client_account::try_logout_player()
{
return call_server("request_logout_player", {});
}
在client_account::try_logout_player函数中,只会发送一个request_logout_player的RPC到服务端的account_entity去处理。这里会检查当前是否已经创建了player_entity,如果已经有player_entity,则需要先执行这个player_entity的注销:
void account_entity::request_logout_account(const utility::rpc_msg& msg)
{
if(m_statem.active_state_name() == "logout_account")
{
return;
}
if(m_player_id.empty())
{
utility::rpc_msg request_msg;
request_msg.cmd = "request_logout_account";
call_service("login_service", request_msg);
dispatcher().dispatch(enums::event_category::account, "logout");
m_statem.change_to("logout_account");
}
else
{
utility::rpc_msg request_msg;
request_msg.cmd = "request_logout_player";
call_player(request_msg);
m_statem.change_to("logout_account");
}
}
不管有没有对应的在线玩家,这个RPC执行之后都会强行切换到logout_account状态下,此时会等待退出登录的RPC返回,然后切换到未登录wait_login状态。
void logout_account::reply_logout_account(const std::string& cmd, const utility::rpc_msg& data)
{
owner()->call_client(data);
owner()->finish_logout();
m_statem.change_to("wait_login");
}
上面的流程对应的是客户端主动发起的退出登录操作,实际上游戏内会有服务端主动将一个玩家踢出登录的需求,因此在服务端登录状态机player_online监听了退出登录的RPC:
void player_online::on_create()
{
m_dispatcher.add_listener<std::string, utility::rpc_msg, player_online>("notify_player_logout", &player_online::notify_player_logout, this);
}
void player_online::on_enter()
{
}
void player_online::notify_player_logout(const std::string& cmd, const utility::rpc_msg& data)
{
change_to("show_player");
owner()->dispatcher().dispatch(enums::event_category::account, std::string("logout_player"));
owner()->call_client(data);
}
但是此时只是在客户端与服务端同时删除了对应的在线玩家角色player_entity,账号account_entity是依然在线的,登录状态机会切换到show_player状态,即账号仍然在线。如果要彻底回到登录前状态,需要在show_players状态下发起logout_account请求,这样才会将服务端客户端的account_entity彻底销毁,同时服务端客户端之间的网络连接也会断开。
void show_players::try_logout_account(const std::string& cmd, const std::vector<json>& data)
{
owner()->try_logout_account();
m_statem.change_to("logout_account");
}
玩家进出场景与迁移流程
当客户端登录完成并创建出对应的player_entity之后,还需要进入游戏场景中与环境和其他玩家进行交互。这个进入场景的操作由player_space_component::request_enter_space这个rpc来发起:
void player_space_component::request_enter_space(const utility::rpc_msg& msg, std::uint32_t space_no, const std::string& space_id, json::object_t& enter_info);
这里rpc制定了要进入的场景编号space_no和场景实例space_id,enter_info里可以携带进入场景的位置与朝向信息。在各项检查都通过之后,player_entity会向管理场景的服务space_service发出进入场景请求,同时将正在进入的场景编号记录在当前玩家的属性上,避免rpc返回之前发起下一个切换场景的请求:
if(cur_space)
{
leave_space_impl();
}
utility::rpc_msg request_msg;
request_msg.cmd = "request_enter_space";
request_msg.args.push_back(m_owner->entity_id());
request_msg.args.push_back(m_player->prop_data().team().id());
request_msg.args.push_back(space_no);
request_msg.args.push_back(space_id);
request_msg.args.push_back(enter_info);
m_owner->call_service("space_service", request_msg);
m_player->prop_proxy().space().entering_space_no().set(space_no);
return;
如果玩家当前已经在一个场景中,此时还会自动的触发场景的离开操作:
void player_space_component::leave_space_impl()
{
utility::rpc_msg notify_service_msg;
notify_service_msg.cmd = "report_leave_space";
notify_service_msg.args.push_back(m_owner->entity_id());
m_owner->call_service("space_service", notify_service_msg);
auto cur_space = m_owner->get_space();
cur_space->leave_space(m_owner);
}
这里的space_service::report_leave_space里会在space_service上解除当前玩家绑定的场景,而request_enter_space则会添加玩家与指定场景的映射关系,因此需要先执行leave操作,才能去执行enter。
值得注意的是这里的space_id可以传递空字符串,代表选择当前space_no的任一场景实例进入即可,对应的常见的进入主城场景的任意分线。如果space_id不为空且对应的场景实例不存在,这个请求将会执行失败。如果对应的场景实例存在,且各种场景进入的检查都通过了,space_service会发送reply_enter_space到这个player_entity上:
if(!msg.err.empty())
{
m_player->prop_proxy().space().entering_space_no().set(0);
return;
}
if(game_id == m_owner->get_local_server_name())
{
auto new_space = server::space_manager::instance().get_space(space_id);
if(!new_space)
{
m_owner->logger()->error("cant find space {}", space_id);
return;
}
new_space->enter_space(m_owner, enter_info);
return;
}
else
{
utility::rpc_msg request_msg;
request_msg.cmd = "request_migrate_begin";
request_msg.args.push_back(game_id);
request_msg.args.push_back(space_id);
request_msg.args.push_back(union_space_id);
request_msg.args.push_back(enter_info);
m_owner->call_relay_anchor(request_msg);
}
这里首先把之前记录的正在进入的场景编号清空,然后判断要进入的场景是否是同一个进程上。如果是同一个进程,则直接调用这个space_entity::enter_space即可,这个函数里会执行一些数据的记录以及玩家的位置设置,并触发AOI的重新计算。
如果不是同一个进程,则首先需要将当前玩家迁移到目标进程,此时会将玩家的所有数据进行打包,但是打包之前需要先用request_migrate_begin通知对应的relay_anchor设置当前player_entity的状态为迁移中,避免迁移时的消息丢失。至于relay_anchor的具体作用将在后面进行详细解释。
当relay_anchor设置后迁移中的状态之后,会发送reply_migrate_begin回当前的player_entity,此时会通知当前进程的space_manager来执行迁移出去的操作:
void player_space_component::reply_migrate_begin(const utility::rpc_msg& msg, const std::string& game_id, const std::string& space_id, const std::string& union_space_id, const json::object_t& enter_info)
{
auto new_enter_info = enter_info;
server::space_manager::instance().migrate_out(m_owner, game_id, space_id, union_space_id, new_enter_info);
}
void space_manager::migrate_out(entity::actor_entity *cur_entity, const std::string &game_id, const std::string &space_id, const std::string &union_space_id, json::object_t &enter_info)
{
json::object_t migrate_info;
bool enter_new_space = true;
auto pre_space = cur_entity->get_space();
if(pre_space && pre_space->union_space_id() == union_space_id)
{
enter_new_space = false;
enter_info["pos"] = cur_entity->pos();
enter_info["yaw"] = cur_entity->yaw();
}
if(pre_space && enter_new_space)
{
pre_space->leave_space(cur_entity);
}
enter_info["enter_new_space"] = enter_new_space;
cur_entity->migrate_out(migrate_info, enter_new_space);
utility::rpc_msg cur_msg;
cur_msg.cmd = "migrate_in";
m_logger->info("migrate out entity {} to game {} space {} union_space_id {} with info {} enter_new_space {} ", cur_entity->entity_id(), game_id, space_id, union_space_id, json(migrate_info).dump(), enter_new_space);
cur_msg.args.push_back(cur_entity->entity_id());
cur_msg.args.push_back(cur_entity->online_entity_id());
cur_msg.args.push_back(cur_entity->m_base_desc.m_type_name);
cur_msg.args.push_back(space_id);
cur_msg.args.push_back(union_space_id);
cur_msg.args.push_back(enter_info);
cur_msg.args.push_back(migrate_info);
if(enter_new_space)
{
m_server->destroy_entity(cur_entity);
}
m_server->call_server(utility::rpc_anchor::concat(game_id, "space_manager"), cur_msg);
}
这个space_manager::migrate_out会通知当前player_entity先退出已有场景,然后通过player_entity::migrate_out接口来将所有需要迁移的数据打包到migrate_info中,数据打包完成之后会往目标进程的space_manager发送一个migrate_in请求。
void actor_entity::migrate_out(json::object_t& migrate_info, bool enter_new_space)
{
encode_migrate_out_data(migrate_info, enter_new_space);
auto migrate_out_lambda = [=](actor_component* cur_comp)
{
cur_comp->migrate_out(enter_new_space);
};
call_component_interface(migrate_out_lambda);
if(!enter_new_space)
{
m_is_ghost = true;
auto become_ghost_lambda = [=](actor_component* cur_comp)
{
cur_comp->on_become_ghost();
};
call_component_interface(become_ghost_lambda);
}
}
发送到目标进程之后再进行解包并重新创建player_entity,然后执行player_entity::migrate_in操作来恢复一些打包的数据:
if(enter_new_space)
{
cur_entity = dynamic_cast<entity::actor_entity*>(m_server->create_entity(type_id, entity_id, online_entity_id, init_info, error));
if (!cur_entity)
{
m_logger->error("fail to create entity {} type {} with error {}", entity_id, type_id, error);
return;
}
}
else
{
// 省略一些无缝大世界代码
}
cur_entity->migrate_in(init_info, enter_new_space);
if(enter_new_space)
{
cur_space_iter->second->enter_space(cur_entity, enter_info);
}
最后利用打包的场景数据enter_info来执行玩家的进入场景操作,至此一个带迁移的场景切换流程就结束了。
玩家断线流程
玩家的网络状态是不可控的,特别是使用移动网络的手机设备在执行位置移动时可能会出现基站的切换从而导致连接的断开。如果粗暴的将连接断开处理为客户端被动下线的话,玩家的体验就会变得非常的差。因为重新走一遍登录流程来继续游戏会导致服务端角色的销毁后再重新创建,会浪费好几秒的时间,同时玩家状态的重新同步也是非常耗流量的。为了提升玩家在网络突然断开然后又重新连接时的游戏体验,减少重新全同步时的流量成本,现在的游戏服务器基本都会有断线重连这个功能。断线重连功能需要服务端和客户端一起协作:
- 服务端在往客户端发消息的时候,每个包都赋予一个唯一递增序列号,同时本地用滑动窗口的形式存储最近一小段时间内发送到客户端的所有消息,
- 当服务端发现客户端断线时,需要将下发到这个客户端的消息进行缓存住,避免消息的丢失,
- 客户端在发现自己断线之后,立即开启连接到服务器的重试,重试的时候带上最后收到的服务端消息包序列号
- 服务端收到重连的消息请求时,根据上发过来的包序列号进行比对,
- 如果这个包序列号在滑动窗口内,则重连成功,服务端将这个序列号之后的所有消息执行重发
- 如果这个包序列号不在滑动窗口内,则重连失败,服务端通知客户端重新走登录流程
上面介绍的只是一个大概的断线重连步骤,实际的实现中会涉及到很多的细节,这些细节将在网络相关章节中具体阐述。
玩家顶号流程
当一个账号A已经在某个客户端B上成功执行了登录后,玩家选择在另外一个客户端C上尝试登录同样的账号A时,会收到该账号已经在线的错误提示。如果我们要求账号必须要在不在线的时候执行登录,则玩家需要手动在设备B上执行登出操作,然后再在客户端C上执行登录操作,这个流程会伴随着服务端角色的销毁与重建,以及新客户端的重新同步,也是非常影响玩家的体验以及浪费服务端资源。面对这种情景,最好的方式是如果客户端C收到了账号已经在线的错误提示,可以选择执行顶号操作:即通知服务端将这个角色的客户端连接由B切换为C,这样就可以避免服务端角色的销毁与创建。因此在auth_account状态下收到账号已经在线的通知时,会自动切换到replace_account状态:
void auth_account::on_reply_auth_account(const std::string& cmd, const utility::rpc_msg& data)
{
if(data.err.empty())
{
owner()->enum_dispatcher().dispatch(enums::event_category::account, std::string("auth_account_suc"));
change_to("fetch_players");
}
else
{
if(data.err == "replace account needed")
{
change_to("replace_account");
}
else
{
owner()->enum_dispatcher().dispatch(enums::event_category::account, std::string("auth_account_fail"));
change_to("wait_login");
}
}
}
在replace_account状态下接收到客户端发起的顶号确认时,会发送顶号try_replace_account这个RPC到服务器,这里需要带上被顶号的账号other_account_id,这个参数在上面的reply_auth_account会下发:
void replace_account::try_replace_account(const std::string& cmd, const std::vector<json>& data)
{
std::string account_name;
std::string other_account_id;
try
{
data.at(0).get_to(account_name);
data.at(1).get_to(other_account_id);
}
catch(const std::exception& e)
{
owner()->logger()->error("try_replace_account fail to parse {} with error {}", json(data).dump(), e.what());
return;
}
owner()->try_replace_account(account_name, other_account_id);
}
服务端对应的状态机replace_account会将这个请求转发到管理登录与在线的服务login_service上,检查通过之后会通知到replace_account状态机开启顶号流程:
void login_service::replace_account_impl(const std::string& new_account_id, const std::string& account_name, const std::string& gate_name)
{
auto pre_account_info = m_online_accounts[account_name];
utility::rpc_msg reply;
reply.cmd = "notify_account_client_replaced";
reply.args.push_back(new_account_id);
reply.args.push_back(gate_name);
auto cur_server = get_server();
cur_server->call_server(this, pre_account_info.account_id, reply);
}
void account_entity::notify_account_client_replaced(const utility::rpc_msg& msg, const std::string& new_account, const std::string& new_gate_name)
{
m_logger->info("{} notify_account_client_replaced new_account {} new_gate {}", m_base_desc.m_persist_entity_id, new_account, new_gate_name);
get_server()->rebind_gate_client_begin(this, new_account, new_gate_name);
}
顶号的时候需要通知原有的pre_account_info.account_id对应的account_entity重新绑定到新的客户端连接上,这个客户端连接的标识符就是(new_gate_name,new_account)这两个字符串组成的pair:
void space_server::rebind_gate_client_begin(entity::account_entity* cur_account, const std::string& new_account, const std::string& new_gate_name)
{
auto temp_iter = new_account.rfind(utility::rpc_anchor::seperator);
if(temp_iter == std::string::npos)
{
m_logger->error("invalid new_account id {}", new_account);
return;
}
auto new_account_id = new_account.substr(temp_iter + 2);
// 这里可能出现同进程的情况
auto new_account_gate_con = m_router->get_connection_with_name(new_gate_name);
if(!new_account_gate_con)
{
m_logger->error("fail to find gate connection for client {}", new_gate_name);
return;
}
json replace_msg, replace_param;
// 通知老的client 对应的gate 被顶号
auto old_gate_id = cur_account->get_gate_id();
auto new_gate_id = get_connection_inbound_idx(new_account_gate_con);
replace_msg["cmd"] = "notify_client_remove_account";
replace_msg["param"] = json::object_t();
m_router->push_msg(old_gate_id, m_local_name_ptr, cur_account->get_shared_global_id(), std::make_shared<std::string>(replace_msg.dump(4)), enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
// 通知新的account对应的entity顶号成功 触发这个account的自动销毁
utility::rpc_msg replace_info;
replace_info.cmd = "reply_replace_account";
replace_info.args.push_back(cur_account->is_player_online());
call_server(cur_account, new_account, replace_info);
replace_msg["cmd"] = "notify_client_rebind_account";
replace_param["pre_connected_account"] = new_account_id;
replace_param["new_connected_account"] = cur_account->entity_id();
replace_msg["param"] = replace_param;
m_router->push_msg(new_gate_id, m_local_name_ptr, {}, std::make_shared<std::string>(replace_msg.dump(4)), enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
// 切换entity的gate 开启客户端掉线的timer 等待重新绑定之后会取消这个timer
m_gate_entities[old_gate_id].erase(cur_account);
cur_account->set_gate(std::string{}, 0);
}
在这个执行顶号的核心函数中,主要完成了如下任务:
- 使用
notify_client_remove_account,通知老的client连接,其连接因为顶号被断开,避免其触发断线重连
void gate_server::on_notify_client_remove_account(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> dest, const json& msg)
{
if(!dest)
{
return;
}
json replay_msg;
replay_msg["msg"] = "client replaced";
request_client_close_impl(*dest, replay_msg);
}
- 使用
reply_replace_account通知新的account其顶号成功,并触发新account的自动销毁
void replace_account::on_reply_replace_account(const std::string& cmd, const utility::rpc_msg& data)
{
if(!data.err.empty())
{
owner()->call_client(data);
change_to("wait_login");
return;
}
else
{
// 顶号成功时 需要在切换完client之后再下发这个顶号成功消息
owner()->on_replace_account_suc(data);
}
}
void account_entity::on_replace_account_suc(const utility::rpc_msg& msg)
{
m_logger->info("{} on_replace_account_suc {} ", m_base_desc.m_persist_entity_id, json(msg.args).dump());
get_server()->replace_client_remove_account(this);
}
void space_server::replace_client_remove_account(entity::account_entity* cur_account)
{
auto cur_gate_id = cur_account->get_gate_id();
m_gate_entities[cur_gate_id].erase(cur_account);
entity::entity_manager::instance().destroy_entity(cur_account);
}
-
使用
notify_client_rebind_account通知新client对应的gate将其绑定的account的地址切换为老的account地址,这个rpc函数如果执行成功,会以notify_rebind_gate_client_finish这个rpc来通知回当前的account_entity -
通过
set_gate来清空当前account的gate信息,触发创建自动倒计时销毁的计时器,同时通知在线玩家客户端已经丢失
void account_entity::set_gate(const std::string& gate_name, std::uint64_t gate_id, bool during_replace)
{
m_gate_name = gate_name;
m_gate_id = gate_id;
m_relay_entity->setup_client_info(m_gate_id, get_call_proxy());
if(gate_id != 0)
{
// 设置新的有效gate部分代码 此处先暂时省略
}
else
{
if(is_player_online())
{
utility::rpc_msg client_destory_msg;
client_destory_msg.cmd = "notify_player_client_destroyed";
call_player(client_destory_msg);
}
cancel_timer(m_destroy_client_timer);
m_destroy_client_timer.reset();
m_destroy_client_timer = add_timer_with_gap(std::chrono::seconds(m_auto_logout_second_when_client_destroy), [this]()
{
request_logout_account(utility::rpc_msg());
});
}
}
当notify_rebind_gate_client_finish通知回当前的account_entity之后,完整的顶号流程就算完成了,这里会通过set_gate重更新绑定一下新client连接对应的gate,并删除之前添加的自动销毁计时器,同时通知对应的在线玩家连接到了新的客户端:
void space_server::notify_rebind_gate_client_finish(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> from, const json& msg)
{
std::string entity_id;
try
{
msg.at("entity_id").get_to(entity_id);
}
catch (std::exception& e)
{
m_logger->error("notify_rebind_gate_client_finish fail parse {} error {}", msg.dump(), e.what());
return;
}
auto cur_account = entity::entity_manager::instance().get_entity<entity::account_entity>(entity_id);
if (!cur_account)
{
m_logger->error("notify_rebind_gate_client_finish cant find account entity {}", entity_id);
return;
}
if(cur_account->get_gate_id() != 0)
{
m_logger->error("notify_rebind_gate_client_finish {} expect gate_id 0 while meet", entity_id, cur_account->get_gate_id());
return;
}
auto cur_con_id = get_connection_inbound_idx(con.get());
m_gate_entities[cur_con_id].insert(cur_account);
// 触发重新同步数据
cur_account->set_gate(*get_connection_name(con.get()), cur_con_id, true);
}
void account_entity::set_gate(const std::string& gate_name, std::uint64_t gate_id, bool during_replace)
{
m_gate_name = gate_name;
m_gate_id = gate_id;
m_relay_entity->setup_client_info(m_gate_id, get_call_proxy());
if(gate_id != 0)
{
m_logger->info("{} notify_rebind_gate_client_finish with new_gate {}", m_base_desc.m_persist_entity_id, gate_name);
cancel_timer(m_destroy_client_timer);
m_destroy_client_timer.reset();
if(during_replace)
{
utility::rpc_msg replace_info;
replace_info.cmd = "reply_replace_account";
replace_info.args.push_back(is_player_online());
call_client(replace_info);
}
if(!is_player_online())
{
m_statem.change_to("show_players");
}
else
{
// 触发重新同步数据
utility::rpc_msg account_replace_msg;
account_replace_msg.cmd = "notify_player_client_replaced";
account_replace_msg.args.push_back(m_gate_name);
account_replace_msg.args.push_back(m_relay_entity->gate_version());
call_player(account_replace_msg);
}
}
// 省略之前介绍的客户端断线部分
}
这里的set_gate传递了during_replace=true,因此会向client发送顶号彻底完成的消息,让其客户端登录状态机进行状态切换。
当服务端玩家player_entity收到其连接到新客户端的notify_player_client_replaced请求之后,会将当前玩家的数据打包之后进行重新同步:
void player_entity::notify_player_client_replaced(const utility::rpc_msg& msg, const std::string& new_gate_name, std::uint8_t new_gate_version)
{
m_logger->warn("player client replaced by new gate {} version {}", new_gate_name, new_gate_version);
m_gate_version = new_gate_version;
if(new_gate_name.empty())
{
return;
}
auto sync_info = encode_with_flag(std::uint32_t(enums::encode_flags::self_client));
utility::rpc_msg full_sync_msg;
full_sync_msg.cmd = "create_player";
full_sync_msg.args.push_back(entity_id());
full_sync_msg.args.push_back(std::move(sync_info));
call_client(full_sync_msg);
m_login_dispatcher.dispatch(true);
}
当客户端接收到这个create_player数据之后,就会创建出对应的玩家、所在的场景以及周围的其他同步entity。