Mosaic Game 里的 RealGhost 管理
Real-Ghost 管理
在每个actor_entity身上都会有两个Map来记录当前正在创建和已经创建的ghost_entity集合:
class Meta(rpc) actor_ghost_component final: public actor_component::sub_class<actor_ghost_component>
{
private:
// key cell_space_id value [cell_game_id, create_ts]
std::map<std::string, std::pair<std::string,std::uint64_t>> m_ghost_creating;
// key cell_space_id value [cell_game_id, create_ts]
std::map<std::string, std::pair<std::string,std::uint64_t>> m_ghost_created;
std::vector<std::string> m_anchors_for_ghost; // 所有正在创建和已经创建的ghost的anchor
};
执行这两个集合管理的入口是space_cell_component::check_ghost_and_real, 这个函数会通过计时器的方式来定期执行。 check_ghost_and_real的函数体负责收集周围的CellSpace和当前场景内的real_entity,然后分别调用下面的两个函数:
auto cur_ghost_create_count = check_ghost_create_destroy(cur_cell, temp_real_entities, nearby_cells);
auto cur_real_transfer_count = check_transfer_real_entities(cur_cell, temp_real_entities, nearby_cells);
check_ghost_create_destroy函数负责检查一个ghost_entity的创建与销毁:
int space_cell_component::check_ghost_create_destroy(const distributed_space::space_cells::space_node* cur_cell,const std::vector<entity::actor_entity*>& real_entities, const std::vector<const distributed_space::space_cells::space_node*>& nearby_cells)
{
std::vector<std::pair<const distributed_space::space_cells::space_node*, distributed_space::cell_bound>> nearby_cells_with_ghost_bounds; // cell的boundary扩张一个aoi_radius
nearby_cells_with_ghost_bounds.reserve(nearby_cells.size());
auto aoi_radius = m_space_cells->ghost_radius();
for(auto one_cell: nearby_cells)
{
auto pre_bound = one_cell->boundary();
pre_bound.min.z -= aoi_radius;
pre_bound.max.z += aoi_radius;
pre_bound.min.x -= aoi_radius;
pre_bound.max.x += aoi_radius;
nearby_cells_with_ghost_bounds.emplace_back(one_cell, pre_bound);
}
int cur_ghost_creat_count = 0;
auto max_ghost_keep_ts = utility::timer_manager::now_ts() - m_ghost_keep_duration_seconds * 1000;
for(auto cur_entity: real_entities)
{
auto cur_ent_pos = cur_entity->pos();
for(const auto& [one_cell, one_cell_ghost_boundary] : nearby_cells_with_ghost_bounds)
{
// 查询周围的cell 如果在这些cell的可创建ghost区域则创建ghost
if(one_cell_ghost_boundary.cover(cur_ent_pos.x, cur_ent_pos.z))
{
if(cur_ghost_creat_count < m_ghost_create_limit && cur_entity->try_create_ghost(one_cell->game_id(), one_cell->space_id()))
{
cur_ghost_creat_count++;
}
}
else
{
// 否则从这个区域里删除ghost 这里会避免销毁刚创建不久的ghost
cur_entity->try_destroy_ghost(one_cell->space_id(), max_ghost_keep_ts);
}
}
}
return cur_ghost_creat_count;
}
这里的逻辑其实很简单,对每个周围的CellSpace,根据其boundary扩张一个aoi_radius,形成一个ghost_rect。然后对每个real_entity,对每个ghost_rect执行点在矩形内的测试:
- 如果这个
Entity不在这个CellSpace的ghost_rect内,则需要从这个CellSpace中删除已经创建好的ghost_entity,这里会带上一个时间戳max_ghost_keep_ts,如果这个ghost_entity的创建时间早于这个时间戳才真正执行删除,这样保证一个ghost_entity的存活时间起码为m_ghost_keep_duration_seconds - 如果这个
Entity在这个CellSpace的ghost_rect内,则需要在这个CellSpace里创建一个ghost_entity,不过这里还有一个额外限制条件,单次最多创建m_ghost_create_limit个ghost_entity,这样避免边界调整时引发短时间内的ghost_entity大量创建
当try_create_ghost真正被触发时,请求会被转发到actor_ghost_component上:
bool actor_entity::try_create_ghost(const std::string& cell_game_id, const std::string& cell_space_id)
{
if(is_ghost())
{
return false;
}
if(!get_space())
{
return false;
}
if(!get_space()->is_cell_space())
{
return false;
}
auto cur_ghost_comp = get_component<actor_ghost_component>();
if(!cur_ghost_comp)
{
return false;
}
return cur_ghost_comp->try_create_ghost(cell_game_id, cell_space_id);
}
这个函数负责将这个real_entity的ghost_entity相关属性进行打包,并向远端发起一个创建ghost_entity的请求:
bool actor_ghost_component::try_create_ghost(const std::string& cell_game_id, const std::string& cell_space_id)
{
check_ghost_creating();
if(m_ghost_creating.find(cell_space_id) != m_ghost_creating.end())
{
return false;
}
if(m_ghost_created.find(cell_space_id) != m_ghost_created.end())
{
return false;
}
if(cell_space_id == m_owner->get_space()->entity_id())
{
return false;
}
m_ghost_creating[cell_space_id] = std::make_pair(cell_game_id, utility::timer_manager::now_ts());
utility::rpc_msg ghost_create_msg;
json::object_t ghost_data;
json::object_t enter_info;
enter_info["pos"] = m_owner->pos().data;
enter_info["yaw"] = m_owner->yaw();
m_owner->prepare_ghost_data(ghost_data);
ghost_create_msg.cmd = "request_create_ghost";
ghost_create_msg.set_args(*m_owner->get_call_proxy(), cell_space_id, m_owner->type_name(), m_owner->entity_id(), m_owner->online_entity_id(), enter_info, ghost_data);
m_owner->get_server()->call_server(utility::rpc_anchor::concat(cell_game_id, "space_manager"), ghost_create_msg);
m_anchors_for_ghost.push_back(utility::rpc_anchor::concat(cell_game_id, m_owner->entity_id()));
return true;
}
这里会将创建的时间戳记录在m_ghost_creating中, 作为一个超时检查时间戳来使用。
当目标CellSpace的space_manager在接收到创建ghost_entity的请求之后,就会在指定的SpaceEntity中创建这个ghost_entity,成功创建之后回以reply_create_ghost来通知创建成功:
void space_manager::request_create_ghost(const utility::rpc_msg& msg, const std::string& reply_anchor, const std::string& space_id, const std::string& type_id, const std::string& real_eid, std::uint64_t online_entity_id, const json::object_t &enter_info, const json& init_info)
{
std::string error;
do
{
auto cur_space_iter = m_spaces.find(space_id);
if (cur_space_iter == m_spaces.end())
{
error = "cant find space";
break;
}
if(!cur_space_iter->second->can_create_ghost())
{
error = "space cant create ghost";
break;
}
auto cur_entity = m_server->create_entity(type_id, real_eid, online_entity_id, init_info, error);
if (!cur_entity)
{
m_logger->error("fail to create entity {} type {} with error {}", real_eid, type_id, error);
break;
}
cur_space_iter->second->enter_space(dynamic_cast<entity::actor_entity*>(cur_entity), enter_info);
} while (0);
utility::rpc_msg reply_create_ghost_msg;
reply_create_ghost_msg.cmd = "reply_create_ghost";
std::vector<json> temp_args;
temp_args.reserve(2);
temp_args.push_back(space_id);
temp_args.push_back(error);
reply_create_ghost_msg.args = std::move(temp_args);
m_server->call_server(reply_anchor, reply_create_ghost_msg);
}
如果创建成功,将会在m_ghost_created中记录创建成功的时间戳:
void actor_ghost_component::reply_create_ghost(const utility::rpc_msg& msg, const std::string& cell_space_id, const std::string& error)
{
m_owner->logger()->info("player {} reply_create_ghost cell_id {} error {}", m_owner->entity_id(), cell_space_id, error);
auto cur_cell_iter = m_ghost_creating.find(cell_space_id);
if(cur_cell_iter == m_ghost_creating.end())
{
return;
}
auto cur_ts = utility::timer_manager::now_ts();
// auto ghost_create_cost = cur_ts - cur_cell_iter->second;
if(error.empty())
{
m_ghost_created[cell_space_id] = std::make_pair(cur_cell_iter->second.first, cur_ts);
}
m_ghost_creating.erase(cur_cell_iter);
}
任何RPC都会有超时的情况,超时检查就在check_ghost_creating函数中,这里会遍历所有的正在创建的ghost_entity,通知其删除当前real_entity对应的ghost_entity,并从m_ghost_creating中移除相应条目,以配合后续的重试:
void actor_ghost_component::check_ghost_creating()
{
auto cur_ts = utility::timer_manager::now_ts();
auto expire_ts = cur_ts - m_ghost_create_timeout_ms * 1000; // 10s 过期
utility::rpc_msg ghost_del_msg;
ghost_del_msg.cmd = "request_delete_ghost";
ghost_del_msg.args.push_back(m_owner->entity_id());
std::vector<std::string> temp_expired_cells;
utility::map_erase_if(m_ghost_creating, [&temp_expired_cells, expire_ts, this](const auto& one_pair)
{
if(one_pair.second.second < expire_ts)
{
temp_expired_cells.push_back(utility::rpc_anchor::concat(one_pair.second.first, one_pair.first));
remove_ghost_anchor(one_pair.first);
return true;
}
else
{
return false;
}
});
m_owner->get_server()->call_server_multi(m_owner, ghost_del_msg, temp_expired_cells);
}
负责处理real_entity迁移的函数check_transfer_real_entities就复杂一点了,基础的迁移规则是real_entity在当前CellSpace的ghost_rect的外部,附加条件是单次最大迁移数量要小于指定值,以及考虑一个CellSpace在被合并时会强制将当前CellSpace内的real_entity执行迁出 :
int space_cell_component::check_transfer_real_entities(const distributed_space::space_cells::space_node* cur_cell, const std::vector<entity::actor_entity*>& real_entities, const std::vector<const distributed_space::space_cells::space_node*>& nearby_cells)
{
auto aoi_radius = m_space_cells->ghost_radius();
auto ghost_max_region = cur_cell->boundary();
ghost_max_region.min.z -= aoi_radius;
ghost_max_region.min.x -= aoi_radius;
ghost_max_region.max.x += aoi_radius;
ghost_max_region.max.z += aoi_radius;
int cur_real_migrate_count = 0;
for(auto one_ent: real_entities)
{
if(cur_real_migrate_count >= m_real_migrate_limit)
{
return cur_real_migrate_count;
}
if(one_ent->is_ghost())
{
continue;
}
auto cur_ent_pos = one_ent->pos();
if(!cur_cell->is_merging() && ghost_max_region.cover(cur_ent_pos.x, cur_ent_pos.z))
{
continue;
}
m_owner->logger()->info("space {} cant cover entity {} try get migrate dest", m_owner->entity_id(), one_ent->entity_id());
const distributed_space::space_cells::space_node* dest_cell = nullptr;
if(cur_cell->is_merging())
{
// 合并状态下 将当前点扩张为半径为aoi_radius的矩形 然后选取一个相交的周围cell来作为迁移目标 当然 迁移目标不能是merging状态下的
spiritsaway::distributed_space::cell_bound cur_migrate_bound;
cur_migrate_bound.min.x = cur_ent_pos.x;
cur_migrate_bound.min.z = cur_ent_pos.z;
cur_migrate_bound.max = cur_migrate_bound.min;
cur_migrate_bound.min.x -= aoi_radius;
cur_migrate_bound.min.z -= aoi_radius;
cur_migrate_bound.max.x += aoi_radius;
cur_migrate_bound.max.z += aoi_radius;
for(auto one_cell: nearby_cells)
{
if(!one_cell->is_merging() && one_cell->boundary().intersect(cur_migrate_bound))
{
dest_cell = one_cell;
break;
}
}
}
else
{
// 非合并状态下 选择当前点所在的cell作为迁移目标 当然 迁移目标不能是merging状态下的
for(auto one_cell : nearby_cells)
{
if(!one_cell->is_merging() &&one_cell->boundary().cover(cur_ent_pos.x, cur_ent_pos.z))
{
dest_cell = one_cell;
break;
}
}
}
if(!dest_cell)
{
dest_cell = m_space_cells->query_leaf_for_point(cur_ent_pos.x, cur_ent_pos.z);
}
if(!dest_cell)
{
continue;
}
if(one_ent->try_transfer_real(dest_cell->space_id()))
{
cur_real_migrate_count++;
}
}
return cur_real_migrate_count;
}
计算迁移目标dest_cell的规则有些复杂:
- 如果当前
CellSpace正在被合并,那么随机选择周围任意一个与当前Entity创建ghost_entity的矩形区域相交的CellSpace作为目标 - 如果当前
CellSpace没有在被合并,则选择能覆盖当前actor_entity位置的CellSpace作为目标 - 如果上面两个分支执行之后的仍然没有一个有效结果,则直接从整个
SpaceCells中查询这个点对应的CellSpace,作为迁移目标。
同时这个try_transfer_real也是将逻辑转发到actor_ghost_component上:
bool actor_entity::try_transfer_real(const std::string& cell_space_id)
{
if(is_ghost())
{
return false;
}
if(!get_space())
{
return false;
}
if(!get_space()->is_cell_space())
{
return false;
}
auto cur_ghost_comp = get_component<actor_ghost_component>();
if(!cur_ghost_comp)
{
return false;
}
return cur_ghost_comp->try_transfer_real(cell_space_id);
}
这里的迁移还有一个前置条件,即目标CellSpace已经有了当前real_entity的一个ghost_entity,这个限制其实与BigWorld中的限制差不多。不过BigWorld中要求要么已经创建好了ghost_entity,要么这个创建ghost_entity请求已经发出,而MosaicGame中要求目标进程中的已经创建好了ghost_entity,必须在m_ghost_created存在这个元素:
bool actor_ghost_component::try_transfer_real(const std::string& cell_space_id)
{
if(!can_transfer_real(cell_space_id))
{
return false;
}
return true;
}
bool actor_ghost_component::can_transfer_real(const std::string& cell_space_id)
{
if(m_migrating_real)
{
return false;
}
auto cur_cell_iter = m_ghost_created.find(cell_space_id);
if(cur_cell_iter == m_ghost_created.end())
{
return false;
}
std::string cell_game_id = cur_cell_iter->second.first;
utility::rpc_msg request_msg;
json::object_t enter_info;
enter_info["pos"] = m_owner->pos();
enter_info["yaw"] = m_owner->yaw();
enter_info["pre_cell"] = m_owner->entity_id();
request_msg.cmd = "request_migrate_begin";
request_msg.args.push_back(cell_game_id);
request_msg.args.push_back(cell_space_id);
request_msg.args.push_back(m_owner->get_space()->union_space_id());
request_msg.args.push_back(enter_info);
m_owner->call_relay_anchor(request_msg);
m_migrating_real = true;
return true;
}
这里的request_migrate_begin并不会直接将当前real_entity的数据全打包发过去,而是先通知当前actor_entity对应的RelayAnchor。同时RealAnchor在记录好迁移目标m_dest_game之后,再通知这个real_entity开始执行真正的迁移:
void relay_entity::request_migrate_begin(const utility::rpc_msg& msg, const std::string& game_id, const std::string& space_id, const std::string& union_space_id, const json::object_t& enter_info)
{
if(!m_dest_actor)
{
m_logger->error("request_migrate_begin while dest_anchor empty dest_game {} dest_eid {}", m_dest_game, m_dest_eid);
return;
}
utility::rpc_msg reply_msg;
reply_msg.cmd = "reply_migrate_begin";
reply_msg.args.push_back(game_id);
reply_msg.args.push_back(space_id);
reply_msg.args.push_back(union_space_id);
reply_msg.args.push_back(enter_info);
call_server(m_dest_actor, reply_msg);
m_dest_actor.reset();
m_dest_game = game_id;
}
在space_manager中统一处理所有的迁移逻辑,不管是分布式无缝大世界的迁移还是非无缝大世界的迁移,内部会根据要进入的场景union_space_id是否等于当前场景的union_space_id来标记enter_new_space,true代表是普通的actor_entity迁移,false代表Real-Ghost迁移:
void player_space_component::reply_migrate_begin(const utility::rpc_msg& msg, const std::string& game_id, const std::string& space_id, const std::string& union_space_id, const json::object_t& enter_info)
{
auto new_enter_info = enter_info;
server::space_manager::instance().migrate_out(m_owner, game_id, space_id, union_space_id, new_enter_info);
}
void space_manager::migrate_out(entity::actor_entity *cur_entity, const std::string &game_id, const std::string &space_id, const std::string &union_space_id, json::object_t &enter_info)
{
json::object_t migrate_info;
bool enter_new_space = true;
auto pre_space = cur_entity->get_space();
if(pre_space && pre_space->union_space_id() == union_space_id)
{
enter_new_space = false;
enter_info["pos"] = cur_entity->pos();
enter_info["yaw"] = cur_entity->yaw();
}
if(pre_space && enter_new_space)
{
pre_space->leave_space(cur_entity);
}
enter_info["enter_new_space"] = enter_new_space;
cur_entity->migrate_out(migrate_info, enter_new_space);
utility::rpc_msg cur_msg;
cur_msg.cmd = "migrate_in";
m_logger->info("migrate out entity {} to game {} space {} union_space_id {} with info {} enter_new_space {} ", cur_entity->entity_id(), game_id, space_id, union_space_id, json(migrate_info).dump(), enter_new_space);
cur_msg.args.push_back(cur_entity->entity_id());
cur_msg.args.push_back(cur_entity->online_entity_id());
cur_msg.args.push_back(cur_entity->m_base_desc.m_type_name);
cur_msg.args.push_back(space_id);
cur_msg.args.push_back(union_space_id);
cur_msg.args.push_back(enter_info);
cur_msg.args.push_back(migrate_info);
if(enter_new_space)
{
m_server->destroy_entity(cur_entity);
}
m_server->call_server(utility::rpc_anchor::concat(game_id, "space_manager"), cur_msg);
}
actor_entity::migrate_out这个接口也会对是否是大世界迁移做区分,如果是大世界迁移的话,迁移完成之后会将当前real_entity转换为ghost_entity,同时通知所有的组件这个转换操作on_become_ghost:
void actor_entity::migrate_out(json::object_t& migrate_info, bool enter_new_space)
{
encode_migrate_out_data(migrate_info, enter_new_space);
auto migrate_out_lambda = [=](actor_component* cur_comp)
{
cur_comp->migrate_out(enter_new_space);
};
call_component_interface(migrate_out_lambda);
if(!enter_new_space)
{
m_is_ghost = true;
auto become_ghost_lambda = [=](actor_component* cur_comp)
{
cur_comp->on_become_ghost();
};
call_component_interface(become_ghost_lambda);
}
}
当迁移目标ghost_entity对应的space_manager接收到这个migrate_in请求之后,也会解析出这个enter_new_space字段,来决定是创建一个新的actor_entity还是选择一个现有的ghost_entity来走后续的actor_entity::migate_in流程:
void space_manager::do_migrate_in(const std::string &entity_id, std::uint64_t online_entity_id, const std::string &type_id, const std::string &space_id, const std::string &union_space_id, const json::object_t &enter_info, const json::object_t &init_info)
{
std::string error;
auto cur_space_iter = m_spaces.find(space_id);
if (cur_space_iter == m_spaces.end())
{
if (union_space_id.empty())
{
m_logger->error("fail to create entity {} type {} with invalid space {}", entity_id, type_id, space_id);
return;
}
else
{
retry_migrate_info saved_migrate_info;
saved_migrate_info.entity_id = entity_id;
saved_migrate_info.online_entity_id = online_entity_id;
saved_migrate_info.type_id = type_id;
saved_migrate_info.union_space_id = union_space_id;
saved_migrate_info.init_info = init_info;
saved_migrate_info.retry_ts = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
m_retry_migrate_infos[entity_id] = saved_migrate_info;
utility::rpc_msg retry_msg;
retry_msg.cmd = "request_retry_migrate";
retry_msg.from = m_server->gen_local_anchor(name());
retry_msg.set_args(entity_id, union_space_id, enter_info);
m_server->call_server("space_service", retry_msg);
return;
}
}
bool enter_new_space = true;
try
{
enter_info.at("enter_new_space").get_to(enter_new_space);
}
catch(const std::exception& e)
{
m_logger->error("fail to migrate entity {} type {} read enter_new_space with error {}", entity_id, type_id, e.what());
return;
}
entity::actor_entity* cur_entity;
if(enter_new_space)
{
cur_entity = dynamic_cast<entity::actor_entity*>(m_server->create_entity(type_id, entity_id, online_entity_id, init_info, error));
if (!cur_entity)
{
m_logger->error("fail to create entity {} type {} with error {}", entity_id, type_id, error);
return;
}
}
else
{
cur_entity = entity::entity_manager::instance().get_entity<entity::actor_entity>(entity_id);
if (!cur_entity)
{
m_logger->error("fail to find entity {} type {} ", entity_id, type_id);
return;
}
if(!cur_entity->is_ghost())
{
m_logger->error("fail to find entity {} type {} with ghost", entity_id, type_id);
return;
}
}
cur_entity->migrate_in(init_info, enter_new_space);
if(enter_new_space)
{
cur_space_iter->second->enter_space(cur_entity, enter_info);
}
}
void space_manager::migrate_in(const utility::rpc_msg &data, const std::string &entity_id, std::uint64_t online_entity_id, const std::string &type_id, const std::string &space_id, const std::string &union_space_id, const json::object_t &enter_info, const json::object_t &init_info)
{
do_migrate_in(entity_id, online_entity_id, type_id, space_id, union_space_id, enter_info, init_info);
}
在actor_entity::migrate_in中,如果发现自己是ghost_entity转换为real_entity,则会通知所有的组件这个信息:
utility::rpc_msg finish_msg;
finish_msg.from = utility::persist_entity_id();
finish_msg.cmd = "notify_migrate_finish";
finish_msg.set_args(get_local_server_name());
call_relay_anchor(finish_msg);s
m_migrate_in_finish_dispatcher.dispatch();
if(!enter_new_space)
{
// ghost to real之后重新设置aoi
// auto cur_space = get_space();
migrate_data.at("space_self_event_seq").get_to(m_space_self_event_seq);
migrate_data.at("space_actor_event_seq").get_to(m_space_actor_event_seq);
migrate_fix_space_events();
auto cur_lambda = [](actor_component* cur_comp)
{
cur_comp->on_become_real();
};
call_component_interface(cur_lambda);
}
同时这里还会通知对应的relay_anchor,当前actor_entity已经迁移完成,之前缓存的消息可以重启处理了。这样就完成了real_entity迁移的全过程。
AOI管理
在之前的AOI章节中我们介绍了MosaicGame的AOI实现,以及在此之上的属性同步。简单来说就是每次属性修改之后,都会生成一个属性同步的广播,广播的消息开头两个字节就是这个发生属性改变的actor_entity在当前space_entity里的uint16_t aoi_index。在分布式大世界中,属性同步的对象不仅仅是当前进程同场景里的一些客户端,现在还包括了ghost_entity。同时一个real_entity不仅接受其他real_entity的属性同步,还需要接受当前AOI内的其他ghost_entity的属性同步。原来设计的属性同步方案已经无法满足这种分布式同步要求,为此我们在原有的属性同步上做一些额外的功能增强与修正,使得状态能够正确的同步到客户端。
在分布式大世界的情况下,原有的每个真实space_entity单独维护一个aoi_manager的设计会造成一些问题。最突出的问题就是,在不同的cell_space中,同一个actor_entity对应的real_entity与ghost_entity他们的aoi_index很有可能不一样,因为这个标识符是这个entity在进入每个space_entity时单独分配的。假设real_entity(A)在进入cell_space(M)中被分配到了aoi_index_1,此时在cell_space(M)中的另外一个player_entity(B)在其客户端以aoi_index_1创建了real_entity(A)的客户端对象。然后real_entity(A)从cell_space(M)迁移到了cell_space(N),在新场景的aoi_index_2是这个ghost_entity(A)进入cell_space(N)时被分配的。迁移之后的real_entity(A)所发生的任何广播属性同步数据都会以aoi_index_2作为标识符,而player_entity(B)的客户端中aoi_index_2可能不存在关联的actor_entity,更坏的情况是aoi_index_2关联的是其他actor_entity,这样就会导致客户端与服务器之间的属性同步错乱,并进一步导致客户端退出。
在了解原来的aoi_index分配设计在分布式大世界中的不足之后,我们来思考如何改进,目前有两种改进方案:
- 将两个字节的
aoi_index替换为八个字节的online_entity_id,因为real_entity和ghost_entity的online_entity_id总是保持一致的,但是这样会导致属性同步流量的显著升高 - 为同一个分布式大世界指定一个唯一的
aoi_manager实例,这个实例可以托管在map_server上,但是这样会导致原来所有的同步创建actor_entity都变成了异步创建actor_entity,所有的AOI查询也变成了异步调用,打破了原有的执行流;同时这样会把当前分布式大世界的同步actor_entity的上限限制在了65536个以内 - 每个
cell_space单独维护自身的aoi_manager,同时在每个player_entity迁移之后,将客户端的所有actor_entity的最新aoi_index都推送下去,重新建立一个aoi_index到actor_entity的映射
综合来看,第三种方案更优,因此MosaicGame中在player_entity迁移之前,actor_aoi_component::encode会将当前往客户端同步的actor_entity信息带上,存在entities_in_aoi_radius字段中:
json::object_t actor_aoi_component::encode(bool for_ghost)
{
if(for_ghost)
{
return json::object_t{};
}
// 先保存所有的aoi数据
json::object_t result;
auto cur_aoi_mgr = m_owner->get_space()->aoi_mgr();
std::map<std::string, std::map<std::string, std::pair<std::string, std::uint32_t>>> all_entities_in_aoi_radius;
for(const auto& one_aoi_radius: m_aoi_radius_names)
{
std::map<std::string, std::pair<std::string, std::uint32_t>> temp_aoi_eids;
const auto& cur_force_aois = m_force_aoi_eids[one_aoi_radius.second.value];
const auto& cur_aoi_guids = cur_aoi_mgr->interest_in_guids(one_aoi_radius.second);
for(auto one_guid: cur_aoi_guids)
{
actor_entity* other_entity = entity_manager::instance().get_entity_with_type<actor_entity>(utility::entity_slot::from_uint64(one_guid));
if(!other_entity)
{
continue;
}
bool is_force_aoi = cur_force_aois.find(other_entity->entity_id()) != cur_force_aois.end();
std::uint32_t temp_combined_aoi = other_entity->aoi_idx();
temp_combined_aoi <<= 16;
temp_combined_aoi += std::uint32_t(is_force_aoi);
temp_aoi_eids[other_entity->entity_id()] = std::make_pair(*other_entity->get_call_proxy(), temp_combined_aoi);
}
}
result["entities_in_aoi_radius"] = all_entities_in_aoi_radius;
return result;
}
当迁移到一个ghost_entity之后,在migrate_in函数中将这些数据解析出来,存在m_temp_all_entities_in_aoi_radius这个成员变量上:
void actor_aoi_component::migrate_in(const json::object_t& migrate_info, bool enter_new_space)
{
if(!enter_new_space)
{
return;
}
std::vector<std::string> temp_aoi_sync_actor_ids;
try
{
migrate_info.at("entities_in_aoi_radius").get_to(m_temp_all_entities_in_aoi_radius);
}
catch(const std::exception& e)
{
m_owner->logger()->error("migrate_in_restore_aoi fail to convert aoi data with error {}", e.what());
return;
}
m_owner->add_migrate_in_event(enums::migrate_event::migrate_fix_aoi_radius);
}
void player_aoi_component::event_listener(const utility::enum_type_value_pair& ev_cat, const std::string& detail)
{
if(ev_cat == utility::enum_type_value_pair(enums::migrate_event::migrate_fix_aoi_radius))
{
restore_aoi();
}
}
当迁移migrate_in开头的数据解析阶段结束之后,就会开始进行第二阶段,执行一些后处理逻辑,这里的restore_aoi就是其中的一个部分。这个restore_aoi负责调用actor_aoi_component::restore_aoi_radius函数来计算迁移之后的aoi状态改变:
void player_aoi_component::restore_aoi()
{
auto cur_actor_aoi_comp = m_owner->get_component<actor_aoi_component>();
aoi::aoi_radius_controller cur_aoi_ctrl;
cur_aoi_ctrl.any_flag = 0;
cur_aoi_ctrl.need_flag = (1ull <<std::uint8_t(enums::entity_flag::is_client_visible));
cur_aoi_ctrl.radius = 30;
cur_aoi_ctrl.min_height = cur_aoi_ctrl.max_height = 0;
cur_aoi_ctrl.forbid_flag = 0;
cur_aoi_ctrl.max_interest_in = 30;
std::vector<migrate_in_aoi_invalid_info> invalid_aois;
std::unordered_map<actor_entity*, std::uint16_t> remain_aois;
cur_actor_aoi_comp->restore_aoi_radius(cur_aoi_ctrl, [this](actor_entity* other, bool is_enter)
{
if(is_enter)
{
on_aoi_enter(other);
}
else
{
on_aoi_leave(other->entity_id(), other->aoi_idx());
}
}, static_type_name(), invalid_aois, remain_aois);
// 暂时省略后续代码
}
这里的restore_aoi_radius流程分为如下几步:
- 注册一个
aoi_radius,用来接收周围其他actor_entity进出当前player_entity的客户端同步半径事件, - 将迁移之前记录的
AOI集合中的actor_entity都先强制加入到当前actor_entity的关注集合中 - 注册
aoi_radius的回调,用来接受enter/leave事件 - 取消本来非强制关注的
actor_entity,这样如果这个actor_entity已经在同步半径外,则会触发leave事件 - 然后获取当前新的关注集合,与迁移之前记录的老关注集合做交集得到
remain_aois,同时做diff得到invalid_aois
void actor_aoi_component::restore_aoi_radius(const aoi::aoi_radius_controller& cur_aoi_ctrl, std::function<void(actor_entity*, bool)> radius_cb, const std::string& radius_name, std::vector<migrate_in_aoi_invalid_info>& invalid_aois, std::unordered_map<actor_entity*, std::uint16_t>& remain_aois)
{
auto temp_name_iter = m_aoi_radius_names.find(radius_name);
if(temp_name_iter != m_aoi_radius_names.end())
{
m_owner->logger()->error("restor_aoi_radius with duplicated name {}", radius_name);
return;
}
auto cur_space = m_owner->get_space();
auto cur_aoi_mgr = cur_space->aoi_mgr();
auto new_aoi_radius_idx = cur_aoi_mgr->add_radius_entity(aoi::aoi_pos_idx{m_aoi_pos_idx}, cur_aoi_ctrl);
if(!new_aoi_radius_idx.value)
{
m_owner->logger()->error("restor_aoi_radius with name {} fail to get radius idx", radius_name);
return;
}
m_owner->logger()->info("restor_aoi_radius with name {}", radius_name);
m_aoi_radius_names[radius_name] = new_aoi_radius_idx;
// 先恢复aoi 但是此时不要执行callback
const auto& pre_aoi_radius_info = m_temp_all_entities_in_aoi_radius[radius_name];
for(const auto& one_pair: pre_aoi_radius_info)
{
auto temp_other_actor = cur_space->get_entity(one_pair.first);
if(!temp_other_actor)
{
continue;
}
m_force_aoi_eids[new_aoi_radius_idx.value].insert(temp_other_actor->entity_id());
}
m_aoi_radius_callbacks[new_aoi_radius_idx] = radius_cb;
// 添加完之后 再触发已经不满足条件的leave
for(const auto& one_pair: pre_aoi_radius_info)
{
auto temp_other_actor = cur_space->get_entity(one_pair.first);
if(!temp_other_actor)
{
continue;
}
if(!(one_pair.second.second & 1))
{
m_force_aoi_eids[new_aoi_radius_idx.value].erase(temp_other_actor->entity_id());
}
}
// 再进行比对
auto temp_interested_guids = cur_aoi_mgr->interest_in_guids(new_aoi_radius_idx);
std::unordered_map<std::string, actor_entity*> temp_remain_actors;
for(auto one_guid: temp_interested_guids)
{
actor_entity* other_entity = entity_manager::instance().get_entity_with_type<actor_entity>(utility::entity_slot::from_uint64(one_guid));
if(!other_entity)
{
continue;
}
temp_remain_actors[other_entity->entity_id()] = other_entity;
}
for(const auto& one_pair:pre_aoi_radius_info)
{
auto temp_actor_iter = temp_remain_actors.find(one_pair.first);
if(temp_actor_iter == temp_remain_actors.end())
{
m_owner->logger()->info("notify remote aoi radois remove by radius name {} entity_id {} remote proxy {}", radius_name, one_pair.first, one_pair.second.first);
invalid_aois.push_back(migrate_in_aoi_invalid_info{one_pair.first, one_pair.second.first, std::uint16_t(one_pair.second.second>>16)});
}
else
{
remain_aois[temp_actor_iter->second] = std::uint16_t(one_pair.second.second>>16);
}
}
m_temp_all_entities_in_aoi_radius.erase(radius_name);
return;
}
invalid_aois里每个元素都代表已经在当前客户端中无效的一个actor_entity,同时remain_aois里每个元素都代表一个需要重新执行aoi_index映射的actor_entity。因此在player_aoi_component::restore_aoi得到这两个集合之后,对invalid_aois中的每个元素都执行一次on_aoi_leave回调,然后汇总remain_aois中的前后aoi_index映射,通过notify_aoi_reindex来通知客户端执行重新映射:
void player_aoi_component::restore_aoi()
{
// 省略之前已经介绍的代码
for(const auto& one_invalid_aoi: invalid_aois)
{
on_aoi_leave(one_invalid_aoi.entity_id, one_invalid_aoi.pre_aoi_idx);
}
std::map<std::uint16_t, std::uint16_t> remain_aoi_ids;
for(auto one_remain_actor: remain_aois)
{
remain_aoi_ids[one_remain_actor.second] = one_remain_actor.first->aoi_idx();
m_aoi_actors.insert(one_remain_actor.first);
one_remain_actor.first->get_component<actor_aoi_component>()->add_sync_player(m_player);
}
utility::rpc_msg reindex_aoi_msg;
reindex_aoi_msg.cmd = "notify_aoi_reindex";
reindex_aoi_msg.args.push_back(remain_aoi_ids);
m_player->call_client(reindex_aoi_msg);
auto cur_space = m_owner->get_space();
for(const auto& one_pair: cur_space->global_actors())
{
cur_actor_aoi_comp->add_force_aoi(player_aoi_component::static_type_name(), one_pair.second);
}
}
客户端接收到这个notify_aoi_reindex的通知之后,会遍历这个map中的每个元素,强制修改其aoi_index:
void player_space_component::notify_aoi_reindex(const utility::rpc_msg& msg, const std::map<std::uint16_t, std::uint16_t>& new_aoi_ids)
{
std::vector<client_actor*> cur_aoi_entities;
for(auto one_aoi_pair: new_aoi_ids)
{
auto pre_entity = m_aoi_entities[one_aoi_pair.first];
if(!pre_entity)
{
m_owner->logger()->error("fail to find entity for aoi_idx {}", one_aoi_pair.first);
continue;
}
pre_entity->set_aoi_idx(one_aoi_pair.second);
cur_aoi_entities.push_back(pre_entity);
}
// 上面的循环中不能去设置m_aoi_entities,因为可能会覆盖掉还没处理的entity
// 所以只能先缓存起来,等全部处理完再去更新m_aoi_entities
for(auto one_aoi_pair: new_aoi_ids)
{
m_aoi_entities[one_aoi_pair.first] = nullptr;
}
for(auto one_ent: cur_aoi_entities)
{
m_aoi_entities[one_ent->aoi_idx()] = one_ent;
}
}
注意在第一个循环中不能去修改m_aoi_entities,因为前后的aoi_index的集合可能重合,直接修改会数据错乱。比方说A的aoi_index要从2变成4,同时B的aoi_index要从4变成1。如果在开头的循环中直接设置m_aoi_index,处理完2->4之后就会造成m_aoi_index[2]=nullptr,m_aoi_index[4]=A,此时再去处理4->1,从m_aoi_index[4]中获得的已经是A了,而不是期望的B。因此这里额外增加了两个循环,第一个循环负责设置为nullptr,第二个循环负责设置真实的值。
属性同步保序
除了这个aoi_index重新映射之外,分布式大世界里的属性同步还有一个非常大的问题,迁移前后的属性消息接收顺序是不能保证的。举个例子来说,我们在actor_entity上有个int counter字段会参与客户端同步,因此在real_entity上每次修改这个属性字段都会生成一个消息广播到其所有的ghost_entity上。此时我们可以构造出一种ghost_entity上属性修改的接收顺序并不是real_entity的属性修改发送顺序的情况:
- 时刻
1,在cell_space(A)上的real_entity(O)修改了这个counter字段为1,此时会向cell_space(B)上的ghost_entity(P)和cell_space(C)上的ghost_entity(Q)发出这个同步消息S - 时刻
2,real_entity(O)准备迁移到cell_space(B),real_entity(O)成为了ghost_entity(O) - 时刻
3,cell_space(B)上的ghost_entity(P)成为了real_entity(P) - 时刻
4,real_entity(P)修改counter为2,向cell_space(A)上的ghost_entity(O)和cell_space(C)上的ghost_entity(Q)发出这个同步消息T - 时刻
5,cell_space(C)上的ghost_entity(Q)收到了cell_space(B)发出的counter=2的消息T,修改本地counter为2 - 时刻
6,cell_space(C)上的ghost_entity(Q)收到了cell_space(A)发出的counter=1的消息S,修改本地counter为1
出现这种情况是因为网络层只能保证单TCP连接的消息有序性,不能保证A->C的消息会比A->B-C先到,因为不同机器之间的网络延迟是完全不一样的。此外我们目前的网络层设计里,业务层发送一个消息只是将这个消息推送到一个网络连接的等待发送队列中,具体什么时候被发送出去其实也是不可控的,如果消息队列里消息太多,就可能有百毫秒以上的延迟,上不封顶,反之消息队列为空时最大发送时延只有5ms。
为了解决这个迁移导致的同步消息接收时不保证有序的问题,最简单的解决方式保证迁移后所有的ghost_entity读取到的属性状态都是一致的。具体策略如下:
- 在
real_entity发生迁移时,先将打包好的数据在当前进程的space_manager上进行保留,然后向所有的ghost_entity发出一个迁移开始的通知notify_real_migrate_begin。 - 当一个
ghost_entity接收到这个迁移开始通知notify_real_migrate_begin时,对这个消息的来源进行回复确认ack_real_migrate_begin - 当
space_manager上收集到了这个real_entity所有ghost_entity发出的ack_real_migrate_begin,才真正的将数据传递到目标进程去执行ghost_entity到real_entity的切换。
一旦一个ghost_entity回复了ack_real_migrate_begin,说明在这个real_entity迁移之前的所有属性同步数据都已经收到。因此当所有的ghost_entity都回复了ack_real_migrate_begin就代表所有的ghost_entity都拥有了同样的属性状态,达到了强制一致性同步的目标。
但是这个强制统一快照的方案会造成无缝迁移的延迟显著增大,之前的逻辑链路的中间只多了一个与relay_entity之间的通信,现在又多了与所有ghost_entity的通信,而且增大的延迟是所有ghost_entity里延迟的最大值。随着ghost_entity的数量增多,这个延迟最大值也逐渐变得不可控,虽然一般情况下一个actor_entity的ghost_entity数量不超过3个。
强制快照的延迟代价太大,此时我们从最基础的通信原理中吸取在不可靠信道中维持消息有序的智慧,即TCP的可靠消息机制。在TCP中,每个包都有一个递增的uint32_t来表示这个包的序列号。每个发出的包都会存储在本地已发送队列中,直到对端返回确认时再删除,长时间未确认则重发。如果对端接收到一个乱序的包,则会先在本地缓存起来,等待其他包一起组成连续包之后再处理。由于我们目前的网络底层已经使用的是TCP,所以可以不去考虑丢包,只考虑乱序。接下来将介绍MosaicGame中是如何实现这个属性同步序列号机制的。
首先需要定义一个带序列号的消息结构sync_msg,这里的version就是对应actor_entity的ghost_entity属性同步序列号:
struct sync_msg_header
{
std::uint32_t version;
std::uint8_t cmd;
std::uint8_t ts;// 时间戳字段 用来淘汰过期数据 std::uint8_t((utility::timer_manager::now_ts() / 1000) % 256)
std::uint16_t data_sz;
};
struct sync_msg: public sync_msg_header
{
std::shared_ptr<const std::string> data;
std::string to_bytes();
bool from_bytes(const char* buffer, std::uint32_t buffer_sz, std::uint16_t new_aoi_idx);
};
当一个real_entity需要向所有的ghost_entity发送属性同步消息时,会有一个专用的接口sync_other_by_ghost,这个接口会在属性同步的入口sync_to_others_with_aoi_data中自动被调用:
void actor_entity::sync_to_others_with_aoi_index(enums::entity_packet entity_packet_cmd, std::shared_ptr<const std::string> with_aoi_data)
{
auto cur_space = get_space();
if(!cur_space)
{
return;
}
if(!aoi_idx())
{
return;
}
auto cur_ghost_comp = get_component<actor_ghost_component>();
if(cur_space->is_cell_space())
{
cur_ghost_comp->sync_other_by_ghost(entity_packet_cmd, with_aoi_data);
}
sync_to_aoi_players(entity_packet_cmd, with_aoi_data);
}
这个sync_other_by_ghost负责将传入的同步数据封装成一个sync_msg,每次一个新的sync_msg被创建时, actor_ghost_component::m_sync_version就会自增:
void actor_ghost_component::sync_other_by_ghost(enums::entity_packet cur_entity_packet, std::shared_ptr<const std::string> sync_data)
{
sync_data_to_ghost_impl(cur_entity_packet, sync_data, true);
}
void actor_ghost_component::sync_data_to_ghost_impl(enums::entity_packet cur_entity_packet, std::shared_ptr<const std::string> sync_data, bool is_sync_others)
{
if(sync_data->size() >= std::numeric_limits<std::uint16_t>::max())
{
m_owner->logger()->error("sync_other_by_ghost sync_data size {} too big sync_cmd {} ", sync_data->size(), std::uint32_t(cur_entity_packet));
return;
}
m_sync_version++;
sync_msg cur_sync_msg;
cur_sync_msg.version = m_sync_version;
cur_sync_msg.data = sync_data;
auto cur_ts = utility::timer_manager::now_ts();
cur_sync_msg.ts = std::uint8_t((cur_ts / 1000) % 256);
cur_sync_msg.data_sz = sync_data->size();
cur_sync_msg.cmd = std::uint8_t(cur_entity_packet);
if(!m_owner->is_ghost())
{
m_owner->get_server()->call_server_multi(m_owner, std::make_shared<std::string>(cur_sync_msg.to_bytes()),enums::entity_packet::sync_ghost, m_anchors_for_created_ghost);
}
m_sync_msgs.push_back(cur_sync_msg);
if(cur_ts > m_next_check_cache_msg_expire_ts)
{
m_next_check_cache_msg_expire_ts = cur_ts + m_cache_msg_expire_gap * 1000;
check_cached_msg_expire();
}
}
sync_msg被创建完成之后,就会通过call_server_multi接口将这条同步数据广播到所有的ghost_entity上处理。ghost_entity接收这个同步消息的时候,直接调用handle_out_order_sync_msgs来处理可能的消息乱序:
utility::rpc_msg::call_result actor_ghost_component::on_entity_raw_msg(std::uint8_t cmd, std::shared_ptr<const std::string> msg)
{
switch(cmd)
{
case std::uint8_t(enums::entity_packet::sync_ghost):
{
if(!m_owner->is_ghost())
{
m_owner->logger()->error("{} on_entity_raw_msg sync_ghost while self is real", m_owner->entity_id());
return utility::rpc_msg::call_result::invalid_format;
}
sync_msg cur_sync_msg;
if(!cur_sync_msg.from_bytes(msg->data(), msg->size(), m_owner->aoi_idx()))
{
return utility::rpc_msg::call_result::invalid_format;
}
handle_out_order_sync_msgs(cur_sync_msg);
return utility::rpc_msg::call_result::suc;
}
default:
return utility::rpc_msg::call_result::rpc_not_found;
}
}
在handle_out_order_sync_msgs中,会使用插入排序的方式将当前消息插入到m_out_order_sync_msgs这个有序数组中,使得数组中的元素都按照version的递增序排列:
void actor_ghost_component::handle_out_order_sync_msgs(sync_msg new_msg)
{
if(new_msg.version <= m_sync_version)
{
// 已经处理过的消息 直接丢弃
m_owner->logger()->error("{} handle_out_order_sync_msgs new_msg version {} smaller than current version {}", m_owner->entity_id(), new_msg.version, m_sync_version);
return;
}
if(new_msg.version != m_sync_version + 1)
{
m_owner->logger()->warn("{} handle_out_order_sync_msgs new_msg version {} expected {}", m_owner->entity_id(), new_msg.version, m_sync_version + 1);
}
m_out_order_sync_msgs.push_back(new_msg);
// 进行简单的冒泡排序 把新加入的消息放到正确的位置
auto cur_msg_idx = m_out_order_sync_msgs.size() - 1;
while(cur_msg_idx > 0)
{
if(m_out_order_sync_msgs[cur_msg_idx - 1].version > m_out_order_sync_msgs[cur_msg_idx].version)
{
std::swap(m_out_order_sync_msgs[cur_msg_idx - 1], m_out_order_sync_msgs[cur_msg_idx]);
cur_msg_idx--;
}
else
{
break;
}
}
if(cur_msg_idx > 0)
{
// 说明不是紧接着的消息 直接返回 等待补齐
return;
}
// 这里先反过来 避免频繁的pop_front操作
std::reverse(m_out_order_sync_msgs.begin(), m_out_order_sync_msgs.end());
while(!m_out_order_sync_msgs.empty() && m_out_order_sync_msgs.back().version == m_sync_version + 1)
{
const auto& cur_sync_msg = m_out_order_sync_msgs.back();
m_sync_version++;
// 这里暂时忽略处理具体消息的逻辑
// 为了保持连号 需要把所有的sync other与sync ghost的消息都放进去
m_sync_msgs.push_back(cur_sync_msg);
m_out_order_sync_msgs.pop_back();
}
std::reverse(m_out_order_sync_msgs.begin(), m_out_order_sync_msgs.end());
auto cur_ts = utility::timer_manager::now_ts();
if(cur_ts > m_next_check_cache_msg_expire_ts)
{
m_next_check_cache_msg_expire_ts = cur_ts + m_cache_msg_expire_gap * 1000;
check_cached_msg_expire();
}
}
在排列好之后,就开始使用while来循环处理所有连续的消息,注意到这里对于不同的数据请求执行的操作是不一样的:
switch(cur_sync_msg.cmd)
{
case std::uint8_t(enums::entity_packet::sync_aoi_rpc):
{
m_owner->sync_to_aoi_players(m_sync_version, enums::entity_packet::sync_aoi_rpc, cur_sync_msg.data);
break;
}
case std::uint8_t(enums::entity_packet::sync_aoi_prop):
{
m_owner->sync_to_aoi_players(m_sync_version, enums::entity_packet::sync_aoi_prop, cur_sync_msg.data);
break;
}
case std::uint8_t(enums::entity_packet::sync_aoi_locomotion):
{
m_owner->sync_to_aoi_players(m_sync_version, enums::entity_packet::sync_aoi_locomotion, cur_sync_msg.data);
m_owner->on_sync_pos_yaw_diff(cur_sync_msg.data_without_aoi_index());
break;
}
case std::uint8_t(enums::entity_packet::sync_ghost_rpc):
{
m_owner->on_entity_raw_msg(std::uint8_t(enums::entity_packet::json_rpc), std::make_shared<std::string>(cur_sync_msg.data_without_aoi_index()));
break;
}
case std::uint8_t(enums::entity_packet::sync_ghost_prop):
{
auto real_msg_data = cur_sync_msg.data_without_aoi_index();
if(!json::accept(real_msg_data))
{
m_owner->logger()->error("on_sync_self_by_ghost sync_prop not json data");
break;
}
auto cur_json = json::parse(real_msg_data);
std::uint64_t prop_offset;
std::uint8_t prop_cmd;
json prop_data;
if(!spiritsaway::serialize::decode_multi(cur_json, prop_offset, prop_cmd, prop_data))
{
m_owner->logger()->error("fail to decode prop_delta msg {}", real_msg_data);
break;
}
m_owner->replay_prop_msg(spiritsaway::property::property_replay_offset(prop_offset), spiritsaway::property::property_cmd(prop_cmd), prop_data);
break;
}
default:
m_owner->logger()->warn("{} on_sync_other_by_ghost unknown cmd {}", m_owner->entity_id(), cur_sync_msg.cmd);
break;
}
-
enums::entity_packet::sync_aoi_rpc这个代表一个real_entity向其所有的在客户端里的client_actor推送一条rpc指令,因此ghost_entity收到后直接调用sync_to_aoi_players将当前需要同步的玩家进行广播 -
enums::entity_packet::sync_aoi_prop这个代表一个real_entity将自身所有客户端可见的属性都同步到其所有的在客户端里的client_actor,因此ghost_entity收到后直接调用sync_to_aoi_players将当前需要同步的玩家进行广播 -
enums::entity_packet::sync_aoi_locomotion这个代表一个位置同步消息,类似于所有客户端可见的属性,因此这里也会调用sync_to_aoi_players进行广播,不过这里还会顺带的更新一下ghost_entity的最新位置, -
enums::entity_packet::sync_ghost_rpc这个代表一个real_entity对所有ghost_entity的一次广播rpc调用,因此接到之后直接进行rpc调用即可 -
enums::entity_packet::sync_ghost_prop,这个代表real_entity的一些需要给ghost_entity同步属性,所以这里会执行属性的replay
值得注意的是如果一个属性是所有客户端可见的,那么这个属性也一定是ghost_entity可见的。但是我们在同步属性变化的时候,会将这个同步数据分别用sync_clients与sync_ghost编码两遍,并生成两个消息发送到ghost_entity。所以ghost_entity在接收到sync_aoi_prop的时候,并不会执行属性的replay,只有在sync_ghost_prop的时候才会执行属性的replay:
void actor_entity::add_prop_msg(const spiritsaway::property::property_record_offset& offset, spiritsaway::property::property_cmd cmd, spiritsaway::property::property_flags need_flag, spiritsaway::property::property_flags data_flag, const json& data)
{
// m_logger->info("add_prop_msg cmd {} need_flag {} data_flag {} data {} is_ghost {}", cmd, need_flag.value, data_flag.value, data, m_is_ghost);
// std::string sync_cmd = "prop_delta";
std::vector<json> sync_args;
sync_args.reserve(4);
sync_args.push_back(offset.value());
sync_args.push_back(std::uint8_t(cmd));
sync_args.push_back(data);
auto sync_str = std::make_shared<const std::string>(json(sync_args).dump());
const auto sync_self_flag = spiritsaway::mosaic_game::property::property_flags::sync_self;
const auto sync_ghost_flag = spiritsaway::mosaic_game::property::property_flags::sync_ghost;
const auto sync_other_flag = spiritsaway::mosaic_game::property::property_flags::sync_clients;
// 以前的方式来处理prop是错误的 对于一个item来说 里面可能有多个不同同步范围的field 假设有一个self的a与all的b
// 当我们insert一个{a:1, b:2}时 由于匹配优先级 会导致只有 {b:2} 这个json对象会经过sync_to_others_without_aoi_index 同步给所有客户端及ghost {a:1}这个分量会丢失
// 正确的做法应该是根据每种flag单独处理同步
// 以 {a:1, b:2}同步给 self
// 以 {b:2} 同步给ghost
// 以 {b:2} 同步给others 这里包括ghost的中转
// 同时 同步给ghost 要早于同步给other
switch(need_flag.value)
{
case sync_other_flag:
{
sync_to_others_without_aoi_index(enums::entity_packet::sync_aoi_prop, sync_str);
return;
}
case sync_ghost_flag:
{
auto cur_space = get_space();
if(cur_space && cur_space->is_cell_space())
{
auto cur_ghost_comp = get_component<actor_ghost_component>();
cur_ghost_comp->sync_self_to_ghost(enums::entity_packet::sync_ghost_prop, sync_str);
}
return;
}
case sync_self_flag:
{
sync_to_self_client(enums::entity_packet::sync_prop, sync_str);
return;
}
default:
m_logger->error("invalid sync flag {} offset {}", need_flag.value, offset.value());
return;
}
}
当real_entity创建好一个sync_msg的时候,会向ghost_entity广播,同时调用sync_to_aoi_players通知给当前进程里的其他real_entity。然后当ghost_entity接收到一个sync_msg的时候,也会通过sync_to_aoi_players通知给当前进程里的其他real_entity。
void actor_entity::sync_to_aoi_players(std::uint32_t sync_version, enums::entity_packet entity_packet_cmd, std::shared_ptr<const std::string> with_aoi_data)
{
for(const auto& one_pair: get_component<actor_aoi_component>()->aoi_sync_players())
{
if(one_pair.second->is_ghost())
{
continue;
}
one_pair.second->sync_to_client_from_other(entity_id(), sync_version, aoi_idx(), entity_packet_cmd, with_aoi_data);
}
}
由于其他real_entity也是有迁移能力的,所以对于同一个actor_entity的real/ghost发送过来的sync_msg,可能会出现消息的乱序、丢包等现象,不过现在我们的sync_msg都带上了版本号,这个消息连续性问题就可以解决了,在往下发送一个sync_msg之前做一个序列号校验check_other_sync_version。
void player_entity::sync_to_client_from_other(const std::string& other_entity_id, std::uint32_t other_sync_version, std::uint16_t other_aoi_idx, enums::entity_packet entity_packet_cmd, const std::shared_ptr<const std::string> data)
{
auto cur_aoi_comp = get_component<player_aoi_component>();
if(!cur_aoi_comp->check_other_sync_version(other_entity_id, other_sync_version))
{
return;
}
// 省略具体处理代码
}
check_other_sync_version的工作原理是在real_entity上保存当前客户端里所有其他actor_entity的最新已同步版本号在m_other_sync_versions这个map中。当一个新的带版本号的数据过来的时候,检查这个版本号是否等于所期待的下一个版本,如果是的话才允许同步到客户端,同时对这个版本号进行更新。注意这个版本号机制只有在cell_space时才开启,对于普通的非分布式大世界,这个检查永远都返回true。
bool player_aoi_component::check_other_sync_version(const std::string& other_entity_id, std::uint32_t other_sync_version)
{
auto cur_space = m_owner->get_space();
if(!cur_space)
{
return false;
}
if(!cur_space->is_cell_space())
{
return true;
}
auto cur_iter = m_other_sync_versions.find(other_entity_id);
if(cur_iter == m_other_sync_versions.end())
{
return false;
}
if((cur_iter->second + 1)!= other_sync_version)
{
return false;
}
cur_iter->second++;
return true;
}
这个检查可以有效的过滤掉重复数据的向下同步,但是对于缺失数据的处理还需要额外工作。下面就是一个同步数据缺失的例子:
- 在进程
A上的real_entity(M)接收到的ghost_entity(N)的最新同步版本号是100 real_entity(M)迁移到了real_entity(N)所在的进程B,发现此时N的最新同步版本号是110real_entity(N)的后续广播数据版本号都不会小于110,这样就导致M永远无法再同步N发出的广播消息
为了解决这个问题,每个actor_entity都会有一个vector<sync_msg> m_sync_msgs;队列来缓存一些最近发出的广播消息,real_entity创建sync_msg和ghost_entity接收到sync_msg的时候都会往这个队列里添加有序数据。在real_entity迁移的时候,除了带上最新版本号之外,这个缓存队列也会带上,迁移完成之后再将这个数据解析出来,设置回m_sync_msgs。
json::object_t actor_ghost_component::encode(bool for_ghost)
{
auto cur_space = m_owner->get_space();
if(!cur_space)
{
return {};
}
if(!cur_space->is_cell_space())
{
return {};
}
json::object_t result;
if(for_ghost)
{
result["sync_version"] = m_sync_version;
return result;
}
result["ghost_creating"] = m_ghost_creating;
result["ghost_created"] = m_ghost_created;
result["from_real"] = *(m_owner->get_space()->get_call_proxy());
result["sync_version"] = m_sync_version;
result["sync_msgs"] = m_sync_msgs;
m_sync_msgs.clear();
return result;
}
每次调用完广播接口,都会将这个sync_msg放到m_sync_msgs这个队列中,为了避免队列无限膨胀,会加入一个过期机制。这个机制依赖于sync_msg.ts字段,构造时时间戳设置为当前秒对256取模,然后定期调用check_cached_msg_expire来淘汰太老的数据:
void actor_ghost_component::check_cached_msg_expire()
{
if(m_sync_msgs.size() == 1)
{
return;
}
std::vector<sync_msg> temp_msg_vec;
std::uint8_t cur_ts = m_sync_msgs.back().ts;
std::uint64_t last_remain_index = m_sync_msgs.size() - 1;
if(cur_ts >= m_cache_msg_expire_gap)
{
auto min_expire_ts = cur_ts - m_cache_msg_expire_gap;
while(last_remain_index > 0)
{
const auto& cur_msg = m_sync_msgs[last_remain_index - 1];
if(cur_msg.ts < min_expire_ts)
{
break;
}
last_remain_index--;
}
}
else
{
auto max_expire_ts = cur_ts + 255 - m_cache_msg_expire_gap;
while(last_remain_index > 0)
{
const auto& cur_msg = m_sync_msgs[last_remain_index - 1];
if(cur_msg.ts > cur_ts && cur_msg.ts < max_expire_ts)
{
break;
}
last_remain_index--;
}
}
if(last_remain_index == 0)
{
return;
}
if(last_remain_index < m_sync_msgs.size() / 4)
{
// 如果要删除的元素太少 就先不删除了
return;
}
m_sync_msgs.erase(m_sync_msgs.begin(), m_sync_msgs.begin() + last_remain_index);
}
由于我们设置时间戳时使用了取模操作,所以判定过期数据是要小心的处理时间戳回环问题。
有了这个m_sync_msgs数据之后,每个player_entity迁移到新进程,需要在新进程上找到其客户端里的所有actor_entity,利用这个缓存来补充之间漏的一些同步数据,这部分代码在之前介绍过的player_aoi_component::restore_aoi中:
void player_aoi_component::restore_aoi()
{
// 省略之前已经介绍过的aoi_radius恢复代码
std::unordered_map<actor_entity*, std::uint16_t> remain_aois;
std::map<std::uint16_t, std::uint16_t> remain_aoi_ids;
for(auto one_remain_actor: remain_aois)
{
// 迁移进来之后 对于还在自己客户端aoi的actor 检查同步版本号是否匹配 如果落后则需要使用cache来补充
remain_aoi_ids[one_remain_actor.second] = one_remain_actor.first->aoi_idx();
m_actors_in_client.insert(one_remain_actor.first);
one_remain_actor.first->get_component<actor_aoi_component>()->add_sync_to_player(m_player->entity_id(), m_player);
auto other_ghost_comp = one_remain_actor.first->get_component<actor_ghost_component>();
check_resync_by_cache(one_remain_actor.first, other_ghost_comp->sync_version(), other_ghost_comp->sync_msgs());
}
// 省略一些已经介绍过的aoi_reindex代码
}
这里的check_resync_by_cache作用就是根据本地的m_other_sync_versions来获得之前存储的最新同步版本,然后从other_msgs拿出所有后续版本来做补充同步:
void player_aoi_component::check_resync_by_cache(actor_entity* other, std::uint32_t other_sync_ver, const std::vector<sync_msg>& other_msgs)
{
auto other_aoi_idx = other->aoi_idx();
auto temp_iter = m_other_sync_versions.find(other_aoi_idx);
if(temp_iter == m_other_sync_versions.end())
{
return;
}
auto pre_sync_ver = temp_iter->second;
if(pre_sync_ver >= other_sync_ver)
{
// 本地收到的版本号已经大于了other的版本号 说明不需要补充数据
return;
}
temp_iter->second = other_sync_ver;
if(!other_msgs.empty() && pre_sync_ver + 1< other_msgs.front().version)
{
// 如果本地版本号与other缓存队列中的开头版本号不匹配 说明有信息丢失 直接使用完整数据进行同步
m_owner->logger()->info("player_aoi_component {} detect other {} aoi_idx {} sync version miss match local {} other {}", m_owner->entity_id(), other->entity_id(), other_aoi_idx, pre_sync_ver, other_msgs.front().version);
auto other_encode_info = other->encode_with_flag(std::uint32_t(enums::encode_flags::other_client));
utility::rpc_msg resync_msg;
resync_msg.cmd = "notify_aoi_resync";
resync_msg.set_args(other->aoi_idx(), other_encode_info);
m_owner->sync_to_self_client(resync_msg);
return;
}
// 使用缓存数据进行补充
for(const auto& one_msg: other_msgs)
{
if(one_msg.version > pre_sync_ver)
{
m_player->sync_to_client_from_other(other->entity_id(), one_msg.version, m_owner->aoi_idx(), enums::entity_packet(one_msg.cmd), one_msg.data);
}
}
}
在做补充同步的时候可能会出现other_msgs的第一个版本号与当前同步到的最新版本号之间有空洞,此时能做的只有根据other_entity的当前数据来执行一个aoi_resync,将最新的数据encode之后下发到客户端通知客户端来直接覆盖。
异步业务流程
在之前的介绍中我们可以看出所有的状态修改者必须是real_entity,ghost_entity的作用就是负责作为一些客户端可见以及服务端其他real_entity可见的属性的副本而存在的。所以actor_entity之间做交互的时候,所要修改的actor_entity在本地有三种情况:
- 对应的
real/ghost在当前cell_space中并不存在 - 对应的
ghost在当前cell_space中存在 - 对应的
real在当前的cell_space中存在
如果是最后一种情况,业务逻辑就简单了,处理起来与非分布式大世界中的代码没有什么不同。但是在前两种情况下,正常的业务逻辑将会被打断,此时的逻辑上下文需要通过rpc发送到对应的real_entity上执行,当对端的real_entity执行完成之后再以rpc的形式通知回来,然后再恢复上下文继续执行。这样完整的业务逻辑就被拆成了多个零碎的部分,如果期间涉及到多个real_entity之间的交互,业务流就被彻底切碎了,变得极其难以维护。下面就是一个非常简单的战斗结算例子。
战斗的发起者是客户端的玩家,调用了下面的这个rpc来发起一个攻击:
void player_combat_component::request_begin_hit(const utility::rpc_msg& msg, const std::string& target_id, std::uint64_t client_hit_seq)
{
m_owner->logger()->info("request_begin_hit target {} client_hit_seq {}", target_id, client_hit_seq);
auto server_hit_seq = m_actor_combat_comp->try_hit(target_id);
utility::rpc_msg reply_msg;
reply_msg.cmd = "reply_begin_hit";
reply_msg.set_args(client_hit_seq, server_hit_seq);
m_player->call_client(reply_msg);
}
这里的actor_combat_component::try_hit会检查一下攻击目标是否在当前的mono_space/cell_space里,如果不在则直接返回,否则走后续的逻辑:
std::uint64_t actor_combat_component::try_hit(const std::string& target_id)
{
auto target_actor = m_owner->get_space()->get_entity(target_id);
if(!target_actor)
{
return 0;
}
return try_hit(target_actor);
}
粗看一下觉得在cell_space环境下没有找到(real/ghost)_entity直接返回有点暴力,但是其实如果当前当前场景找不到的话,客户端其实也应该无法同步到这个actor_entity,那么客户端请求攻击一个当前看不到的actor_entity就是一个非法操作,所以直接返回逻辑上是正确的。
如果找到了,则使用第二个try_hit来构造一个攻击上下文hit_record:
struct hit_record
{
std::uint64_t from_online_id; // 攻击发起者的id
std::uint64_t target_online_id; // 受击者的id
std::uint64_t from_entity_flag; // 攻击发起者的entity_flag
std::uint64_t hit_ts; // 攻击时间戳
double hit_damage; // 预期伤害
std::uint64_t hit_seq; // 攻击序列号
json encode() const;
bool decode(const json& data);
bool operator==(const hit_record& other) const;
};
std::uint64_t actor_combat_component::try_hit(actor_entity* target)
{
auto cur_ts = utility::timer_manager::now_ts();
if(m_owner->combat_prop()->last_hit_ts() + m_owner->attr_prop()->attack_gap() > cur_ts)
{
return 0;
}
if(!can_hit(target))
{
return 0;
}
hit_record cur_hit_record;
cur_hit_record.hit_seq = m_owner->combat_prop()->last_hit_seq() + 1;
m_owner->combat_prop_proxy()->last_hit_seq().set(cur_hit_record.hit_seq);
cur_hit_record.hit_ts = cur_ts;
// 现在设置为0.5s之后检查命中
m_owner->combat_prop_proxy()->do_hit_ts().insert(cur_hit_record.hit_seq, cur_ts + 500);
// 现在设置为5s之后检查技能超时
m_owner->combat_prop_proxy()->hit_expire_ts().insert(cur_hit_record.hit_seq, cur_ts + 5000);
cur_hit_record.from_online_id = m_owner->online_entity_id();
cur_hit_record.target_online_id = target->online_entity_id();
cur_hit_record.hit_damage = m_owner->attr_prop()->attack();
cur_hit_record.from_entity_flag = m_owner->entity_flag();
m_owner->combat_prop_proxy()->hit_records().insert(cur_hit_record.hit_seq, cur_hit_record);
m_owner->combat_prop_proxy()->last_hit_ts().set(cur_ts);
m_hit_begin_dispatcher.dispatch(cur_hit_record);
if(!m_hit_check_timer.valid())
{
m_hit_check_timer = m_owner->add_timer_with_gap(std::chrono::milliseconds(100), [this]()
{
check_hits();
});
}
return cur_hit_record.hit_seq;
}
构造之前使用can_hit来检查目标是否可以攻击,为了让这个can_hit检查尽可能的细致,避免后续的攻击到对方real_entity被前置检查无效,ghost_entity需要同步很多的属性,例如当前血量和阵营等字段,真正的业务中其实需要检查很多东西:
bool actor_combat_component::can_hit(const actor_entity* target)
{
if(!target->attr_prop())
{
return false;
}
if(target->attr_prop()->hp() <= 0)
{
return false;
}
if(target == m_owner)
{
return false;
}
if(target->combat_prop()->faction() == m_owner->combat_prop()->faction())
{
return false;
}
return true;
}
当构造好这个hit_record之后,正常的逻辑根据攻击动画来计算物理上是否命中。但是目前我们的服务器很粗糙,所以设置的是0.5s之后直接假设命中。当命中的时候会调用do_hit,这里会根据指定的actor_entity的状态来做具体的逻辑:
- 如果不存在,则直接返回,等待这个攻击自动超时删除
- 如果目标是
ghost_entity,则发送一个server_on_hit来通知对方被击中,此时需要带上当前real_entity的回调地址call_proxy - 如果目标是
real_entity,则直接调用on_hit来执行命中结算后处理
void actor_combat_component::do_hit(const hit_record& cur_hit)
{
auto other_entity = m_owner->get_space()->get_entity(utility::entity_desc::gen_local_id(cur_hit.target_online_id));
if(!other_entity)
{
return;
}
if(other_entity->is_ghost())
{
utility::rpc_msg cur_hit_msg;
cur_hit_msg.cmd = "server_on_hit";
cur_hit_msg.set_args(m_owner->entity_id(), *m_owner->get_call_proxy(), serialize::encode(cur_hit));
m_owner->call_server(other_entity->get_call_proxy(), cur_hit_msg);
}
else
{
auto other_combat_comp = other_entity->get_component<actor_combat_component>();
other_combat_comp->on_hit(m_owner, m_owner->entity_id(), *m_owner->get_call_proxy(), cur_hit);
}
}
其实这里的server_on_hit就是on_hit的一个简单的异步封装,只不过第一个参数设置为了nullptr,攻击发起者不在当前进程:
void actor_combat_component::server_on_hit(const utility::rpc_msg& msg, const std::string& from_eid, const std::string& from_proxy, const hit_record& hit_info)
{
on_hit(nullptr, from_eid, from_proxy, hit_info);
}
这里的on_hit先计算当前应该扣除的血量,修改相关属性之后,将攻击结算的伤害通知回攻击者,这里又会根据是否是real/ghost来区分逻辑,如果是real则直接调用on_hit_feedback,如果是ghost则发送server_hit_feedback这个rpc:
void actor_combat_component::on_hit(actor_entity* from_entity, const std::string& from_eid, const std::string& from_proxy, const hit_record& hit_info)
{
// 省略伤害结算逻辑
bool is_kill = m_owner->attr_prop()->hp() <= 0.f;
if(from_entity && !from_entity->is_ghost())
{
from_entity->get_component<actor_combat_component>()->on_hit_feedback(m_owner, hit_info.hit_seq, real_dmg, is_kill);
}
else
{
utility::rpc_msg cur_hit_back_msg;
cur_hit_back_msg.cmd = "server_hit_feedback";
cur_hit_back_msg.set_args(serialize::encode(hit_info), real_dmg, is_kill);
m_owner->call_server(std::make_shared<std::string>(from_proxy), cur_hit_back_msg);
}
// 省略一些代码
}
大家也可以猜到这里的server_hit_feedback其实也是一个对on_hit_feedback的简单封装,第一个参数设置为了nullptr代表受击方来自另外一个进程:
void actor_combat_component::server_hit_feedback(const utility::rpc_msg& msg, std::uint64_t hit_seq, double real_dmg, bool is_kill)
{
on_hit_feedback(nullptr, hit_seq, real_dmg, is_kill);
}
当一个on_hit_feedback回来之后,这个攻击算是生命周期结束,可以从未完成攻击中删除了:
void actor_combat_component::on_hit_feedback(actor_entity* dest_entity, std::uint64_t hit_seq, double real_dmg, bool is_kill)
{
auto temp_hit_iter = prop_hit_records().find(hit_seq);
if(temp_hit_iter == prop_hit_records().end())
{
m_owner->logger()->error("on_hit_feedback invalid hit_seq {} real_dmg {}", hit_seq, real_dmg);
return;
}
if(!dest_entity)
{
dest_entity = m_owner->get_space()->get_entity_by_online_id(temp_hit_iter->second.target_online_id);
}
m_owner->logger()->debug("on_hit_feedback dest {} hit_damage {} real_dmg {} is_kill {}", temp_hit_iter->second.target_online_id, temp_hit_iter->second.hit_damage, real_dmg, is_kill);
const auto& new_hit_record = temp_hit_iter->second;
m_owner->combat_prop_proxy()->hit_expire_ts().erase(hit_seq);
m_hit_end_dispatcher.dispatch(new_hit_record);
m_owner->combat_prop_proxy()->hit_records().erase(hit_seq);
}
全套流程走下来非常冗长,涉及到多个异步调用:
