The Process Lifecycle of Mosaic Game
Server Startup Flow
In the earlier article on game server architecture we already introduced the common process roles in a game server, and mosaic_game largely reuses those concepts. The server cluster contains the following seven process roles:
- The manager process, a global singleton and the first to start, provides process registration and service discovery; its code lives under roles/server/mgr_server.
- The space process hosts the various in-match gameplay systems, acting as the container for entities and spaces; its code lives under roles/server/space_server.
- The gate process relays messages between clients and the server side; its code lives under roles/server/gate_server.
- The service process hosts out-of-match features such as chat, friends, and leaderboards; its code lives under roles/server/service_server.
- The database process proxies in-game reads and writes against the database, backed by mongodb; its code lives under roles/server/db_server.
- The cache process handles reads and writes of cached data, backed by redis; its code lives under roles/server/redis_server.
- The map process handles scene-level resource queries such as pathfinding, physics, and AOI; its code lives under roles/server/map_server.
With these process roles covered, we can walk through how they are started. I provide a very bare-bones launch script, deploy/scripts/run_servers.py. In this Python file you can specify the instance count for every process role except the manager process, along with the locations of data files and of the config files needed by peripheral services. During startup, mgr_server is brought up first, and this role only ever has a single instance. The cache, database, service, gate, space, and map processes are then started in that order:
# excerpt from deploy/scripts/run_servers.py; `options` comes from the
# script's command line parser
import os
from time import sleep

cur_server_cmd = "nohup ../bin/mgr_server -c {0} -l {1} -n mgr_server > /dev/null 2>{1}/mgr_server.log &".format(options.config_path, options.log_path)
os.system(cur_server_cmd)
sleep(1)
for i in range(1):
    cur_server_cmd = "nohup ../bin/redis_server -c {0} -l {1} -n redis_server_{2} -f {3} > /dev/null 2>{1}/redis_server_{2}.log &".format(options.config_path, options.log_path, i, options.redis_config)
    os.system(cur_server_cmd)
for i in range(options.db_num):
    cur_server_cmd = "nohup ../bin/db_server -c {0} -l {1} -n db_server_{2} -m {3} > /dev/null 2>{1}/db_server_{2}.log &".format(options.config_path, options.log_path, i, options.mongo_config)
    os.system(cur_server_cmd)
for i in range(options.service_num):
    cur_server_cmd = "nohup ../bin/service_server -c {0} -l {1} -n service_server_{2} -d {3} > /dev/null 2>{1}/service_server_{2}.log &".format(options.config_path, options.log_path, i, options.data_dir)
    os.system(cur_server_cmd)
for i in range(options.gate_num):
    cur_server_cmd = "nohup ../bin/gate_server -c {0} -l {1} -n gate_server_{2} > /dev/null 2>{1}/gate_server_{2}.log &".format(options.config_path, options.log_path, i)
    os.system(cur_server_cmd)
for i in range(options.game_num):
    cur_server_cmd = "nohup ../bin/space_server -c {0} -l {1} -n space_server_{2} -d {3} > /dev/null 2>{1}/space_server_{2}.log &".format(options.config_path, options.log_path, i, options.data_dir)
    os.system(cur_server_cmd)
for i in range(options.map_server_num):
    cur_server_cmd = "nohup ../bin/map_server -c {0} -l {1} -n map_server_{2} -d {3} > /dev/null 2>{1}/map_server_{2}.log &".format(options.config_path, options.log_path, i, options.data_dir)
    os.system(cur_server_cmd)
sleep(1)
The first sleep(1) guarantees that mgr_server has finished initializing before the other processes start; the trailing sleep(1) keeps automated tests from launching a client while the servers are still warming up.
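Fixed sleeps like these are fragile. As a sketch of a sturdier alternative (not part of the project; check_ready would have to probe something observable, such as mgr_server's http debug port), the script could poll for readiness instead:

```python
import time

def wait_until_ready(check_ready, timeout_s=10.0, poll_gap_s=0.2):
    """Poll check_ready() until it returns True or the timeout expires.

    check_ready is any zero-argument callable, e.g. one that probes
    mgr_server's http debug port. Returns True on success, False on timeout.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if check_ready():
            return True
        time.sleep(poll_gap_s)
    return False
```

This bounds the total wait while reacting as soon as the server comes up, instead of always paying a fixed one-second cost.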
Every server process started above has its logic class inherit from the json_stub provided in include/stub/json_stub.h, and the client under roles/basic_client inherits from json_stub as well. json_stub itself inherits from basic_stub, which is the logic framework behind every process role in mosaic_game. When a process starts, the basic information this framework needs at runtime is initialized from the relevant config files:
basic_stub::basic_stub(boost::asio::io_context& in_io_con, const stub_info& in_local_server, const stub_info& in_upstream_server, std::size_t in_timeout, utility::ts_t in_timer_check_gap_ms, std::uint32_t async_thread_num)
: m_io_context(in_io_con)
, m_local_server(in_local_server)
, m_upstream_server(in_upstream_server)
, m_logger(utility::get_logger(in_local_server.name))
, m_connection_timeout(in_timeout)
, m_timer_check_gap_ms(in_timer_check_gap_ms)
, m_local_name_ptr(std::make_shared<const std::string>(in_local_server.name))
, m_upstream_name_ptr(std::make_shared<const std::string>(in_upstream_server.name))
, m_start_ts(std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count())
, m_anchor_name_prefix(in_local_server.name + utility::rpc_anchor::seperator)
, m_async_thread_num(async_thread_num)
{
m_asio_wrapper = std::make_unique<asio_wrapper>(m_io_context);
}
Both in_local_server and in_upstream_server here are of type stub_info, which pins down a process's name, type, and port information:
struct stub_info
{
std::string ip;
std::uint16_t port;
std::string rsa_key;
std::string name;
std::string upstream;
std::uint16_t http_port;
std::string type;
};
in_local_server holds the configuration of the current process, and in_upstream_server holds that of its upstream process. Every process other than mgr_server must be handed the mgr_server as its upstream manager at initialization, so in practice only the mgr_server's stub_info needs its ip and port fields filled in; elsewhere they stay empty. No two stub_infos in a cluster may share a name, because the name field is the process's unique identifier. If a stub_info has an upstream, its upstream field holds the name from the upstream process's stub_info.
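As a sketch, the pair of stub_infos handed to one gate process might look as follows (every concrete value here is invented for illustration; the field names follow the struct above):

```python
# Hypothetical stub_info for a gate process, following the rule above that
# only the mgr_server entry needs its address fields filled in.
local_info = {
    "ip": "", "port": 0, "rsa_key": "", "name": "gate_server_0",
    "upstream": "mgr_server", "http_port": 0, "type": "gate_server",
}
upstream_info = {
    # the mgr_server stub_info carries real address information
    "ip": "127.0.0.1", "port": 7000, "rsa_key": "", "name": "mgr_server",
    "upstream": "", "http_port": 7001, "type": "mgr_server",
}
```

Note how the linkage works purely through names: local_info's upstream field equals upstream_info's name, and mgr_server, having no upstream, leaves that field empty.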
In basic_stub's start function, create_router initializes the network resources; then, according to the upstream_server config, connect_to_server registers this process with its upstream. start_accept is also called to open listeners on the port and http_port from local_server, serving normal game connections and the http debug interface respectively:
void basic_stub::start()
{
m_logger->info("server start");
create_router();
if (!m_upstream_server.name.empty())
{
add_named_server(m_upstream_server);
connect_to_server(m_upstream_server.name);
}
start_accept();
m_asio_wrapper->m_timer.expires_from_now(std::chrono::seconds(1));
m_asio_wrapper->m_timer.async_wait([this](const asio_error_code& error)
{
if(error)
{
m_logger->error("m_asio_wrapper timer error {}", error.message());
exit(1);
}
this->main_loop();
});
keep_alive_callback();
m_async_task_channels = std::vector<mutex_channel<std::function<void()>>>(m_async_thread_num);
m_async_threads.reserve(m_async_thread_num);
for(std::uint32_t i = 0; i< m_async_thread_num; i++)
{
m_async_threads.push_back(std::make_unique<std::thread>([this, i](){
this->async_worker_loop(i);
}));
}
}
On first start, a 1-second timer defers the launch of the framework main loop; keep_alive_callback then periodically sends heartbeats to the upstream process to keep this process registered as alive; finally, a number of background worker threads are created according to the configuration, forming a thread pool held in reserve. Note that the main loop runs on the asio thread; these worker threads never handle any asio tasks. The asio threads are created in the main function and are not owned by basic_stub. Below is the code gate_server uses in its main function to start the asio threads; setting threads to 1 means only a single asio network thread is started:
std::uint8_t const threads = 1;
std::size_t expire_time = 30000;
asio::io_context ioc{ threads };
server::gate_server cur_server = server::gate_server(ioc, local_stub_info, upstream_stub_info, expire_time, utility::ts_t(20));
cur_server.start();
std::vector<std::thread> v;
v.reserve(threads);
for (auto i = threads; i > 0; --i)
v.emplace_back(
[&ioc]
{
ioc.run();
});
for(auto& one_thread: v)
{
one_thread.join();
}
Beyond the threads created manually above, there is also a logging thread created automatically by spdlog; since that thread is entirely managed by spdlog, we will not dig into it here.
At startup, each process role does more than run this start function; each has extra logic of its own. Subclasses therefore generally provide their own do_start function to perform their initialization, calling basic_stub::start somewhere inside it:
- map_server's do_start is the simplest; it only initializes the internal map settings:
void map_server::do_start(const map_config& in_map_config)
{
m_map_config = in_map_config;
json_stub::start();
}
- In redis_server's do_start, a pool of worker threads is built to connect to the backing redis cluster:
void redis_server::do_start(const redis_config &redis_servers, std::uint8_t worker_num)
{
for (std::uint8_t i = 0; i < worker_num; i++)
{
auto cur_task_logger = utility::get_logger("redis_worker_" + std::to_string(i) );
workers.push_back(std::make_shared<redis_worker>(redis_servers, redis_task_channels, cur_task_logger));
}
for (auto &one_worker : workers)
{
work_threads.emplace_back([=]()
{ one_worker->run(); });
}
json_stub::start();
}
- In db_server's do_start, a thread pool is likewise created to connect to the backing mongodb cluster:
void db_server::do_start(const mongo_config& mongo_servers, std::uint8_t worker_num)
{
auto cur_task_logger = utility::get_logger("mongo_worker");
for (std::uint8_t i = 0; i < worker_num; i++)
{
workers.push_back(std::make_shared<mongo_worker>(mongo_servers, mongo_task_channels, cur_task_logger));
}
for (auto& one_worker : workers)
{
work_threads.emplace_back([=]()
{
one_worker->run();
});
}
json_stub::start();
}
- space_server's do_start initializes the entity system, the manager system, and the config-table system:
void space_server::do_start()
{
entity::entity_manager::instance().init();
json_stub::start();
manager_base::init_managers(this);
misc::stuff_utils::init();
global_config_mgr::instance();
}
- service_server's do_start initializes its own service manager and the local singleton managers:
void service_server::do_start()
{
m_service_mgr = &service::service_manager::instance();
m_service_mgr->init();
manager_base::init_managers(this);
json_stub::start();
}
- mgr_server's do_start starts a timer that drives service creation:
void mgr_server::do_start()
{
json_stub::start();
m_service_create_check_timer = add_timer_with_gap(std::chrono::milliseconds(1000), [this]() {
check_to_create_service();
});
}
- gate_server's do_start starts a timer that periodically cleans up invalid sessions:
void gate_server::do_start()
{
json_stub::start();
m_session_remove_timer = add_timer_with_gap(std::chrono::seconds(1), [this]()
{
this->check_remove_session();
});
}
The Server Main Loop
In the framework main loop main_loop, five kinds of information are processed:
- Ordinary network messages (network::msg_task): the business messages exchanged between server processes, and between clients and the gate process.
- Http messages (http_utils::request): messages received by the http service this process opens, currently used mainly for debug commands.
- Business timer expirations: the various timers inside game logic, as opposed to the timers provided by asio.
- Connection control messages (connection_ctrl_msg): tcp connection establishment and teardown between processes.
- Main-thread callbacks (std::function<void()>): business callbacks produced off the main thread, for example from http requests or work done by the async thread pool.
void basic_stub::main_loop()
{
m_logger->flush();
auto cur_msg_handler = [this](std::shared_ptr<network::net_connection> con, const network::msg_task& one_msg)
{
return this->on_msg(con, one_msg);
};
auto cur_http_handler = [this](const http_utils::request& req, msg_seq_t req_seq)
{
return this->on_http_request(req, req_seq);
};
auto cur_conn_ctrl_msg_handler = [this](const network::connection_ctrl_msg& msg)
{
return this->on_conn_ctrl_msg(msg);
};
std::uint64_t loop_total_ms = 0;
do
{
on_new_frame();
poll_mainloop_tasks();
auto poll_begin_ts = utility::timer_manager::now_ts();
m_router->poll_msg(cur_msg_handler);
http::http_request_mgr::poll_request(cur_http_handler);
poll_timers(utility::timer_manager::now_ts());
m_router->poll_ctrl_msg(cur_conn_ctrl_msg_handler);
auto poll_end_ts = utility::timer_manager::now_ts();
loop_total_ms += poll_end_ts - poll_begin_ts;
if (poll_end_ts - poll_begin_ts > m_high_load_threshold * m_timer_check_gap_ms)
{
continue;
}
else
{
break;
}
}while (!m_stopped);
if(m_stopped)
{
// process-exit handling omitted for now
}
m_asio_wrapper->m_timer.expires_from_now(std::chrono::milliseconds(std::max(int64_t(1), int64_t(m_timer_check_gap_ms) - int64_t(loop_total_ms))));
m_asio_wrapper->m_timer.async_wait([this](const asio_error_code& error)
{
(void)error;
this->main_loop();
});
return;
}
After a single pass of the main loop completes, its execution time is checked. If the pass took longer than the threshold m_high_load_threshold * m_timer_check_gap_ms, there is a large backlog of work and the process is under high load, so another pass of the logic loop runs immediately; this repeats until a single pass costs less than the threshold or a shutdown request arrives from outside. Once the batch of passes is done, a timer is created to defer the next invocation of the main loop. The delay is chosen to keep the interval between main-loop starts as close as possible to the configured m_timer_check_gap_ms, while avoiding the wasted cycles of busy-spinning.
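The delay arithmetic at the end of main_loop can be sketched like this (a simplification of the std::max expression above, with names shortened):

```python
def next_delay_ms(timer_check_gap_ms, loop_total_ms):
    """Delay before scheduling the next main-loop pass: aim for a fixed
    start-to-start interval, but never wait less than 1 ms."""
    return max(1, timer_check_gap_ms - loop_total_ms)
```

With a 20 ms gap, a pass that took 5 ms is followed by a 15 ms wait; any pass at or over budget degrades to the minimum 1 ms wait, so a heavily loaded process keeps making progress instead of falling further behind.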
Because the network logic lives in network_router and is decoupled from basic_stub's business logic, callbacks have to be supplied from outside when these network messages are processed; these are the three handlers constructed in the code above. on_msg handles business messages between processes, and on_conn_ctrl_msg handles connection establishment and teardown between processes; both are covered in detail in the later chapter on networking internals. on_http_request handles http requests received by the local http server; these mostly carry ops commands, and internally it calls on_gm_cmd to dispatch them:
void json_stub::on_http_request(const http_utils::request& req, msg_seq_t req_seq)
{
std::string cmd;
json param;
try
{
auto cur_json = json::parse(req.body);
cur_json.at("cmd").get_to(cmd);
cur_json.at("param").get_to(param);
}
catch(const std::exception& e)
{
auto reply = fmt::format("fail to parse http req for req uri {} content {} req_seq {} error {}", req.uri, req.body, req_seq, e.what());
m_logger->info(reply);
on_http_reply(req_seq, reply);
return;
}
on_gm_cmd(cmd, param, req_seq);
}
Inter-Server Connections
In the current flow, every process role initiates a connection to its upstream role after starting. Once the upstream connection is established, custom logic runs in the corresponding on_connect function; the most important piece is the set_stub_info command, which tells mgr_server which process role the connecting side plays:
void json_stub::on_connect(std::shared_ptr<network::net_connection> connection)
{
basic_stub::on_connect(connection);
const auto& cur_connection_dest = get_connection_name(connection.get());
auto cur_server_iter = m_named_servers.find(*cur_connection_dest);
if (cur_server_iter == m_named_servers.end())
{
close_connection(connection);
return;
}
json request_msg_1;
request_msg_1["cmd"] = "set_stub_info";
json local_info;
local_info["stub_info"] = m_local_server;
request_msg_1["param"] = local_info;
std::shared_ptr<std::string> msg_ptr = std::make_shared<std::string>(request_msg_1.dump());
m_router->push_msg(connection.get(), m_local_name_ptr, cur_connection_dest, msg_ptr, enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
}
Reporting set_stub_info to mgr_server does not yet make a process usable; only after it sends report_server_ready to mgr_server can it serve as a complete role. gate_server's duties are simple enough that its on_connect reports readiness to mgr_server right away:
void gate_server::on_connect(std::shared_ptr<network::net_connection> connection)
{
json_stub::on_connect(connection);
auto connection_name_ptr = connection->get_connection_name();
auto cur_server_iter = m_named_servers.find(*connection_name_ptr);
if (cur_server_iter == m_named_servers.end())
{
return;
}
if (cur_server_iter->second.type == "space_server")
{
m_connected_gameservers[*connection_name_ptr] = connection;
}
else if(cur_server_iter->second.type == "mgr_server")
{
json report_ready_info, temp_param;
report_ready_info["cmd"] = "report_server_ready";
temp_param["server_name"] = *m_local_name_ptr;
temp_param["server_type"] = "gate_server";
report_ready_info["param"] = temp_param;
m_router->push_msg(connection.get(), m_local_name_ptr, {}, std::make_shared<std::string>(report_ready_info.dump()), enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
}
}
map_server, db_server, and redis_server behave like gate_server, reporting server_ready as soon as the upstream connection is established. space_server and service_server are more involved: they cannot report readiness immediately after connecting to mgr_server, because these roles need other process roles to cooperate before they can provide complete service. After the upstream connection is established, these processes ask mgr_server to allocate resource processes:
void service_server::request_allocate_resource_server(const std::string& resource_server_type)
{
json request_msg, request_param;
request_msg["cmd"] = "request_allocate_resource";
request_param["from_server_name"] = m_local_server.name;
request_param["from_server_type"] = m_local_server.type;
request_param["resource_server_type"] = resource_server_type;
request_msg["param"] = request_param;
auto msg_ptr = std::make_shared<std::string>(request_msg.dump());
auto remote_name_ptr = std::make_shared<std::string>(m_upstream_server.name);
if (!m_router->push_msg(m_local_name_ptr, remote_name_ptr, msg_ptr, enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0)))
{
add_timer_with_gap(std::chrono::milliseconds(2 * 1000), [resource_server_type, this]() {
request_allocate_resource_server(resource_server_type);
});
}
}
void service_server::on_connect(std::shared_ptr<network::net_connection> connection)
{
json_stub::on_connect(connection);
auto connection_name_ptr = get_connection_name(connection.get());
if (*connection_name_ptr == m_upstream_server.name)
{
request_allocate_resource_server("db_server");
request_allocate_resource_server("redis_server");
}
}
When mgr_server receives such a resource request, it picks the process with the fewest connections from the current resource pool and returns it:
std::string mgr_server::allocate_resource_svr(std::unordered_map<std::string, resource_stub_info>& resource_svrs, const std::string& from_server_name)
{
if(resource_svrs.empty())
{
return {};
}
std::vector<std::pair<std::string, std::size_t>> resource_server_loads;
resource_server_loads.reserve(resource_svrs.size());
for (const auto& one_pair : resource_svrs)
{
if(!one_pair.second.ready)
{
continue;
}
resource_server_loads.emplace_back(one_pair.first, one_pair.second.connected_svrs.size());
}
std::sort(resource_server_loads.begin(), resource_server_loads.end(), [](const std::pair<std::string, std::size_t>& a, const std::pair<std::string, std::size_t>& b)
{
return a.second < b.second;
});
if (resource_server_loads.empty())
{
// every candidate may have been filtered out as not ready, so guard the [0] access below
return {};
}
auto dest_resource_svr = resource_server_loads[0].first;
resource_svrs[dest_resource_svr].connected_svrs.insert(from_server_name);
return dest_resource_svr;
}
When space_server or service_server receives the allocated resource process, it actively initiates a connection to it:
void space_server::on_reply_allocate_resource(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> dest, const json& msg)
{
stub_info cur_resource_svr;
std::string resource_server_type;
std::string cur_err;
try
{
msg.at("resource_server_type").get_to(resource_server_type);
msg.at("errcode").get_to(cur_err);
if(!cur_err.empty())
{
m_logger->warn("on_reply_allocate_resource errcode {}", cur_err);
add_timer_with_gap(std::chrono::milliseconds(2 * 1000), [resource_server_type, this]() {
request_allocate_resource_server(resource_server_type);
});
return;
}
msg.at("resource_svr").get_to(cur_resource_svr);
}
catch (std::exception& e)
{
m_logger->warn("on_reply_allocate_resource msg invalid {} error {}", msg.dump(4), e.what());
return;
}
m_named_servers[cur_resource_svr.name] = cur_resource_svr;
connect_to_server(cur_resource_svr.name);
}
Once a process's required connections to the various roles are satisfied, it reports to mgr_server that it is now usable:
void space_server::on_connect(std::shared_ptr<network::net_connection> connection)
{
json_stub::on_connect(connection);
auto connection_name_ptr = get_connection_name(connection.get());
if (*connection_name_ptr == m_upstream_server.name)
{
request_allocate_counter("online_session");
request_allocate_resource_server("db_server");
request_allocate_resource_server("redis_server");
}
else
{
auto cur_server_iter = m_named_servers.find(*connection_name_ptr);
if (cur_server_iter == m_named_servers.end())
{
return;
}
if (cur_server_iter->second.type == "db_server")
{
m_router->link_anchor_to_connection("db_server", connection.get());
m_logger->info("m_connected_resource_servers add {}", *connection_name_ptr);
}
else if(cur_server_iter->second.type == "redis_server")
{
m_router->link_anchor_to_connection("redis_server", connection.get());
m_logger->info("m_connected_resource_servers add {}", *connection_name_ptr);
}
else
{
return;
}
if(m_router->has_anchor("redis_server") && m_router->has_anchor("db_server"))
{
json report_ready_info, temp_param;
report_ready_info["cmd"] = "report_server_ready";
temp_param["server_name"] = *m_local_name_ptr;
temp_param["server_type"] = m_local_server.type;
report_ready_info["param"] = temp_param;
m_router->push_msg( m_local_name_ptr,std::make_shared<std::string>(m_upstream_server.name), std::make_shared<std::string>(report_ready_info.dump()), enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
}
}
}
When a server reports ready, mgr_server broadcasts a notify_server_ready message to the space_servers; if the newly ready server is a space_server, the broadcast also goes out to the gate_servers:
// void mgr_server::on_report_server_ready(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> dest, const json& msg)
json broadcast_msg, param;
std::vector<stub_info> temp_stub_infos;
temp_stub_infos.push_back(m_named_servers[server_name]);
broadcast_msg["cmd"] = "notify_server_ready";
param["servers"] = temp_stub_infos;
broadcast_msg["param"] = param;
auto cur_info = std::make_shared<std::string>(broadcast_msg.dump(4));
broadcast_to_space_svrs(cur_info);
if(server_type == "space_server")
{
broadcast_to_gate_svrs(cur_info);
}
When gate_server receives the ready notification for a space_server, it actively initiates a connection to it:
void gate_server::on_notify_server_ready(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> dest, const json& msg)
{
std::vector<stub_info> ready_servers;
try
{
msg.at("servers").get_to(ready_servers);
}
catch (std::exception& e)
{
m_logger->error("on_notify_server_ready fail to parse {} error {}", msg.dump(4), e.what());
return;
}
for (const auto& one_server : ready_servers)
{
if (one_server.type != "space_server")
{
continue;
}
m_named_servers[one_server.name] = one_server;
if (!m_router->has_connection_with_name(one_server.name))
{
connect_to_server(one_server.name);
}
}
}
When a service_server becomes ready, mgr_server does not immediately assign services for it to create; instead it waits for a timer to expire before creating them:
void mgr_server::check_to_create_service()
{
if(m_stopped)
{
return;
}
if (m_service_create_check_timer.valid())
{
m_timer_mgr.cancel_timer(m_service_create_check_timer);
m_service_create_check_timer.reset();
}
std::uint32_t need_server_num = 0;
for (const auto& one_pair : m_services_to_create)
{
need_server_num = std::max(need_server_num, one_pair.second);
}
std::vector<std::string> ready_service_servers;
for (const auto& one_pair : m_service_stub_infos)
{
if (one_pair.second.ready)
{
ready_service_servers.push_back(one_pair.first);
}
}
if (need_server_num > ready_service_servers.size())
{
// m_logger->error("need_server_num {} is larger than m_service_stub_infos size {}", need_server_num, ready_service_servers.size());
add_timer_with_gap(std::chrono::milliseconds(1000), [this]() {
check_to_create_service();
});
return;
}
if (m_min_service_server_num > ready_service_servers.size())
{
// m_logger->error("m_min_service_server_num {} is larger than m_service_stub_infos size {}", m_min_service_server_num, ready_service_servers.size());
add_timer_with_gap(std::chrono::milliseconds(1000), [this]() {
check_to_create_service();
});
return;
}
std::unordered_map<std::string, std::vector<std::string>> temp_services_on_server;
std::uint32_t temp_counter = 0;
for (const auto& one_service_cluster : m_services_to_create)
{
for (std::uint32_t i = 0; i < one_service_cluster.second; i++)
{
auto cur_select_server = ready_service_servers[temp_counter % ready_service_servers.size()];
temp_services_on_server[cur_select_server].push_back(one_service_cluster.first);
request_create_service_on_server(cur_select_server, one_service_cluster.first, {});
temp_counter++;
}
}
}
The point of waiting is to let all service_servers come up before spreading the creation of the out-of-match services across them; otherwise every service might end up created on the same service_server.
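The distribution step above is a plain round robin over the ready servers. A minimal sketch of it (service names and counts here are invented; note that the real m_services_to_create is an unordered map, so its iteration order is not fixed):

```python
def distribute_services(services_to_create, ready_servers):
    """Round-robin services across servers, mirroring the
    temp_counter % ready_service_servers.size() selection in
    check_to_create_service."""
    placement = {name: [] for name in ready_servers}
    counter = 0
    for service_type, count in services_to_create:
        for _ in range(count):
            placement[ready_servers[counter % len(ready_servers)]].append(service_type)
            counter += 1
    return placement

plan = distribute_services(
    [("login_service", 2), ("chat_service", 2)],
    ["service_server_0", "service_server_1"],
)
```

With two ready servers and two instances of each service, every server ends up hosting one instance of each, which is exactly the balancing the timer-based wait is meant to enable.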
Server Shutdown Flow
Shutting the servers down is a manual operation performed by ops, notified through mgr_server's http port. In deploy/scripts/stop_servers.py the shutdown notification is issued via a curl command:
# excerpt from deploy/scripts/stop_servers.py; `options` comes from the
# script's command line parser
import json
import os

with open(options.config_path, "r") as config_file:
    config_detail = json.load(config_file)
mgr_server_ip = config_detail["mgr_server"]["ip"]
mgr_server_http_port = config_detail["mgr_server"]["http_port"]
curl_cmd = "curl -H \"Accept: application/json\" -H \"Content-type: application/json\" -X GET -d '{\"cmd\": \"stop\", \"param\": {}}' "
curl_cmd += "http://" + mgr_server_ip + ":" + str(mgr_server_http_port) + "/gm_cmd/GET/"
os.system(curl_cmd)
When mgr_server receives this ops command, it calls notify_stop to begin the shutdown logic:
void json_stub::on_gm_cmd(const std::string& cmd, const json& param, msg_seq_t req_seq)
{
if(cmd == "stop")
{
notify_stop();
json reply_json;
reply_json["params"] = param;
auto reply_str = reply_json.dump() + "\r\n";
http::http_request_mgr::finish_request(req_seq, reply_str);
}
}
void basic_stub::notify_stop()
{
if(m_stopped)
{
m_logger->error("notify stop while already stopping");
return;
}
m_logger->warn("notify_stop");
m_stopped = true;
m_stop_report_ts = m_stop_begin_ts = std::chrono::steady_clock::now();
stop_begin();
}
void basic_stub::stop_begin()
{
m_logger->info("stop_begin");
}
notify_stop sets the bool m_stopped to true, meaning the process is now in its shutdown flow; the concrete shutdown details depend on each process role's overriding implementation.
In the process main loop, if the process finds itself shutting down, it starts periodically checking whether the shutdown has completed; once it has, all listening ports and timers are closed and the process waits for the io_context to wind down naturally:
// void basic_stub::main_loop()
if(m_stopped)
{
auto stop_check_ts = std::chrono::steady_clock::now();
bool with_stop_log = false;
std::chrono::duration<double> stop_elapsed_time = stop_check_ts - m_stop_report_ts;
if(stop_elapsed_time.count() > m_stop_alert_duration)
{
m_logger->warn("stop check fail after {} seconds", stop_elapsed_time.count());
with_stop_log = true;
m_stop_report_ts = stop_check_ts;
}
if(check_stop_finish(with_stop_log))
{
on_stop_finish();
return;
}
}
void basic_stub::on_stop_finish()
{
m_router->disconnect_all();
m_asio_wrapper->m_acceptor.close();
m_asio_wrapper->m_timer.cancel();
m_http_server->stop();
m_logger->warn("on_stop_finish");
}
check_stop_finish mainly checks that connections initiated by other processes, along with http sessions, have dropped to zero; only when both are zero does it then wait for all thread-pool threads to exit:
bool basic_stub::check_stop_finish(bool with_log)
{
std::uint32_t has_upstream_server = 0;
if(!m_upstream_server.name.empty())
{
has_upstream_server = 1;
}
if(m_router->get_active_connection_count() > has_upstream_server)
{
if(with_log)
{
m_logger->debug("router->get_active_connection_count() fail remain {}", m_router->get_active_connection_count());
}
return false;
}
if(m_http_server)
{
if(m_http_server->get_session_count() != 0)
{
if(with_log)
{
m_logger->debug("_http_server->get_session_count() fail");
}
return false;
}
}
if(m_finished_async_thread_counter != m_async_thread_num)
{
if(with_log)
{
m_logger->debug("m_finished_async_thread_counter fail");
}
return false;
}
for(auto& one_thread_ptr : m_async_threads)
{
one_thread_ptr->join();
}
m_async_threads.clear();
return true;
}
As the earlier sections showed, the readiness of the various server roles is interdependent, and that dependency affects shutdown as well as startup: force-closing every connection could leave server state that was never saved to the database. mgr_server therefore notifies the process roles to exit in stages, with the current stage held in m_stop_stage. The gate_servers are notified first:
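The stages m_stop_stage moves through form a fixed sequence; the names below are taken from the handlers that follow, and modeling them as an ordered list is a simplification of the enum-driven checks in the code:

```python
# Shutdown stages in the order mgr_server advances through them.
STOP_STAGES = [
    "wait_gate_server_destroy",   # all gate_servers gone
    "wait_account_logout",        # all accounts logged out
    "wait_service_destroy",       # all services destroyed
    "wait_space_server_destroy",  # all space_servers gone
    "wait_connection_clear",      # resource servers told to stop
]

def next_stage(cur):
    """Advance to the following stage, or None once shutdown is complete."""
    i = STOP_STAGES.index(cur)
    return STOP_STAGES[i + 1] if i + 1 < len(STOP_STAGES) else None
```

Each handler below guards on the current stage before advancing, which is why a late or duplicated report cannot push the shutdown sequence out of order.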
void mgr_server::stop_begin()
{
json_stub::stop_begin();
json sync_msg;
sync_msg["cmd"] = "notify_stop";
sync_msg["param"] = json::object_t();
auto cur_stop_msg = std::make_shared<std::string>(sync_msg.dump());
// notify all gates to exit first
broadcast_to_gate_svrs(cur_stop_msg);
m_stop_stage = stop_stage::wait_gate_server_destroy;
return;
}
When gate_server receives this stop notification, it tells every client that the server is closing, so that clients disconnect on their own, and from then on it rejects every new client connection:
void gate_server::stop_begin()
{
json_stub::stop_begin();
std::vector<std::string> temp_entities;
for(const auto& one_pair: m_eid_to_conn_id)
{
temp_entities.push_back(one_pair.first);
}
json::object_t notify_msg;
notify_msg["msg"] = "server_close";
for(const auto& one_dest: temp_entities)
{
request_client_close_impl(one_dest, notify_msg);
}
add_timer_with_gap(std::chrono::seconds(1), [this]()
{
m_router->disconnect_all();
});
}
A delayed timer is also started here to force-close all remaining connections, including those to clients, space_server, and mgr_server. mgr_server receives the disconnect notifications from the gate_servers; once every gate_server has exited, the second stage begins, logging all players out:
void mgr_server::on_gate_all_destroyed()
{
m_logger->warn("on_gate_all_destroyed");
if(m_stop_stage != stop_stage::wait_gate_server_destroy)
{
return;
}
m_stop_stage = stop_stage::wait_account_logout;
json sync_msg;
sync_msg["cmd"] = "request_logout_all_accounts";
sync_msg["param"] = json::object_t();
auto cur_stop_msg = std::make_shared<std::string>(sync_msg.dump());
// tell every space server to prepare its accounts for logout
broadcast_to_space_svrs(cur_stop_msg);
// wait for login_service to report that all players are offline
}
When the request_logout_all_accounts message reaches a space_server, every online account tells its corresponding online player to save to the database and then log out; an account with no online player logs out directly:
void space_server::on_request_logout_all_accounts(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> from, const json& msg)
{
// tell every account to prepare for logout
auto cur_accounts = entity::entity_manager::instance().get_entities_by_exact_type<entity::account_entity>();
utility::rpc_msg logout_msg;
logout_msg.cmd = "request_logout_account";
for(auto one_account: cur_accounts)
{
one_account->on_rpc_msg(logout_msg);
}
utility::rpc_msg request_msg;
request_msg.cmd = "request_check_accounts_empty";
call_service("login_service", request_msg);
}
void account_entity::request_logout_account(const utility::rpc_msg& msg)
{
if(m_statem.active_state_name() == "logout_account")
{
return;
}
if(m_player_id.empty())
{
utility::rpc_msg request_msg;
request_msg.cmd = "request_logout_account";
call_service("login_service", request_msg);
dispatcher().dispatch(enums::event_category::account, "logout");
m_statem.change_to("logout_account");
}
else
{
utility::rpc_msg request_msg;
request_msg.cmd = "request_logout_player";
call_player(request_msg);
m_statem.change_to("logout_account");
}
}
Once every online account is logged out, login_service notifies mgr_server that the logout stage is complete:
void service_server::report_accounts_all_logout()
{
m_logger->info("report_accounts_all_logout");
json report_info;
report_info["cmd"] = "report_accounts_all_logout";
report_info["param"] = json::object_t();
m_router->push_msg(m_local_name_ptr, std::make_shared<std::string>(m_upstream_server.name), std::make_shared<std::string>(report_info.dump()), enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
}
After that, mgr_server tells every service to save its state and then exit:
void mgr_server::on_accounts_all_logout(std::shared_ptr<network::net_connection> con, std::shared_ptr<const std::string> from, const json& msg)
{
m_logger->warn("on_accounts_all_logout");
if(m_stop_stage != stop_stage::wait_account_logout)
{
return;
}
m_stop_stage = stop_stage::wait_service_destroy;
// login_service has reported that every account is offline
json sync_msg;
sync_msg["cmd"] = "notify_stop";
sync_msg["param"] = json::object_t();
auto cur_stop_msg = std::make_shared<std::string>(sync_msg.dump());
// tell every service to prepare to exit
broadcast_to_service_svrs(cur_stop_msg);
broadcast_to_space_svrs(cur_stop_msg);
}
At the same time, with every account offline, the space_servers are no longer needed and begin their own exit flow:
void space_server::stop_begin()
{
manager_base::stop_managers();
json_stub::stop_begin();
}
When service_server receives the stop notification, it walks every service on the process and has each save its state and exit:
void service_server::stop_begin()
{
m_logger->warn("service server {} stop_begin ", *m_local_name_ptr);
auto cur_services = m_service_mgr->get_all_servicies();
for(auto one_pair: cur_services)
{
one_pair.second->notify_stop();
}
manager_base::stop_managers();
}
When a service finishes its own exit logic, mgr_server is notified:
void service_server::destroy_service(service::base_service* cur_service)
{
json report_destroy_info, temp_param;
report_destroy_info["cmd"] = "report_service_destroyed";
temp_param["service_type"] = cur_service->m_base_desc.m_type_name;
temp_param["service_id"] = cur_service->global_id();
report_destroy_info["param"] = temp_param;
m_router->push_msg( m_local_name_ptr, std::make_shared<std::string>(m_upstream_server.name), std::make_shared<std::string>(report_destroy_info.dump()), enums::packet_cmd_helper::encode(enums::packet_cmd::server_control, 0));
m_services_by_id.erase(cur_service->global_id());
m_services_to_destroy.push_back(cur_service);
}
Once every service recorded by mgr_server has exited, mgr_server tells the space_servers and service_servers to clean up their connections and exit:
void mgr_server::on_service_all_destroyed()
{
m_logger->warn("on_service_all_destroyed");
if(m_stop_stage != stop_stage::wait_service_destroy)
{
return;
}
m_stop_stage = stop_stage::wait_space_server_destroy;
// after all services exit, tell the space servers and service servers to exit
json sync_msg;
sync_msg["cmd"] = "notify_clear_connection";
sync_msg["param"] = json::object_t();
auto cur_stop_msg = std::make_shared<std::string>(sync_msg.dump());
broadcast_to_space_svrs(cur_stop_msg);
broadcast_to_service_svrs(cur_stop_msg);
// wait for every space server to exit
}
After all space_servers have gone offline, the resource servers are finally told to go down:
void mgr_server::on_space_server_all_destroyed()
{
m_logger->warn("on_space_server_all_destroyed");
if(m_stop_stage != stop_stage::wait_space_server_destroy)
{
return;
}
m_stop_stage = stop_stage::wait_connection_clear;
json sync_msg;
sync_msg["cmd"] = "notify_stop";
sync_msg["param"] = json::object_t();
auto cur_stop_msg = std::make_shared<std::string>(sync_msg.dump());
// tell every resource server to prepare to exit
broadcast_to_map_svrs(cur_stop_msg);
broadcast_to_redis_svrs(cur_stop_msg);
broadcast_to_db_svrs(cur_stop_msg);
}
Since redis and db are the roles ultimately responsible for persisting data, their exit condition first checks that every outstanding read and write task has finished:
bool db_server::check_stop_finish(bool with_log)
{
if (!mongo_task_channels.tasks_all_finished())
{
if(with_log)
{
m_logger->debug("mongo_task_channels check fail");
}
return false;
}
else
{
for (auto& one_worker : workers)
{
one_worker->notify_stop();
}
return json_stub::check_stop_finish(with_log);
}
}
bool redis_server::check_stop_finish(bool with_log)
{
if (!redis_task_channels.tasks_all_finished())
{
if (with_log)
{
m_logger->debug("redis_task_channels check fail");
}
return false;
}
else
{
for (auto &one_worker : workers)
{
one_worker->notify_stop();
}
return json_stub::check_stop_finish(with_log);
}
}
Once every connection is closed, mgr_server's own check_stop_finish returns true, and the mgr_server process finally exits.