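Distributed Scene Management in BigWorld
As covered earlier, a Space is organized as a KD-tree (the code below refers to it as a BSP tree). Each Cell corresponds to a node of the tree, and each node has a Rect region, which is the region the Cell is responsible for. In a distributed scene, Cells are assigned to different CellApp processes, and each CellApp process runs several Cells. To keep the load balanced across CellApps, the Cells are checked periodically and adjusted according to the current load. This includes creating new Cells to share the load and moving the boundaries of existing Cells. The following sections analyze these adjustments in detail.
Cell Adjustment within a Space
If a large number of players pour into the same area, they are all assigned to the same Cell, which sharply increases that Cell's resource consumption; in some cases the cost is quadratic in the number of players in the Cell. To keep such a Cell from exhausting the resources of its CellApp, resource usage is checked periodically, and when it exceeds a threshold there are two options:
- Split the Cell: create a new Cell to share the load of the Rect region the current Cell covers. The entry point is CellAppMgr::metaLoadBalance.
- Hand part of the Cell's region over to its sibling Cell, if it has one. The entry point is CellAppMgr::loadBalance.
Both entry points are invoked periodically by timers: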
/**
* This method responds to timeout events.
*/
void CellAppMgr::handleTimeout( TimerHandle /*handle*/, void * arg )
{
if (pShutDownHandler_ &&
pShutDownHandler_->isPaused())
{
pShutDownHandler_->checkStatus();
return;
}
if (isShuttingDown_)
{
return;
}
switch ((uintptr)arg)
{
#ifndef BW_EVALUATION
case TIMEOUT_LOAD_BALANCE:
{
if (!this->isRecovering() && hasStarted_)
{
this->loadBalance();
}
break;
}
case TIMEOUT_META_LOAD_BALANCE:
{
if (shouldMetaLoadBalance_ && hasStarted_)
{
this->metaLoadBalance();
}
break;
}
#endif
// (much code omitted)
}
}
The timers are registered in the initialization function CellAppMgr::init, as repeating timers whose intervals come from configuration:
/**
* The initialisation method.
*/
bool CellAppMgr::init( int argc, char * argv [] )
{
if (!this->ManagerApp::init( argc, argv ))
{
return false;
}
if (!interface_.isGood())
{
NETWORK_ERROR_MSG( "CellAppMgr::init: "
"Failed to create network interface. Unable to proceed.\n" );
return false;
}
bool isRecovery = false;
for (int i = 0; i < argc; ++i)
{
if (strcmp( argv[i], "-recover" ) == 0)
{
isRecovery = true;
}
else if (strcmp( argv[i], "-machined" ) == 0)
{
CONFIG_INFO_MSG( "CellAppMgr::init: Started from machined\n" );
}
}
ReviverSubject::instance().init( &interface_, "cellAppMgr" );
PROC_IP_INFO_MSG( "Internal address = %s\n", interface_.address().c_str() );
CONFIG_INFO_MSG( "Is Recovery = %s\n",
this->isRecovering() ? "True" : "False" );
loadBalanceTimer_ =
this->mainDispatcher().addTimer(
int( Config::loadBalancePeriod() * 1000000.0 ),
this, (void *)TIMEOUT_LOAD_BALANCE,
"LoadBalance" );
if (Config::metaLoadBalancePeriod() > 0.0)
{
metaLoadBalanceTimer_ = this->mainDispatcher().addTimer(
int( Config::metaLoadBalancePeriod() * 1000000.0 ),
this, (void *)TIMEOUT_META_LOAD_BALANCE,
"MetaLoadBalance" );
}
// (much code omitted)
}
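The two cases are examined in turn below.
Cell Creation
CellAppMgr::metaLoadBalance creates new Cells for two reasons:
- Pre-creation while a Space is still loading, handled in checkLoadingSpaces.
- On-demand creation based on load once the Space has finished loading, handled in appGroups.checkForOverloaded.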
/**
* This method checks whether there needs to be any migration of spaces between
* cell applications. If so, it will take action.
*/
void CellAppMgr::metaLoadBalance()
{
// Identify the CellApp groups used in meta-load-balancing. These are the
// groups of CellApps such that normal load-balancing can balance their
// loads.
CellAppGroups appGroups( cellApps_ );
const float mergeThreshold =
this->avgCellAppLoad() + CellAppMgrConfig::metaLoadBalanceTolerance();
// Do meta-balance groups need to be joined?
appGroups.checkForOverloaded( mergeThreshold );
// Should more CellApps be added to help with loading?
bool hasLoadingSpaces = this->checkLoadingSpaces();
if (!hasLoadingSpaces)
{
// Are there underloaded groups who should have Cells retired?
appGroups.checkForUnderloaded( CellAppLoadConfig::lowerBound() );
}
}
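Pre-creation While a Space Is Loading
checkLoadingSpaces iterates over all Spaces. For each Space whose required resources have not finished loading, it adds a new Cell as long as the Space has fewer Cells than the configured maximum number of loading Cells and the average area per Cell is still above the configured minimum loading area: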
/**
* This method checks whether there are any spaces that are loading geometry
* that could benefit from more loading cells.
*/
bool CellAppMgr::checkLoadingSpaces()
{
bool hasLoadingSpaces = false;
Spaces::iterator spaceIter = spaces_.begin();
while (spaceIter != spaces_.end())
{
Space * pSpace = spaceIter->second;
// Work out whether more cells are needed to help do the initial chunk
// loading.
bool isLoading = !pSpace->hasLoadedRequiredChunks();
hasLoadingSpaces |= isLoading;
// There is an upper bound on the number of loading cells for a space
// and the minimum size of these loading cells.
bool needsMoreLoadingCells = isLoading &&
(pSpace->numCells() < Config::maxLoadingCells()) &&
(pSpace->spaceBounds().area()/pSpace->numCells() >
Config::minLoadingArea());
if (needsMoreLoadingCells)
{
CellData * pCell = pSpace->addCell();
if (pCell)
{
INFO_MSG( "CellAppMgr::checkLoadingSpaces: "
"Added CellApp %u to Space %u.\n",
pCell->cellApp().id(), pSpace->id() );
}
}
++spaceIter;
}
return hasLoadingSpaces;
}
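The condition above can be isolated into a small sketch to see where the growth stops. LoadingSpaceInfo, needsMoreLoadingCells (as a free function) and cellsAfterRepeatedChecks are hypothetical names for illustration, and the sketch assumes one Cell is added per check and that loading never finishes in the meantime.

```cpp
#include <cassert>

// Hypothetical, simplified stand-in for the values checkLoadingSpaces reads.
struct LoadingSpaceInfo
{
    bool  isLoading;   // corresponds to !hasLoadedRequiredChunks()
    int   numCells;    // current number of Cells in the Space
    float area;        // corresponds to spaceBounds().area()
};

// Mirrors the needsMoreLoadingCells condition from the listing above.
bool needsMoreLoadingCells( const LoadingSpaceInfo & s,
        int maxLoadingCells, float minLoadingArea )
{
    assert( s.numCells > 0 );
    return s.isLoading &&
        (s.numCells < maxLoadingCells) &&
        (s.area / s.numCells > minLoadingArea);
}

// Number of Cells the Space ends up with if one Cell is added per check
// for as long as the condition holds (assumption: loading never finishes).
int cellsAfterRepeatedChecks( LoadingSpaceInfo s,
        int maxLoadingCells, float minLoadingArea )
{
    while (needsMoreLoadingCells( s, maxLoadingCells, minLoadingArea ))
    {
        ++s.numCells;
    }
    return s.numCells;
}
```

With an area of 1000, a minimum loading area of 300 and a cap of 8, growth stops at 4 Cells, because 1000 / 4 = 250 is no longer above the minimum area.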
As shown, this ends up calling the zero-argument pSpace->addCell(), which in turn calls the two-argument Space::addCell( CellApp & cellApp, NULL ) with a null second argument, and finally reaches this line:
pRoot_ = (pRoot_ ? pRoot_->addCell( pCellData ) : pCellData);
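Since pRoot_ already has a value once the Space has been created, pRoot_->addCell is executed here, and its behavior depends on whether pRoot_ is a leaf node (CellData) or an internal node (InternalNode).
For a CellData the logic is simple: the current CellData is split, and the split is horizontal because that is the default direction: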
BSPNode * addCell( CellData * pCell, bool isHorizontal = true )
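For an InternalNode, the new Cell goes to whichever subtree has fewer nodes. When both counts are equal, the choice is made by comparing the partition position with the midpoint of the node's range: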
/**
* This method adds a new cell to this subtree.
*
* @return The new subtree.
*/
BSPNode * InternalNode::addCell( CellData * pCell, bool isHorizontal )
{
bool addToLeft = false;
if (leftCount_ < rightCount_)
{
addToLeft = true;
}
else if (leftCount_ == rightCount_)
{
float value = range_.range1D( isHorizontal_ ).midPoint();
addToLeft = (position_ < value);
}
if (addToLeft && !pLeft_->isRetiring())
{
pLeft_ = pLeft_->addCell( pCell, !isHorizontal_ );
}
else
{
pRight_ = pRight_->addCell( pCell, !isHorizontal_ );
}
return this;
}
Note that the recursive addCell call inverts the split direction, so the default parameter value does not cause every split to be horizontal.
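A toy version of this insertion rule makes the alternation visible. ToyNode and addLeaf are illustrative names, not BigWorld classes, and one simplification is made deliberately: on a tie the sketch always descends to the right, whereas the real code compares the partition position with the midpoint of the range.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Toy BSP node; the names are illustrative, not BigWorld's classes.
struct ToyNode
{
    bool isLeaf = true;
    bool isHorizontal = true;   // split direction, meaningful for internal nodes
    int  leafCount = 1;         // number of leaves in this subtree
    std::unique_ptr< ToyNode > left, right;
};

// Adds one leaf to the subtree and returns the (possibly new) subtree root.
std::unique_ptr< ToyNode > addLeaf( std::unique_ptr< ToyNode > node,
        bool isHorizontal = true )
{
    if (node->isLeaf)
    {
        // Splitting a leaf: the old leaf and a new leaf become siblings.
        auto parent = std::make_unique< ToyNode >();
        parent->isLeaf = false;
        parent->isHorizontal = isHorizontal;
        parent->leafCount = 2;
        parent->left = std::move( node );
        parent->right = std::make_unique< ToyNode >();
        return parent;
    }

    // Descend into the smaller side. Ties go right in this sketch only.
    const bool addToLeft = node->left->leafCount < node->right->leafCount;
    std::unique_ptr< ToyNode > & child = addToLeft ? node->left : node->right;
    // The child is split in the direction opposite to this node's.
    child = addLeaf( std::move( child ), !node->isHorizontal );
    ++node->leafCount;
    return node;
}
```

Starting from a single leaf and adding three more yields a horizontal root whose two children are both vertical splits, which is the alternating pattern described above.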
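On-demand Creation During Load Balancing
appGroups.checkForOverloaded iterates over all CellAppGroups and checks whether each group's average load exceeds the given threshold. Groups that do are added to a temporary collection, OverloadedGroups: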
/**
* This method checks whether there are any overloaded CellApp groups and adds
* cells, if necessary, to help balance the load.
*/
void CellAppGroups::checkForOverloaded( float addCellThreshold )
{
OverloadedGroups overloadedGroups;
List::iterator iter = list_.begin();
while (iter != list_.end())
{
CellAppGroup & group = *iter;
if (group.avgLoad() > addCellThreshold)
{
overloadedGroups.add( group );
}
++iter;
}
overloadedGroups.addCells();
}
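One would normally expect this loop to run over Spaces, looking for Spaces whose average Cell load is high, rather than over the so far unexplained CellAppGroup. So before looking at addCells, it is worth clarifying what a CellAppGroup is.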
/**
* This class is used to represent a group of CellApps that are in a
* meta-balance group. This is, a set of CellApps that can balance their
* load through the normal load balancing. For this to occur, there must
* be multi-cell Spaces that cover the CellApps.
*/
class CellAppGroup
{
public:
CellAppGroup() : avgLoad_( -1.f ) {}
~CellAppGroup();
void addCell();
void checkForUnderloaded( float loadLowerBound );
void insert( CellApp * );
void join( CellAppGroup * pMainGroup );
private:
typedef BW::set< CellApp * > Set;
typedef BW::map< uint32, int > CellAppsPerIPMap;
CellAppsPerIPMap cellAppsPerIP_;
float avgLoad_;
Set set_;
// (some code omitted)
};
From this definition, a CellAppGroup is a set of CellApps, that is, a group of processes. Its insert method updates both the set and the per-IP count of CellApps:
void CellAppGroup::insert( CellApp * pApp )
{
MF_ASSERT( pApp->pGroup() == NULL );
set_.insert( pApp );
pApp->pGroup_ = this;
++cellAppsPerIP_[ pApp->addr().ip ];
}
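The corresponding avgLoad sums the load of every process in the group and divides by the number of CellApps that still have non-retiring Cells, less ifNumRemoved. If that count is not positive, it returns FLT_MAX: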
float CellAppGroup::avgLoad( int ifNumRemoved ) const
{
int count = -ifNumRemoved;
float totalLoad = 0.f;
CellAppGroup::iterator iter = this->begin();
while (iter != this->end())
{
CellApp * pApp = *iter;
totalLoad += pApp->smoothedLoad();
if (!pApp->hasOnlyRetiringCells())
{
++count;
}
++iter;
}
return (count > 0) ? totalLoad/count : FLT_MAX;
}
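The effect of ifNumRemoved and of retiring-only CellApps is easier to see with numbers. The following is a minimal sketch of the same arithmetic; ToyCellApp and groupAvgLoad are hypothetical stand-ins, and the load values are made up.

```cpp
#include <cassert>
#include <cfloat>
#include <vector>

// Hypothetical stand-in for the two CellApp properties avgLoad reads.
struct ToyCellApp
{
    float smoothedLoad;
    bool  hasOnlyRetiringCells;
};

// Same arithmetic as CellAppGroup::avgLoad: every app contributes load,
// but only apps that are not retiring-only count towards the divisor.
float groupAvgLoad( const std::vector< ToyCellApp > & apps,
        int ifNumRemoved = 0 )
{
    int count = -ifNumRemoved;
    float totalLoad = 0.f;
    for (const ToyCellApp & app : apps)
    {
        totalLoad += app.smoothedLoad;
        if (!app.hasOnlyRetiringCells)
        {
            ++count;
        }
    }
    return (count > 0) ? totalLoad / count : FLT_MAX;
}
```

For two CellApps at 0.25 and 0.75 the average is 0.5. Asking what the average would be with one CellApp removed (ifNumRemoved = 1) gives 1.0, since the same total load is spread over one fewer process.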
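Grouping CellApps this way restricts where a Space can place a new Cell: findBestCellApp must pick a CellApp from the CellAppGroup the Space already belongs to. Load balancing in different CellAppGroups is fully isolated, which also serves as a form of resource isolation: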
CellData * Space::addCell()
{
CellAppGroup * pGroup = NULL;
if (!cells_.empty())
{
pGroup = cells_.front()->cellApp().pGroup();
}
const CellApps & cellApps = CellAppMgr::instance().cellApps();
CellApp * pCellApp = cellApps.findBestCellApp( this, pGroup );
return pCellApp != NULL ? this->addCell( *pCellApp ) : NULL;
}
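With the purpose of CellAppGroup clear, return to the last call in CellAppGroups::checkForOverloaded, overloadedGroups.addCells. It iterates over the stored CellAppGroups and calls addCell on each. Note that the iteration is in reverse order, so the most heavily loaded groups are handled first: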
class OverloadedGroups
{
public:
void add( CellAppGroup & group )
{
map_.insert( std::make_pair( group.avgLoad(), &group ) );
}
void addCells()
{
// Iterate over in reverse order so that the most loaded group is merged
// first.
Map::reverse_iterator iter = map_.rbegin();
while (iter != map_.rend())
{
CellAppGroup * pGroup = iter->second;
pGroup->addCell();
++iter;
}
}
private:
typedef std::multimap< float, CellAppGroup * > Map;
Map map_;
};
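The ordering comes for free from the container: a std::multimap keeps its keys sorted in ascending order, so walking it with reverse iterators visits the highest load first. A minimal sketch, with group names as plain strings and processingOrder as a hypothetical helper:

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Returns the order in which groups would be processed when a multimap
// keyed by average load is walked from rbegin() to rend().
std::vector< std::string > processingOrder(
        const std::multimap< float, std::string > & groupsByLoad )
{
    std::vector< std::string > order;
    for (auto it = groupsByLoad.rbegin(); it != groupsByLoad.rend(); ++it)
    {
        order.push_back( it->second );
    }
    return order;
}
```

A multimap rather than a map is needed here because two groups can have exactly the same average load.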
pGroup->addCell uses chooseConnectionSpace to pick one Space in the group and tries to add a new Cell to it:
/**
* This method attempts to add a cell to this group to help spread the group's
* load.
*/
void CellAppGroup::addCell()
{
if (this->isEmpty())
{
// This occurs if this group was already merged with another.
return;
}
if (!this->cancelRetiringCellApp())
{
// Which space from this group should have a Cell added?
Space * pSpace = this->chooseConnectionSpace();
if (pSpace)
{
pSpace->addCell();
}
}
}
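chooseConnectionSpace picks the best Space according to the configured load balancing scheme (SCHEME_LARGEST, SCHEME_SMALLEST and SCHEME_HYBRID in the code). The rules include preferring the Space with the most Cells, the Space with the fewest Cells, and the Space with the highest load: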
/**
* This method chooses a space in this group that would be good to use to
* connect to another group.
*/
Space * CellAppGroup::chooseConnectionSpace() const
{
CellAppMgrConfig::MetaLoadBalanceScheme scheme =
static_cast< CellAppMgrConfig::MetaLoadBalanceScheme >(
CellAppMgrConfig::metaLoadBalanceScheme() );
switch (scheme)
{
case CellAppMgrConfig::SCHEME_LARGEST:
{
LargestSpaceChooser chooser;
return this->chooseConnectionSpace( chooser );
}
break;
case CellAppMgrConfig::SCHEME_SMALLEST:
{
SmallestSpaceChooser chooser;
return this->chooseConnectionSpace( chooser );
}
break;
case CellAppMgrConfig::SCHEME_HYBRID:
{
HybridSpaceChooser chooser;
return this->chooseConnectionSpace( chooser );
}
break;
default:
ERROR_MSG( "CellAppGroup::chooseConnectionSpace: "
"Invalid scheme %d. Switching.\n", scheme );
CellAppMgrConfig::metaLoadBalanceScheme.set(
CellAppMgrConfig::SCHEME_HYBRID );
break;
}
return NULL;
}
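In general a Space has at most one Cell on any given CellApp, because having several would significantly increase inter-Cell communication complexity; it would be better to merge them into one.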
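Once a suitable Space is chosen, the zero-argument addCell is called on it. As analyzed above, it picks a CellApp from the Space's CellAppGroup to host the new Cell. One puzzling point is that the new Cell still lands inside the same CellAppGroup, and after the split the two neighboring Cells incur some extra communication cost. Splitting a region of area A into two halves changes its cost from f(A) to 2f(A/2), so the total load of the CellAppGroup only drops when a Cell's load grows faster than linearly with the area it covers (for example the quadratic per-player cost mentioned earlier). Otherwise the benefit is that the new Cell moves some load from a heavily loaded CellApp to a lightly loaded one: total load rises, but the maximum per-CellApp load falls.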
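Cell Boundary Adjustment
A new Cell is only added once the average load of the whole CellAppGroup exceeds a threshold. A more common situation, though, is a large load difference between Cells caused by an uneven distribution of players. To keep a single Cell from carrying too much load, the boundaries between Cells are adjusted so that the left and right subtrees of every InternalNode are as balanced as possible. The entry point is CellAppMgr::loadBalance, triggered periodically by a timer: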
/**
* This method performs load balancing on each of the spaces.
*/
void CellAppMgr::loadBalance()
{
// Balance all spaces
{
Spaces::iterator iter = spaces_.begin();
while (iter != spaces_.end())
{
if (g_shouldLoadBalance)
{
iter->second->loadBalance();
}
else
{
iter->second->informCellAppsOfGeometry( /*shouldSend*/ false );
}
iter++;
}
}
// This is done after balancing each space so that the messages to each
// CellApp are aggregated into a single bundle.
cellApps_.sendToAll();
}
CellAppMgr::loadBalance iterates over all Spaces and calls loadBalance on each. Space::loadBalance then balances recursively, starting from the root of the BSP tree:
/**
* This method changes the geometry of cells to balance the load.
*/
void Space::loadBalance()
{
#ifndef BW_EVALUATION
if (isBalancing_)
{
WARNING_MSG( "Space::loadBalance( %d ): Called recursively.\n", id_ );
return;
}
if (pRoot_ == NULL)
{
INFO_MSG( "Space::loadBalance( %d ): Called with pRoot_ == NULL.\n",
id_ );
return;
}
isBalancing_ = true;
pRoot_->updateLoad();
BW::Rect rect(
-std::numeric_limits< float >::max(),
-std::numeric_limits< float >::max(),
std::numeric_limits< float >::max(),
std::numeric_limits< float >::max() );
// If a branch has a cell that is overloaded, we do not want to make
// things worse. This is especially a cap when new cells are being
// added.
float loadSafetyBound = std::max( CellAppLoadConfig::safetyBound(),
pRoot_->avgSmoothedLoad() * CellAppLoadConfig::safetyRatio() );
bool wasLoaded = this->hasLoadedRequiredChunks();
pRoot_->balance( rect, loadSafetyBound );
pRoot_->updateLoad();
// (remaining code omitted)
}
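The function first calls updateLoad on the root to recursively refresh the load of every node in the BSP tree, then computes a load threshold, loadSafetyBound. The threshold acts as a cap: a node is only allowed to grow if its maximum load is below loadSafetyBound, so that a branch containing an overloaded Cell is not made worse.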
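As a small worked example of the threshold, the sketch below reproduces the std::max expression and the "may this node grow" test. The function names and all numeric values are assumptions for illustration; the actual defaults of CellAppLoadConfig::safetyBound() and safetyRatio() are not shown in the source.

```cpp
#include <algorithm>
#include <cassert>

// Same expression as in Space::loadBalance: the bound is the configured
// floor, or a multiple of the tree's average smoothed load, whichever
// is larger.
float computeLoadSafetyBound( float configuredBound,
        float avgSmoothedLoad, float safetyRatio )
{
    return std::max( configuredBound, avgSmoothedLoad * safetyRatio );
}

// A node may only take on more area while its maximum load is below
// the bound.
bool mayGrow( float maxLoadOfGrowingNode, float loadSafetyBound )
{
    return maxLoadOfGrowingNode < loadSafetyBound;
}
```

With an assumed floor of 0.5 and ratio of 1.5, a lightly loaded tree (average 0.25) uses the floor, while a busier tree (average 0.5) raises the bound to 0.75, so that balancing can still proceed when every Cell is moderately loaded.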
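As analyzed earlier, pRoot_ may be a leaf node (CellData) or an internal node (InternalNode), so balance has to handle both. The CellData case is simple: it has no children, so there is no boundary to adjust. The focus is therefore on the InternalNode implementation: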
/**
* This method is used to balance the load between the cells.
*/
void InternalNode::balance( const BW::Rect & range,
float loadSafetyBound, bool isShrinking )
{
range_ = range;
BalanceDirection balanceDir = this->doBalance( loadSafetyBound );
this->balanceChildren( loadSafetyBound, balanceDir );
}
/**
* This method balances the children.
*/
void InternalNode::balanceChildren( float loadSafetyBound,
InternalNode::BalanceDirection balanceDir )
{
// Update child ranges, and trigger load balancing for children.
BW::Rect leftRange;
BW::Rect rightRange;
this->calculateChildRanges( leftRange, rightRange );
pLeft_->balance( leftRange, loadSafetyBound,
/* isShrinking */ balanceDir == BALANCE_LEFT );
pRight_->balance( rightRange, loadSafetyBound,
/* isShrinking */ balanceDir == BALANCE_RIGHT );
}
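doBalance works out, subject to the safety bound, which way the partition line of the current InternalNode should move, and moves it. balanceChildren then recurses into the two subtrees. The isShrinking parameter appears to play no role when adjusting an InternalNode; its purpose only shows up later, in the discussion of Cell removal. So almost all of the work of balance is done in doBalance: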
/**
* This method does that actual work of balancing.
*/
InternalNode::BalanceDirection InternalNode::doBalance( float loadSafetyBound )
{
bool shouldLimitToChunks =
CellAppMgrConfig::shouldLimitBalanceToChunks();
const bool isRetiring = pLeft_->isRetiring() || pRight_->isRetiring();
float loadDiff = 0.f;
// Do not move if we do not know if the CellApp has created the Cell.
// This avoids problems with other Cells offloading before the new Cell
// exists.
const bool childrenCreated = pLeft_->hasBeenCreated() &&
pRight_->hasBeenCreated();
// Check whether we should balance based on unloaded chunks
if (!isRetiring)
{
if (!this->hasLoadedRequiredChunks())
{
shouldLimitToChunks = false;
if ((this->maxLoad() < loadSafetyBound) &&
CellAppMgrConfig::shouldBalanceUnloadedChunks())
{
return childrenCreated ?
this->balanceOnUnloadedChunks( loadSafetyBound ) :
BALANCE_NONE;
}
}
// Difference from average.
const float leftAvgLoad = pLeft_->avgLoad();
const float rightAvgLoad = pRight_->avgLoad();
const float nodeAvgLoad = this->avgLoad();
if (leftAvgLoad > rightAvgLoad)
{
loadDiff = fabs( leftAvgLoad - nodeAvgLoad );
}
else
{
loadDiff = -fabs( rightAvgLoad - nodeAvgLoad );
}
}
else
{
loadDiff = pLeft_->isRetiring() ? 1.f : -1.f;
}
BalanceDirection balanceDir = this->dirFromLoadDiff( loadDiff );
BSPNode * pFromNode = this->growingChild( balanceDir );
const bool shouldMove = (balanceDir != BALANCE_NONE) &&
childrenCreated &&
(pFromNode->maxLoad() < loadSafetyBound);
float newPos = position_;
if (shouldMove)
{
float entityLimit = this->entityLimitInDirection( balanceDir,
loadDiff * balanceAggression_ );
float chunkLimit = shouldLimitToChunks ?
this->chunkLimitInDirection( balanceDir ) : entityLimit;
newPos = this->closestLimit( position_,
entityLimit, chunkLimit, balanceDir );
}
else
{
balanceDir = BALANCE_NONE;
}
position_ = newPos;
this->adjustAggression( balanceDir );
return balanceDir;
}
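When neither child is retiring, the function computes loadDiff: the distance between the avgLoad of the more heavily loaded child and the avgLoad of the node itself, positive if the left child is heavier and negative otherwise. The sign of loadDiff decides whether the boundary moves left or right. "Left" and "right" apply to a horizontal split; for a vertical split they mean down and up: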
/**
* This method returns a balance direction given left minus right loads.
*/
InternalNode::BalanceDirection InternalNode::dirFromLoadDiff( float loadDiff )
{
return
(loadDiff > 0.f) ? BALANCE_LEFT :
(loadDiff < 0.f) ? BALANCE_RIGHT : BALANCE_NONE;
}
With the direction known, growingChild determines which child is going to grow:
BSPNode * growingChild( BalanceDirection direction ) const
{
return (direction == BALANCE_LEFT) ? pRight_ : pLeft_;
}
The boundary is only actually moved if the recent maximum load of the growing node is below loadSafetyBound:
const bool shouldMove = (balanceDir != BALANCE_NONE) &&
childrenCreated &&
(pFromNode->maxLoad() < loadSafetyBound);
float newPos = position_;
if (shouldMove)
{
float entityLimit = this->entityLimitInDirection( balanceDir,
loadDiff * balanceAggression_ );
float chunkLimit = shouldLimitToChunks ?
this->chunkLimitInDirection( balanceDir ) : entityLimit;
newPos = this->closestLimit( position_,
entityLimit, chunkLimit, balanceDir );
}
else
{
balanceDir = BALANCE_NONE;
}
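Putting the sign convention, growingChild and the shouldMove gate together, the decision can be sketched as a pure function. This covers only the non-retiring path and leaves out the unloaded-chunk branch; decide and loadDiffFromAverages are hypothetical helper names, and the load values in the usage note are made up.

```cpp
#include <cassert>
#include <cmath>

enum BalanceDirection { BALANCE_LEFT, BALANCE_RIGHT, BALANCE_NONE };

// Signed distance of the heavier child's average from the node average:
// positive when the left child is heavier, negative otherwise.
float loadDiffFromAverages( float leftAvg, float rightAvg, float nodeAvg )
{
    return (leftAvg > rightAvg) ?
        std::fabs( leftAvg - nodeAvg ) : -std::fabs( rightAvg - nodeAvg );
}

BalanceDirection dirFromLoadDiff( float loadDiff )
{
    return (loadDiff > 0.f) ? BALANCE_LEFT :
        (loadDiff < 0.f) ? BALANCE_RIGHT : BALANCE_NONE;
}

// Non-retiring path of doBalance, reduced to the direction decision.
BalanceDirection decide( float leftAvg, float leftMax,
        float rightAvg, float rightMax, float nodeAvg,
        float loadSafetyBound, bool childrenCreated )
{
    const float loadDiff = loadDiffFromAverages( leftAvg, rightAvg, nodeAvg );
    const BalanceDirection dir = dirFromLoadDiff( loadDiff );
    // BALANCE_LEFT moves the partition left, so the right child grows.
    const float growingMax = (dir == BALANCE_LEFT) ? rightMax : leftMax;
    const bool shouldMove = (dir != BALANCE_NONE) && childrenCreated &&
        (growingMax < loadSafetyBound);
    return shouldMove ? dir : BALANCE_NONE;
}
```

For example, with a left average of 0.75, a right average of 0.25 and a node average of 0.5, the direction is BALANCE_LEFT as long as the right child's maximum load stays below the bound; if the right child already contains a Cell above the bound, nothing moves.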
entityLimitInDirection computes a suitable new boundary from the load difference:
/**
* This method returns the limit of movement based on the entity bounds in the
* direction of movement.
*/
float InternalNode::entityLimitInDirection( BalanceDirection direction,
float loadDiff ) const
{
// If less than the minimum level, don't move.
if (fabs( loadDiff ) < BalanceConfig::minCPUOffload())
{
return position_;
}
BSPNode * pToNode = this->shrinkingChild( direction );
// This also works for BALANCE_NONE case.
bool shouldGetMax = (pToNode == pLeft_);
return pToNode->entityBoundLevels().entityBoundForLoadDiff(
loadDiff,
isHorizontal_,
shouldGetMax,
position_ );
}
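pToNode is the node whose region will shrink. The new boundary is computed with the help of EntityBoundLevels, which stores the node's load broken down into segments. Each segmentation considers a single axis, but four directions are stored together as a Rect: left to right, right to left, top to bottom, and bottom to top. entityBounds_ holds the boundary positions for each direction, and entityLoads_ holds the cumulative load up to that boundary in that direction: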
/**
* This class maintains entity bound levels data
* received from CellApps. It's supposed to be a property of
* internal BSP nodes and leaves.
*/
class EntityBoundLevels
{
public:
EntityBoundLevels( int numLevels ) :
entityBounds_( numLevels ),
entityLoads_( numLevels )
{
}
private:
typedef BW::vector< BW::Rect > Rects;
Rects entityBounds_;
// entityLoads_ isn't used as an array of actual rectangles
// 'left' loads may be greater than a 'right' ones and vice versa
Rects entityLoads_;
};

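EntityBoundLevels needs to track the real load of the corresponding Cell as closely as possible, so each CellApp sends an RPC in its tick to update the EntityBoundLevels of its Cells. The full call chain is shown below: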
/**
* This method handles the game tick time slice.
*/
void CellApp::handleGameTickTimeSlice()
{
AUTO_SCOPED_PROFILE( "gameTick" );
if (this->inShutDownPause())
{
this->tickShutdown();
return;
}
this->updateLoad();
cellAppMgr_.informOfLoad( persistentLoad_ );
this->updateBoundary();
// (much code omitted)
}
/**
* This method lets the CellAppMgr know about our bounding rectangle.
*/
void CellApp::updateBoundary()
{
AUTO_SCOPED_PROFILE( "calcBoundary" );
// TODO: We could probably only send this at the load balancing rate or
// it could be part of informOfLoad?
cellAppMgr_.updateBounds( cells_ );
}
/**
* This method informs the CellAppMgr of data that it needs for load balancing.
*
* @param cells Collection of cells on this CellApp.
*/
void CellAppMgrGateway::updateBounds( const Cells & cells )
{
// TODO: Put this in the same bundle as cellAppMgr_.informOfLoad
Mercury::Bundle & bundle = channel_.bundle();
bundle.startMessage( CellAppMgrInterface::updateBounds );
cells.writeBounds( bundle );
channel_.send();
}
/**
* This method handles a message from the associated cellapp. It is used to
* inform us about where the entities are on the cells of this application.
*/
void CellApp::updateBounds( BinaryIStream & data )
{
while (data.remainingLength() != 0)
{
SpaceID spaceID;
data >> spaceID;
CellData * pCell = cells_.findFromSpaceID( spaceID );
if (pCell != NULL)
{
pCell->updateBounds( data );
}
else
{
ERROR_MSG( "CellApp::updateBounds: "
"CellApp %s has no cell in space %u\n",
this->addr().c_str(), spaceID );
// Just forget the remaining updates. It'll get it next time.
data.finish();
return;
}
}
}
/**
* This method updates the bounds associated with this cell based on
* information sent from the CellApp.
* inform us about where the entities are on the cells of this application.
*/
void CellData::updateBounds( BinaryIStream & data )
{
this->updateEntityBounds( data );
data >> chunkBounds_;
this->space().updateBounds( data );
data >> numEntities_;
if (numEntities_ > 0)
{
this->space().hasHadEntities( true );
}
}
/**
* This method reads in the entity bounds from a stream.
*/
void CellData::updateEntityBounds( BinaryIStream & data )
{
entityBoundLevels_.updateFromStream( data );
}
/**
* This method reads in the entity bounds from a stream.
*/
void EntityBoundLevels::updateFromStream( BinaryIStream & data )
{
// This needs to match CellApp's Space::writeEntityBounds.
for (int isMax = 0; isMax <= 1; ++isMax)
{
for (int isY = 0; isY <= 1; ++isY)
{
for (int level = entityBounds_.size() - 1; level >= 0; --level )
{
float pos;
float load;
data >> pos >> load;
entityBounds_[ level ].range1D( isY )[ isMax ] = pos;
entityLoads_[ level ].range1D( isY )[ isMax ] = load;
}
}
}
}
The part that deserves attention is how the Cell on the CellApp side collects its EntityBoundLevels in the first place:
/**
* This method streams on various boundaries to inform the CellAppMgr for the
* purposes of load balancing.
*
*/
void Cells::writeBounds( BinaryOStream & stream ) const
{
Container::const_iterator iter = container_.begin();
while (iter != container_.end())
{
iter->second->writeBounds( stream );
++iter;
}
}
/**
* This method streams on various boundaries to inform the CellAppMgr for the
* purposes of load balancing.
*/
void Cell::writeBounds( BinaryOStream & stream ) const
{
if (!this->isRemoved())
{
this->space().writeBounds( stream );
}
}
/**
* This method streams on various boundaries to inform the CellAppMgr for the
* purposes of load balancing.
*
*/
void Space::writeBounds( BinaryOStream & stream ) const
{
if (this->isShuttingDown())
{
return;
}
stream << this->id();
this->writeEntityBounds( stream );
this->writeChunkBounds( stream );
// Number of entities including ghosts.
stream << uint32( this->spaceEntities().size() );
}
/**
* This method writes the entity bounds to inform the CellAppMgr. These are
* used in load balancing.
*/
void Space::writeEntityBounds( BinaryOStream & stream ) const
{
// This needs to match CellAppMgr's CellData::updateEntityBounds
// Args are isMax and isY
this->writeEntityBoundsForEdge( stream, false, false ); // Left
this->writeEntityBoundsForEdge( stream, false, true ); // Bottom
this->writeEntityBoundsForEdge( stream, true, false ); // Right
this->writeEntityBoundsForEdge( stream, true, true ); // Top
}
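writeEntityBounds calls writeEntityBoundsForEdge four times with four different argument combinations, matching the four directions of boundary and load collection described above. The collection relies on the cross-linked range list stored inside the Space; that structure was covered earlier and is not repeated here. Note that an Entity creates four nodes in this list, which together form a rectangle. The isY argument selects the axis to sweep, and isMax selects whether the lower or the upper bound on that axis is computed, that is, from which end the sweep starts: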
/**
* This method calculates boundary values for the boundaries that contains
* the real entities of this space at different sizes based on CPU. This is
* used by load balancing to calculate where to move partitions.
*
* @param stream The stream to write the levels to.
* @param isMax Indicates whether the lower or upper bound should be
* calculated.
* @param isY Indicates whether the X or Y/Z bound should be
* calculated.
*/
void Space::writeEntityBoundsForEdge( BinaryOStream & stream,
bool isMax, bool isY ) const
The function starts by finding the first Real entity node in the sweep direction:
const RangeListNode * pNode =
isMax ? rangeList_.pLastNode() : rangeList_.pFirstNode();
float currCPULoad = 0.f;
int level = BalanceConfig::numCPUOffloadLevels();
float currCPULimit = BalanceConfig::cpuOffloadForLevel( --level );
float perEntityLoadShare = CellApp::instance().getPerEntityLoadShare();
bool hasEntities = false;
float lastEntityPosition = 0.f;
const float FUDGE_FACTOR = isMax ? -0.1f : 0.1f;
// Looking for the first real entity node
while (pNode && (!pNode->isEntity() ||
!EntityRangeListNode::getEntity( pNode )->isReal()))
{
pNode = pNode->getNeighbour( !isMax, isY );
}
Having found the first Real node, it keeps walking along the axis:
// Now iterate through the real entities but counting dense bursts
// within POS_DIFF_EPSILON range as one entity
while (pNode && (level >= 0))
{
const float POS_DIFF_EPSILON = 0.01f;
bool shouldCountThisEntity = true;
const RangeListNode * pNextNode = pNode->getNeighbour( !isMax, isY );
while (pNextNode && (!pNextNode->isEntity() ||
!EntityRangeListNode::getEntity( pNextNode )->isReal()))
{
pNextNode = pNextNode->getNeighbour( !isMax, isY );
}
const Entity * pEntity = EntityRangeListNode::getEntity( pNode );
const float entityPosition = pEntity->position()[ isY * 2 ];
currCPULoad += pEntity->profiler().load() + perEntityLoadShare;
const Entity * pNextEntity = NULL;
float nextEntityPosition = 0.f;
if (pNextNode)
{
pNextEntity = EntityRangeListNode::getEntity( pNextNode );
nextEntityPosition = pNextEntity->position()[ isY * 2 ];
if (almostEqual(entityPosition, nextEntityPosition, POS_DIFF_EPSILON))
{
shouldCountThisEntity = false;
}
}
if (shouldCountThisEntity)
{
if (currCPULoad > currCPULimit)
{
const float limitPosition = pNextEntity ?
(nextEntityPosition + entityPosition) / 2 :
entityPosition + FUDGE_FACTOR;
stream << limitPosition << currCPULoad;
currCPULimit = BalanceConfig::cpuOffloadForLevel( --level );
}
hasEntities = true;
lastEntityPosition = entityPosition;
}
pNode = pNextNode;
}
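During the walk, currCPULoad accumulates each Entity's recorded load, pEntity->profiler().load(), plus perEntityLoadShare, a small load attributed to every Entity.
Whenever currCPULoad exceeds the current currCPULimit, one record is written; the split point is the midpoint between the current Entity and the next one. currCPULimit is then advanced to the next level. The progression is geometric: with the formula and constants below, level starts at 4 and counts down to 0, so currCPULimit takes the values maxCPUOffload × [1/16, 1/8, 1/4, 1/2, 1] in turn. Each limit is twice the previous one, and the last equals maxCPUOffload: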
BW_OPTION_RO( int, numCPUOffloadLevels, 5 );
const float LEVEL_STEP = 0.5f;
/**
* This method returns the amount of CPU to attempt to offload for each
* entity bounds level.
*/
float BalanceConfig::cpuOffloadForLevel( int level )
{
// Each level offloads twice as much as the previous with the last level
// offloading maxCPUOffload.
return maxCPUOffload() * powf( LEVEL_STEP, level );
}
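Note that currCPULoad is not reset when currCPULimit is advanced, so every recorded load is cumulative from the edge where the sweep started.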
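The sequence of limits can be checked with a few lines that replay the countdown of level. This is a sketch under stated assumptions: offloadLimitsInVisitOrder is a hypothetical helper, and maxCPUOffload is passed in as a parameter because its configured value is not shown in the source.

```cpp
#include <cassert>
#include <cmath>
#include <vector>

// Replays cpuOffloadForLevel( --level ) for level = numLevels-1 .. 0 and
// returns the limits in the order the sweep uses them.
std::vector< float > offloadLimitsInVisitOrder( int numLevels,
        float maxCPUOffload )
{
    const float LEVEL_STEP = 0.5f;
    std::vector< float > limits;
    for (int level = numLevels - 1; level >= 0; --level)
    {
        limits.push_back(
            maxCPUOffload * std::pow( LEVEL_STEP, float( level ) ) );
    }
    return limits;
}
```

For five levels and a maxCPUOffload of 1 this yields 1/16, 1/8, 1/4, 1/2, 1. Because the sweep compares these limits against a running total, they are successive cut-offs on the same cumulative load rather than amounts to be added together.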
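If two neighboring Real Entities have almost the same coordinate on this axis, it is better to keep them on the same side of a new boundary. That is why shouldCountThisEntity is computed from pNextNode: a record is only considered for writing to the stream when the next Entity is some distance away.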
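The sweep can be reduced to a one-dimensional sketch over a sorted list of entities. It follows the isMax == false case only, leaves out perEntityLoadShare and the Real/ghost filtering, and treats "almost equal" as a difference of at most POS_DIFF_EPSILON, which is an assumption about almostEqual. ToyEntity and sweepBounds are hypothetical names.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <utility>
#include <vector>

struct ToyEntity { float pos; float load; };   // input sorted by pos

// Returns (limitPosition, cumulativeLoad) records, one per limit crossed.
// `limits` must be in ascending order, as visited by the real sweep.
std::vector< std::pair< float, float > > sweepBounds(
        const std::vector< ToyEntity > & entities,
        const std::vector< float > & limits )
{
    const float POS_DIFF_EPSILON = 0.01f;
    const float FUDGE_FACTOR = 0.1f;
    std::vector< std::pair< float, float > > records;
    float cumLoad = 0.f;   // never reset between levels
    std::size_t level = 0;
    for (std::size_t i = 0;
        i < entities.size() && level < limits.size(); ++i)
    {
        cumLoad += entities[i].load;
        const bool hasNext = (i + 1 < entities.size());
        // Entities at nearly the same coordinate stay on one side.
        if (hasNext && std::fabs( entities[i + 1].pos - entities[i].pos )
                <= POS_DIFF_EPSILON)
        {
            continue;
        }
        if (cumLoad > limits[level])
        {
            const float limitPos = hasNext ?
                (entities[i].pos + entities[i + 1].pos) / 2 :
                entities[i].pos + FUDGE_FACTOR;
            records.emplace_back( limitPos, cumLoad );
            ++level;
        }
    }
    return records;
}
```

With entities at 0, 10, 10.005 and 20, the pair around 10 is treated as one cluster, so the second record is placed between 10.005 and 20 and carries the load of the first three entities.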
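EntityBoundLevels::updateFromStream reads level from high to low, which matches the order in which the CellApp wrote the records (smallest limit first). As a result, entityLoads_[ level ] on the CellAppMgr holds the record for cpuOffloadForLevel( level ), and read by ascending index the limits are in descending order, maxCPUOffload × [1, 1/2, 1/4, 1/8, 1/16]: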
/**
* This method reads in the entity bounds from a stream.
*/
void EntityBoundLevels::updateFromStream( BinaryIStream & data )
{
// This needs to match CellApp's Space::writeEntityBounds.
for (int isMax = 0; isMax <= 1; ++isMax)
{
for (int isY = 0; isY <= 1; ++isY)
{
for (int level = entityBounds_.size() - 1; level >= 0; --level )
{
float pos;
float load;
data >> pos >> load;
entityBounds_[ level ].range1D( isY )[ isMax ] = pos;
entityLoads_[ level ].range1D( isY )[ isMax ] = load;
}
}
}
}
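So when choosing a new partition line, the first level whose load is not greater than the absolute value of loadDiff is selected. Because entityLoads_ is stored in descending order, this is the largest level that does not exceed it: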
/**
* This method returns the entity bound best suited to offload a desired
* amount. The largest level less than this load amount is chosen.
*/
float EntityBoundLevels::entityBoundForLoadDiff( float loadDiff,
bool isHorizontal, bool isMax, float defaultPosition ) const
{
const float absLoadDiff = fabs( loadDiff );
for (int level = 0;
level < (int)entityLoads_.size();
++level)
{
if (entityLoads_[ level ].range1D(
isHorizontal )[ isMax ] <= absLoadDiff)
{
return entityBounds_[ level ].range1D( isHorizontal )[ isMax ];
}
}
return defaultPosition;
}
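A one-dimensional sketch of the lookup shows which level is picked for a given load difference. ToyLevel and boundForLoadDiff are hypothetical names, and the positions and loads are made-up values arranged in the descending order described above.

```cpp
#include <cassert>
#include <cmath>
#include <vector>

// One (boundary position, cumulative load) pair for a single edge.
struct ToyLevel { float bound; float load; };

// Same selection rule as entityBoundForLoadDiff, for one edge only:
// the first level whose cumulative load fits within |loadDiff| wins.
float boundForLoadDiff( const std::vector< ToyLevel > & levels,
        float loadDiff, float defaultPosition )
{
    const float absLoadDiff = std::fabs( loadDiff );
    for (const ToyLevel & level : levels)
    {
        if (level.load <= absLoadDiff)
        {
            return level.bound;
        }
    }
    return defaultPosition;
}
```

Given loads of 1, 0.5, 0.25, 0.125 and 0.0625, a load difference of 0.3 selects the 0.25 level. A difference smaller than every stored load falls through to the default position, which leaves the partition where it is.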
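If the region being moved carries too little load for the adjustment to matter, later loadBalance passes continue the adjustment.
Note that each load report only updates the EntityBoundLevels of CellData nodes; the corresponding member in InternalNode is not touched. An InternalNode does need the aggregate EntityBoundLevels of its subtree when balancing, and that information is gathered in updateLoad, which runs over all nodes at the start of load balancing: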
/**
* This method updates the load associated with nodes in this sub-tree.
*/
float InternalNode::updateLoad()
{
totalLoad_ = pLeft_->updateLoad() + pRight_->updateLoad();
totalSmoothedLoad_ =
pLeft_->totalSmoothedLoad() + pRight_->totalSmoothedLoad();
numRetiring_ = pLeft_->numRetiring() + pRight_->numRetiring();
areaNotLoaded_ = pLeft_->areaNotLoaded() + pRight_->areaNotLoaded();
minLoad_ = std::min( pLeft_->minLoad(), pRight_->minLoad() );
maxLoad_ = std::max( pLeft_->maxLoad(), pRight_->maxLoad() );
entityBoundLevels_.merge( pLeft_->entityBoundLevels(),
pRight_->entityBoundLevels(),
isHorizontal_, position_ );
// (some unrelated code omitted)
}
/**
* This method merges bound levels data from left and right nodes
* into this one
*/
void EntityBoundLevels::merge( const EntityBoundLevels & left,
const EntityBoundLevels & right,
bool isHorizontal, float partitionPosition )
{
// Is the partition line horizontal?
if (isHorizontal)
{
this->mergeTwoBranches( left, right, false, false );
this->mergeTwoBranches( left, right, false, true );
this->takeSingleBranch( left, true, false, partitionPosition );
this->takeSingleBranch( right, true, true, partitionPosition );
}
else
{
this->mergeTwoBranches( left, right, true, false );
this->mergeTwoBranches( left, right, true, true );
this->takeSingleBranch( left, false, false, partitionPosition );
this->takeSingleBranch( right, false, true, partitionPosition );
}
}
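updateLoad merges the EntityBoundLevels of the two children:
- takeSingleBranch merges the load statistics along the split axis.
- mergeTwoBranches merges the load statistics along the non-split axis.
Both functions are fairly long and are not covered in detail here.
Alongside entityLimitInDirection there is chunkLimitInDirection. It ignores load entirely: the limit is the edge of the growing child's chunk bounds, pulled inward by one ghostDistance. If the resulting adjustment is not enough, the next pass moves the boundary further: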
/**
* This method returns the limit of movement based on the entity bounds in the
* direction of movement.
*/
float InternalNode::chunkLimitInDirection( BalanceDirection direction ) const
{
BSPNode * pFromNode = this->growingChild( direction );
// This also works for BALANCE_NONE case.
bool shouldGetMax = (pFromNode == pLeft_);
float ghostDistance = CellAppMgrConfig::ghostDistance();
if (shouldGetMax)
{
ghostDistance = -ghostDistance;
}
const Rect & chunkBounds = pFromNode->balanceChunkBounds();
return chunkBounds.range1D( isHorizontal_ )[ shouldGetMax ] + ghostDistance;
}
/**
* This method returns the chunk bounds that should be considered when load
* balancing.
*/
const BW::Rect & CellData::balanceChunkBounds() const
{
const float FLOATING_POINT_TOLERANCE = 1.f; // Avoid floating point issues
BW::Rect desiredRect( range_ );
desiredRect.safeInflateBy(
CellAppMgrConfig::ghostDistance() - FLOATING_POINT_TOLERANCE );
return chunkBounds_.contains( desiredRect ) ?
chunkBounds_ : range_;
}
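chunkBounds_ is the range of scene resources the Cell has already loaded, while range_ is the range it is expected to cover. chunkBounds_ normally extends range_ by CellAppMgrConfig::ghostDistance().
Finally the results of entityLimitInDirection and chunkLimitInDirection are combined, and the new boundary that moves the least is chosen: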
/**
* This helper method returns the limit to move to based on the direction
* of movment.
*/
static float closestLimit( float position, float limit1, float limit2,
BalanceDirection direction )
{
return (direction == BALANCE_LEFT) ?
std::min( position, std::max( limit1, limit2 ) ) :
std::max( position, std::min( limit1, limit2 ) );
}
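A few concrete numbers make the min/max nesting easier to follow. The function below repeats closestLimit as listed, with the enum redeclared so the sketch stands alone; the positions are made-up values.

```cpp
#include <algorithm>
#include <cassert>

enum BalanceDirection { BALANCE_LEFT, BALANCE_RIGHT, BALANCE_NONE };

// Same body as the listing above. When moving left (towards smaller
// coordinates) the larger limit is the nearer one; the outer min/max
// stops the partition from ever moving against the chosen direction.
float closestLimit( float position, float limit1, float limit2,
        BalanceDirection direction )
{
    return (direction == BALANCE_LEFT) ?
        std::min( position, std::max( limit1, limit2 ) ) :
        std::max( position, std::min( limit1, limit2 ) );
}
```

With the partition at 100 and limits of 80 and 90, a move to the left stops at 90. If both limits lie on the wrong side, for example 105 and 110 for a leftward move, the partition stays at 100.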
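Cell Load Recording
Every entity has a profiler, and practically every place that runs logic for a single Entity calls this profiler to record the cost. Recording is done with macros and RAII: adding AUTO_SCOPED_THIS_ENTITY_PROFILE as the first line of a function is enough. Entity::sendMessageToReal below is a very simple example: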
#define AUTO_SCOPED_ENTITY_PROFILE( ENTITY_PTR ) \
EntityProfiler::AutoScopedHelper< BaseOrEntity > \
_autoEntityProfile( ENTITY_PTR )
#define AUTO_SCOPED_THIS_ENTITY_PROFILE \
AUTO_SCOPED_ENTITY_PROFILE( this )
/**
* This class is a helper for auto-scoped profiling macros
*/
template < class ENTITY >
class AutoScopedHelper
{
public:
AutoScopedHelper( const ENTITY * pEntity ) : pEntity_()
{
if (pEntity)
{
pEntity_ = pEntity;
pEntity->profiler().start();
}
}
~AutoScopedHelper()
{
if (pEntity_)
{
pEntity_->profiler().stop();
}
}
private:
ConstSmartPointer< ENTITY > pEntity_;
};
bool Entity::sendMessageToReal( const MethodDescription * pDescription,
ScriptTuple args )
{
AUTO_SCOPED_THIS_ENTITY_PROFILE;
// (business logic omitted)
}
start and stop simply record a timestamp difference, with nested (recursive) calls taken into account:
INLINE void EntityProfiler::start() const
{
if (callDepth_++ > 0)
{
// we're already profiling
return;
}
startTime_ = timestamp();
}
INLINE void EntityProfiler::stop() const
{
MF_ASSERT( callDepth_ > 0 );
if (--callDepth_ > 0)
{
// we were nested inside another start/stop
return;
}
uint64 dt = timestamp() - startTime_;
elapsedTickTime_ += dt;
startTime_ = 0;
}
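The call-depth counter is what keeps nested scopes from being counted twice. The sketch below shows this with a fake clock so the result is deterministic; ToyProfiler, ScopedProfile, g_fakeNow and nestedExample are hypothetical names, and the fake clock stands in for timestamp().

```cpp
#include <cassert>
#include <cstdint>

// Fake clock so the example is deterministic; stands in for timestamp().
static std::uint64_t g_fakeNow = 0;

class ToyProfiler
{
public:
    void start()
    {
        if (callDepth_++ > 0) return;   // already inside a profiled scope
        startTime_ = g_fakeNow;
    }
    void stop()
    {
        assert( callDepth_ > 0 );
        if (--callDepth_ > 0) return;   // still inside an outer scope
        elapsed_ += g_fakeNow - startTime_;
        startTime_ = 0;
    }
    std::uint64_t elapsed() const { return elapsed_; }
private:
    int callDepth_ = 0;
    std::uint64_t startTime_ = 0;
    std::uint64_t elapsed_ = 0;
};

// RAII helper in the spirit of AutoScopedHelper.
class ScopedProfile
{
public:
    explicit ScopedProfile( ToyProfiler & p ) : p_( p ) { p_.start(); }
    ~ScopedProfile() { p_.stop(); }
    ScopedProfile( const ScopedProfile & ) = delete;
    ScopedProfile & operator=( const ScopedProfile & ) = delete;
private:
    ToyProfiler & p_;
};

// An outer scope lasting 10 clock units with a nested scope of 4 units.
std::uint64_t nestedExample()
{
    ToyProfiler profiler;
    g_fakeNow = 100;
    {
        ScopedProfile outer( profiler );
        g_fakeNow += 3;
        {
            ScopedProfile inner( profiler );
            g_fakeNow += 4;
        }
        g_fakeNow += 3;
    }
    return profiler.elapsed();
}
```

The nested scope contributes nothing on its own: only the outermost start and stop touch the clock, so the recorded time is the 10 units of the outer scope rather than 14.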
Every game tick, EntityProfiler::tick is called to recompute each entity's CPU load. It updates the maximum load, the smoothed load and the other load figures:
void CellApp::updateLoad()
{
uint64 lastTickTimeInStamps = this->calcTickPeriod();
double tickTime = stampsToSeconds( lastTickTimeInStamps );
this->tickProfilers( lastTickTimeInStamps );
// (remaining code omitted)
}
/**
* This method ticks entity and entity type profilers
*/
void CellApp::tickProfilers( uint64 lastTickInStamps )
{
cells_.tickProfilers( lastTickInStamps, Config::loadSmoothingBias() );
EntityType::tickProfilers( totalEntityLoad_, totalAddedLoad_ );
}
/**
* This method ticks profilers on all the cells
*/
void Cells::tickProfilers( uint64 tickDtInStamps, float smoothingFactor )
{
Container::iterator iCell = container_.begin();
while (iCell != container_.end())
{
iCell->second->tickProfilers( tickDtInStamps, smoothingFactor );
++iCell;
}
}
/**
* This method ticks profilers on the real entities on a Cell instance.
*/
void Cell::tickProfilers( uint64 tickDtInStamps, float smoothingFactor )
{
Cell::Entities::iterator iEntity = realEntities_.begin();
while (iEntity != realEntities_.end())
{
EntityProfiler & profiler = (*iEntity)->profiler();
EntityTypeProfiler & typeProfiler = (*iEntity)->pType()->profiler();
profiler.tick( tickDtInStamps, smoothingFactor, typeProfiler );
profiler_.addEntityLoad( profiler.load(), profiler.rawLoad() );
++iEntity;
}
profiler_.tick();
}
/**
* This method should be called every tick
* to recalculate current smoothed load
*/
void EntityProfiler::tick( uint64 tickDtInStamps,
float smoothingFactor,
EntityTypeProfiler &typeProfiler )
{
// How many ticks it takes smoothed load to get from
// a' to a" with error <= 1% ( |a" - a'| / 100 )?
// numTicks = math.ceil( math.log( 0.01, (1 - SMOOTHING_FACTOR) ) )
//
// What should SMOOTHING_FACTOR be for smoothed load to get from a' to a"
// in numTicks with error <= 1% ( |a" - a'| / 100 )?
// SMOOTHING_FACTOR = 1.0 - math.pow( 0.01, (1.0 / numTicks) )
float rawLoad = (float)((double)elapsedTickTime_ / (double)tickDtInStamps);
currSmoothedLoad_ = smoothingFactor * rawLoad + \
(1.f - smoothingFactor) * \
currSmoothedLoad_;
currAdjustedLoad_ = currSmoothedLoad_;
// Calculate added load and apply min artificial load if necessary
float addedLoad = 0.f;
if (artificialMinLoad_ > currSmoothedLoad_)
{
addedLoad = artificialMinLoad_ - currSmoothedLoad_;
currAdjustedLoad_ = artificialMinLoad_;
}
currRawLoad_ = rawLoad;
if (maxRawLoad_ < rawLoad)
{
maxRawLoad_ = rawLoad;
}
elapsedTickTime_ = 0;
typeProfiler.addEntityLoad( currAdjustedLoad_, currRawLoad_, addedLoad );
}
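The comment inside EntityProfiler::tick gives two formulas that relate the smoothing factor to the number of ticks needed for the smoothed load to get within 1% of a new level. A small sketch of those formulas and of the smoothing step itself follows; the function names are hypothetical, and only the arithmetic is taken from the comment and the code above.

```cpp
#include <cassert>
#include <cmath>

// SMOOTHING_FACTOR = 1.0 - pow( 0.01, 1.0 / numTicks )
inline double smoothingFactorFor( int numTicks )
{
    return 1.0 - std::pow( 0.01, 1.0 / numTicks );
}

// numTicks = ceil( log base (1 - SMOOTHING_FACTOR) of 0.01 )
inline int ticksToConverge( double smoothingFactor )
{
    return static_cast< int >(
        std::ceil( std::log( 0.01 ) / std::log( 1.0 - smoothingFactor ) ) );
}

// One exponential-smoothing step, as in EntityProfiler::tick.
inline double smoothStep( double smoothed, double raw, double factor )
{
    return factor * raw + (1.0 - factor) * smoothed;
}

// Applies n smoothing steps with a constant raw load.
inline double runTicks( double start, double raw, double factor, int n )
{
    double smoothed = start;
    for (int i = 0; i < n; ++i)
    {
        smoothed = smoothStep( smoothed, raw, factor );
    }
    return smoothed;
}
```

With a factor of 0.5, for example, the remaining error halves every tick, so seven ticks are needed before it drops below 1%.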
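Besides the profiler attached to every Entity, every EntityType also has its own profiler, which is the typeProfiler above. Each Cell additionally owns a dedicated profiler that records the total load of all Entities inside that Cell. This load is not only used when adjusting the boundaries between Cells: CellAppMgr also relies on it when it creates a new Cell and picks the best CellApp through findBestCellApp. That function iterates over the CellApps, skips retiring ones and those that belong to the excluded CellAppGroup, and chooses the highest-scoring candidate as the place to create the new Cell: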
CellApp * CellApps::findBestCellApp( const Space * pSpace,
const CellAppGroup * pExcludedGroup ) const
{
MF_ASSERT( pSpace );
CellApp * pBest = NULL;
Map::const_iterator iter = map_.begin();
while (iter != map_.end())
{
CellApp * pApp = iter->second;
if (((pExcludedGroup == NULL) || (pApp->pGroup() != pExcludedGroup)) &&
!pApp->isRetiring())
{
if (cellAppComparer_.isValidApp( pApp, pSpace ) &&
cellAppComparer_.isABetterCellApp( pBest, pApp, pSpace ))
{
pBest = pApp;
}
}
++iter;
}
return pBest;
}
/**
* This method compares two CellApps, returning true if the second is better
* than the first, and false otherwise.
* The result is calculated by comparing the CellApps on each criteria, in
* order of their priority, until they are found to differ on a criterion.
*/
bool CellAppComparer::isABetterCellApp(
const CellApp * pOld, const CellApp * pNew, const Space * pSpace ) const
{
if (pOld == NULL)
{
return true;
}
Scorers::const_iterator iter = attributes_.begin();
while (iter != attributes_.end())
{
const CellAppScorer & attribute = **iter;
float result = attribute.compareCellApps( pOld, pNew, pSpace );
if (result < 0.f)
{
return false;
}
else if (result > 0.f)
{
return true;
}
++iter;
}
return false;
}
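The comparison in isABetterCellApp is a lexicographic one: criteria are consulted in priority order, and the first criterion that tells the two candidates apart decides. A minimal sketch of that pattern is shown below. AppInfo, Scorer, makeScorers, and pickBest are hypothetical names, and the two criteria are simplified stand-ins rather than the real scorers.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical summary of a candidate CellApp.
struct AppInfo
{
    int numCells;
    float load;
};

// Returns > 0 if the new candidate is better, < 0 if worse, 0 if undecided.
using Scorer = std::function< float( const AppInfo &, const AppInfo & ) >;

inline bool isBetter( const std::vector< Scorer > & scorers,
    const AppInfo * pOld, const AppInfo & candidate )
{
    if (pOld == nullptr) return true;          // anything beats "no candidate"
    for (const Scorer & scorer : scorers)
    {
        const float result = scorer( *pOld, candidate );
        if (result < 0.f) return false;
        if (result > 0.f) return true;
    }
    return false;                              // tie on every criterion: keep the old one
}

// Assumed priority order for this sketch: fewer cells first, then lower load.
inline std::vector< Scorer > makeScorers()
{
    return {
        []( const AppInfo & o, const AppInfo & n )
            { return float( o.numCells - n.numCells ); },
        []( const AppInfo & o, const AppInfo & n )
            { return o.load - n.load; }
    };
}

// Returns the index of the best candidate, or -1 if the list is empty.
inline int pickBest( const std::vector< AppInfo > & apps )
{
    const std::vector< Scorer > scorers = makeScorers();
    int best = -1;
    for (int i = 0; i < int( apps.size() ); ++i)
    {
        const AppInfo * pOld = (best < 0) ? nullptr : &apps[ best ];
        if (isBetter( scorers, pOld, apps[ i ] )) best = i;
    }
    return best;
}
```

A lower-priority criterion is consulted only when every higher-priority one reports a tie, which is why the configured order of the scorers matters.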
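isABetterCellApp does not consider only the Entity load mentioned earlier. attributes_ stores several scoring functions, which are applied in priority order until one of them distinguishes the two candidates. The scorers are constructed and ordered in the initialisation function of CellAppComparer: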
void CellAppComparer::init()
{
const char * sectionName = "cellAppMgr/metaLoadBalancePriority";
DataSectionPtr pSection = BWConfig::getSection( sectionName );
if (!pSection)
{
ERROR_MSG( "CellAppComparer::init: Failed to open %s\n",
sectionName );
return;
}
DataSectionIterator iter;
iter = pSection->begin();
int numScorers = 0;
CONFIG_INFO_MSG( "Meta load-balance prioritisation:\n" );
while (iter != pSection->end())
{
if (this->addScorer( iter.tag(), *iter ))
{
CONFIG_INFO_MSG( " %d: %s",
++numScorers, iter.tag().c_str() );
}
++iter;
}
}
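The scorers currently implemented are:
- limitedSpaces compares the number of Cells on the different CellApps
- cellAppLoad compares the Cell load collected earlier
- groupLoad compares the average load of the resource group (CellAppGroup) that the CellApp belongs to
- cellCellTraffic (CellCellTrafficScorer) compares the number of Cells of the current Space on the same physical machine
- baseCellTraffic compares whether the IP of the CellApp matches the IP on which the current Space was first created; staying on the same physical machine avoids the cost of cross-machine communication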
/**
* This method generates the criteria on which CellApps are scored.
*/
bool CellAppComparer::addScorer( const BW::string & name,
const DataSectionPtr & pSection )
{
CellAppScorer * pScorer;
if (name == "limitedSpaces")
{
pScorer = new LimitedSpacesScorer;
}
else if (name == "cellAppLoad")
{
pScorer = new CellAppLoadScorer;
}
else if (name == "groupLoad")
{
pScorer = new CellAppGroupLoadScorer;
}
else if (name == "cellCellTraffic")
{
pScorer = new CellCellTrafficScorer;
}
else if (name == "baseCellTraffic")
{
pScorer = new BaseCellTrafficScorer;
}
else
{
ERROR_MSG( "CellAppComparer::addScorer: "
"Unknown meta load-balancing priority option '%s'\n",
name.c_str() );
return false;
}
if (!pScorer->init( pSection ))
{
bw_safe_delete( pScorer );
return false;
}
else
{
this->addScorer( pScorer );
return true;
}
}
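Cell removal
Normally, when the load of a Cell becomes too low, the region it covers is handed over entirely to its sibling Cell, which reduces communication cost. This logic also lives in CellAppMgr::metaLoadBalance; the appGroups.checkForUnderloaded call at the end handles these lightly loaded Cells: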
/**
* This method checks whether there needs to be any migration of spaces between
* cell applications. If so, it will take action.
*/
void CellAppMgr::metaLoadBalance()
{
// Identify the CellApp groups used in meta-load-balancing. These are the
// groups of CellApps such that normal load-balancing can balance their
// loads.
CellAppGroups appGroups( cellApps_ );
const float mergeThreshold =
this->avgCellAppLoad() + CellAppMgrConfig::metaLoadBalanceTolerance();
// Do meta-balance groups need to be joined?
appGroups.checkForOverloaded( mergeThreshold );
// Should more CellApps be added to help with loading?
bool hasLoadingSpaces = this->checkLoadingSpaces();
if (!hasLoadingSpaces)
{
// Are there underloaded groups who should have Cells retired?
appGroups.checkForUnderloaded( CellAppLoadConfig::lowerBound() );
}
}
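The decision to remove a lightly loaded Cell is not based on the load of a single Cell; it is made for the resource group CellAppGroup as a whole. The code computes the average load the group would have with one CellApp fewer, that is, the total load divided by the number of CellApps minus one. If that value is below the given threshold, the least-loaded CellApp in the group can be retired: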
/**
* This method checks whether this group is underloaded and removes a CellApp
* if necessary.
*/
void CellAppGroup::checkForUnderloaded( float loadLowerBound )
{
// If the expected average load with one less CellApp is lower than the
// input threshold, the least loaded CellApp is retired from the group.
if (this->avgLoad( 1 ) < loadLowerBound)
{
CellApp * pLeastLoaded = NULL;
float leastLoad = FLT_MAX;
iterator iter = this->begin();
while (iter != this->end())
{
CellApp * pApp = *iter;
if ((!pApp->hasOnlyRetiringCells() &&
pApp->smoothedLoad() < leastLoad))
{
pLeastLoaded = *iter;
leastLoad = pLeastLoaded->smoothedLoad();
}
++iter;
}
if (pLeastLoaded)
{
pLeastLoaded->retireAllCells();
}
}
}
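The check can be sketched with plain numbers. The code below is an illustration under assumptions: avgLoadIfRemoved and pickAppToRetire are hypothetical names, the group is reduced to a list of smoothed loads, and the handling of a group with a single CellApp (never treated as underloaded) is a choice made for this sketch, not something the source shows.

```cpp
#include <cassert>
#include <cstddef>
#include <limits>
#include <vector>

// Average load the group would have if numRemoved CellApps were taken away.
inline float avgLoadIfRemoved( const std::vector< float > & loads,
    std::size_t numRemoved )
{
    if (loads.size() <= numRemoved)
    {
        // Assumption for this sketch: nothing left to absorb the load.
        return std::numeric_limits< float >::max();
    }
    float total = 0.f;
    for (float load : loads) total += load;
    return total / float( loads.size() - numRemoved );
}

// Returns the index of the CellApp to retire, or -1 if none should be retired.
inline int pickAppToRetire( const std::vector< float > & loads,
    float loadLowerBound )
{
    if (!(avgLoadIfRemoved( loads, 1 ) < loadLowerBound)) return -1;
    int least = -1;
    for (int i = 0; i < int( loads.size() ); ++i)
    {
        if (least < 0 || loads[ i ] < loads[ least ]) least = i;
    }
    return least;
}
```

For three CellApps with loads 0.1, 0.05, and 0.15, the remaining two would average 0.15 after a removal, so with a lower bound of 0.2 the CellApp at index 1 is chosen.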
This differs from what we expected. We set out to remove a Cell, but what actually gets retired is a CellApp: all Cells on that CellApp are retired:
/**
* This method starts retiring all cells from this app.
*/
void CellApp::retireAllCells()
{
cells_.retireAll();
}
/**
* This method tells all cells to start retiring.
*/
void Cells::retireAll()
{
Container::iterator iter = cells_.begin();
while (iter != cells_.end())
{
(*iter)->startRetiring();
++iter;
}
}
/**
* This method starts the process of removing this cell.
*/
void CellData::startRetiring()
{
if (!numRetiring_)
{
INFO_MSG( "CellData::startRetiring: cell %u from space %u\n",
pCellApp_ ? pCellApp_->id() : 0,
pSpace_->id() );
if (pCellApp_)
{
pCellApp_->incCellsRetiring();
}
this->sendRetireCell( /*isRetiring:*/true );
numRetiring_ = 1;
}
}
/**
* This method sends a retireCell message to the app.
*/
void CellData::sendRetireCell( bool isRetiring ) const
{
INFO_MSG( "CellData::sendRetireCell: "
"Retiring cell %u from space %u. isRetiring = %d.\n",
pCellApp_ ? pCellApp_->id() : 0, pSpace_->id(), isRetiring );
Mercury::Bundle & bundle = this->cellApp().bundle();
bundle.startMessage( CellAppInterface::retireCell );
bundle << pSpace_->id();
bundle << isRetiring;
// Don't send immediately so that messages are aggregated.
this->cellApp().channel().delayedSend();
}
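The removal logic is called retireCell. When it runs, the CellData is marked with numRetiring_ = 1, and an RPC tells the owning CellApp to execute retireCell.
Once a CellData is marked as retiring, InternalNode::doBalance adjusts the boundary so that the whole region it covers is handed over to its sibling node: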
/**
* This method does that actual work of balancing.
*/
InternalNode::BalanceDirection InternalNode::doBalance( float loadSafetyBound )
{
bool shouldLimitToChunks =
CellAppMgrConfig::shouldLimitBalanceToChunks();
const bool isRetiring = pLeft_->isRetiring() || pRight_->isRetiring();
float loadDiff = 0.f;
// Do not move if we do not know if the CellApp has created the Cell.
// This avoids problems with other Cells offloading before the new Cell
// exists.
const bool childrenCreated = pLeft_->hasBeenCreated() &&
pRight_->hasBeenCreated();
// Check whether we should balance based on unloaded chunks
if (!isRetiring)
{
// code for the non-retiring case omitted
}
else
{
// loadDiff is set to 1 or -1 here, meaning the whole region of the retiring side is merged into its sibling
loadDiff = pLeft_->isRetiring() ? 1.f : -1.f;
}
BalanceDirection balanceDir = this->dirFromLoadDiff( loadDiff );
BSPNode * pFromNode = this->growingChild( balanceDir );
const bool shouldMove = (balanceDir != BALANCE_NONE) &&
childrenCreated &&
(pFromNode->maxLoad() < loadSafetyBound);
float newPos = position_;
if (shouldMove)
{
float entityLimit = this->entityLimitInDirection( balanceDir,
loadDiff * balanceAggression_ );
float chunkLimit = shouldLimitToChunks ?
this->chunkLimitInDirection( balanceDir ) : entityLimit;
newPos = this->closestLimit( position_,
entityLimit, chunkLimit, balanceDir );
}
else
{
balanceDir = BALANCE_NONE;
}
position_ = newPos;
this->adjustAggression( balanceDir );
return balanceDir;
}
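After the boundary is updated, the system waits for all Entities to migrate gradually.
Later, when this CellData runs its load-balancing function balance and finds that its region has no area left, that it is shrinking or retiring, and that no Entity remains in it, it asks Space to delete it: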
/*
* Override from BSPNode.
*/
void CellData::balance( const BW::Rect & range,
float loadSafetyBound, bool isShrinking )
{
// Should not shrink if not yet created.
MF_ASSERT( hasBeenCreated_ || !isShrinking );
range_ = range;
isOverloaded_ = (this->cellApp().smoothedLoad() > loadSafetyBound);
bool hasArea = (range.xMin() < range.xMax()) &&
(range.yMin() < range.yMax());
// When a cell has no area, it should be removed if load balancing is trying
// to make it smaller.
if (!hasArea && (isShrinking || numRetiring_) && (this->numEntities() == 0))
{
Space::addCellToDelete( this );
}
}
addCellToDelete only adds the cell to a pending-deletion set; the node is actually deleted only when Space::loadBalance runs:
// -----------------------------------------------------------------------------
// Section: Static methods
// -----------------------------------------------------------------------------
/**
* This static method is used during load balancing. If a cell has been fully
* removed, it adds itself to a set so that it can be deleted later.
*/
void Space::addCellToDelete( CellData * pCell )
{
cellsToDelete_.insert( pCell );
}
/**
* This method changes the geometry of cells to balance the load.
*/
void Space::loadBalance()
{
#ifndef BW_EVALUATION
// load-balancing code shown earlier omitted
// TODO: I don't think this is needed. isBalancing_ enforces this is not
// called recursively.
// Delete cells from this space that are no longer included. Do this
// after updateGeometry so that the cells being deleted know the latest
// layout.
{
// Take a copy here because this method is called recursively.
CellsToDelete copyOfCellsToDelete;
copyOfCellsToDelete.swap( cellsToDelete_ );
CellsToDelete::iterator iter = copyOfCellsToDelete.begin();
while (iter != copyOfCellsToDelete.end())
{
(*iter)->removeSelf();
++iter;
}
}
this->checkCellsHaveLoadedMappedGeometry();
isBalancing_ = false;
#endif
}
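The deletion step swaps the pending set into a local copy before iterating, so that anything queued while the batch is being processed does not disturb the iteration. The following is a minimal sketch of that pattern with hypothetical names (PendingDeletes, demoFlush) and integer ids in place of CellData pointers.

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <vector>

// Hypothetical queue of ids waiting to be deleted.
class PendingDeletes
{
public:
    void add( int id ) { pending_.insert( id ); }

    // Processes everything queued so far. Ids queued from inside onDelete
    // land in pending_ again and are left for the next flush.
    template < class Fn >
    void flush( Fn onDelete )
    {
        std::set< int > batch;
        batch.swap( pending_ );
        for (int id : batch) onDelete( id );
    }

    std::size_t size() const { return pending_.size(); }

private:
    std::set< int > pending_;
};

// Queues 1 and 2, and queues 3 while 1 is being deleted.
// Returns the deleted ids followed by the number of ids still pending.
inline std::vector< int > demoFlush()
{
    PendingDeletes queue;
    queue.add( 1 );
    queue.add( 2 );
    std::vector< int > result;
    queue.flush( [&]( int id )
        {
            result.push_back( id );
            if (id == 1) queue.add( 3 );
        } );
    result.push_back( int( queue.size() ) );
    return result;
}
```

Iterating over the swapped-out copy keeps the loop valid even when the callback modifies the live container.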
Finally, removeSelf calls the node-removal operation of the BSP tree and then deletes the object itself, which ends the lifetime of the CellData:
/**
* This method removes this Cell from the system. By the end of this method,
* this object has been deleted.
*/
void CellData::removeSelf()
{
INFO_MSG( "CellData::removeSelf: Removed %u from space %u.\n",
pCellApp_ ? pCellApp_->id() : 0, pSpace_->id() );
pSpace_->eraseCell( this, true );
pSpace_ = NULL;
delete this;
}
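Synchronising the Cell state of a Space
All Cell information of a distributed scene is stored in a BSP tree. CellAppMgr is the authority for the state of this tree, and every Cell of the distributed scene also holds a replica of the Space: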
/**
* This class is used to represent a cell.
*/
class Cell
{
public:
// Constructor/Destructor
Cell( Space & space, const CellInfo & cellInfo );
~Cell();
protected:
Space & space_;
ConstCellInfoPtr pCellInfo_;
};
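The Space type here is the CellApp-side Space (CellApp::Space), not the CellAppMgr-side Space (CellAppMgr::Space) seen earlier. The data they store is largely the same, a BSP tree in both cases, so the implementation is not reproduced here.
The definition of Cell shows that all Cells of the same Space on one CellApp share a single Space object, which is why the space_ member is a reference.
A Cell is created in CellApp::addCell. When this function creates a new Cell and finds that the corresponding Space data already exists, it reuses that Space; otherwise it creates a new one. This guarantees that a CellApp maintains only one copy of the Space data for a given spaceID: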
/**
* This method handles a message to add a cell to this cell application.
*/
void CellApp::addCell( const Mercury::Address & srcAddr,
const Mercury::UnpackedMessageHeader & header,
BinaryIStream & data )
{
// unrelated code at the start omitted
SpaceID spaceID;
data >> spaceID;
Space * pSpace = this->findSpace( spaceID );
if (pSpace)
{
INFO_MSG( "CellApp::addCell: Re-using space %d\n", spaceID );
pSpace->reuse();
}
else
{
pSpace = pSpaces_->create( spaceID );
}
INFO_MSG( "CellApp::addCell: Space = %u\n", spaceID );
MF_ASSERT( pSpace );
pSpace->updateGeometry( data );
CellInfo * pCellInfo = pSpace->findCell( interface_.address() );
if (pCellInfo)
{
Cell * pNewCell = pSpace->pCell();
if (pNewCell)
{
WARNING_MSG( "CellApp::addCell: "
"Cell did not fully remove; reusing.\n" );
MF_ASSERT( pCellInfo == &pNewCell->cellInfo() );
pNewCell->reuse();
}
else
{
pNewCell = new Cell( *pSpace, *pCellInfo );
cells_.add( pNewCell );
}
bool isFirstCell;
data >> isFirstCell;
bool isFromDB;
data >> isFromDB;
if (data.remainingLength() > 0)
{
pSpace->allSpaceData( data );
}
// The first cell in the first space.
if (isFirstCell && Config::useDefaultSpace() && spaceID == 1)
{
this->onGetFirstCell( isFromDB );
}
}
else
{
CRITICAL_MSG( "CellApp::addCell: Failed to add a cell for space %u\n",
spaceID );
}
}
The Space data is updated through pSpace->updateGeometry( data ):
/**
* This method handles a message from the server that updates geometry
* information.
*/
void Space::updateGeometry( BinaryIStream & data )
{
bool wasMulticell = !this->hasSingleCell();
// Mark them all to be deleted
{
// We could get rid of this step if we used a flip-flop value but it's
// simpler this way.
CellInfos::const_iterator iter = cellInfos_.begin();
while (iter != cellInfos_.end())
{
iter->second->shouldDelete( true );
++iter;
}
}
if (pCellInfoTree_ != NULL)
{
pCellInfoTree_->deleteTree();
}
BW::Rect rect(
-std::numeric_limits< float >::max(),
-std::numeric_limits< float >::max(),
std::numeric_limits< float >::max(),
std::numeric_limits< float >::max() );
pCellInfoTree_ = this->readTree( data, rect );
// Delete the cells that should be
//if(1)
{
CellInfos::iterator iter = cellInfos_.begin();
while (iter != cellInfos_.end())
{
CellInfos::iterator oldIter = iter;
++iter;
if (oldIter->second->shouldDelete())
{
// TODO: This assertion can be triggered. I believe this can
// occur if multiple cells are deleted at the same time. It
// would be good to confirm that this is not an issue with
// missing notifyOfCellRemoval calls.
// MF_ASSERT( oldIter->second->isDeletePending() );
cellInfos_.erase( oldIter );
}
}
}
// see if we are going to get rid of our own cell
if (pCell_)
{
if (pCell_->cellInfo().shouldDelete())
{
INFO_MSG( "Space::updateGeometry: Cell in space %u is going\n",
id_ );
}
else
{
pCell_->checkOffloadsAndGhosts();
}
}
// see if we want to expressly shut down this space now
if (wasMulticell)
{
this->checkForShutDown();
}
}
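updateGeometry begins by marking every CellInfo of the Space as pending deletion. readTree then clears the flag on every CellInfo that should be kept, and finally all CellInfos are checked and those whose flag is still true are removed. readTree here amounts to deserialising the Space: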
/**
* This method reads a BSP from the stream.
*/
SpaceNode * Space::readTree( BinaryIStream & stream,
const BW::Rect & rect )
{
SpaceNode * pResult = NULL;
uint8 type = 0xFF;
stream >> type;
switch (type)
{
case 0:
case 1:
pResult =
new SpaceBranch( *this, rect,
stream, type == 0 /*isHorizontal*/ );
break;
case 2:
{
Mercury::Address addr;
stream >> addr;
CellInfo * pCellInfo = this->findCell( addr );
if (pCellInfo)
{
pCellInfo->shouldDelete( false );
pCellInfo->rect( rect );
pCellInfo->updateFromStream( stream );
pResult = pCellInfo;
}
else
{
pCellInfo = new CellInfo( id_, rect, addr, stream );
pResult = pCellInfo;
cellInfos_[ addr ] = pCellInfo;
}
break;
}
default:
ERROR_MSG( "Space::readTree: stream.error = %d. type = %d\n",
stream.error(), type );
MF_ASSERT( 0 );
break;
}
return pResult;
}
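The mark, refresh, sweep sequence performed by updateGeometry together with readTree can be sketched on a plain map. This is a simplified illustration: Info and applySnapshot are hypothetical names, addresses are reduced to strings, and the snapshot is a flat list rather than a streamed tree.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical per-cell record; only the deletion flag matters here.
struct Info
{
    bool shouldDelete = false;
};

// Replaces the known set of cells with the ones listed in the snapshot.
inline void applySnapshot( std::map< std::string, Info > & infos,
    const std::vector< std::string > & snapshot )
{
    // 1. Mark every known entry for deletion.
    for (auto & entry : infos) entry.second.shouldDelete = true;

    // 2. Entries present in the snapshot are kept; unknown ones are created.
    for (const std::string & addr : snapshot)
    {
        infos[ addr ].shouldDelete = false;
    }

    // 3. Sweep: erase whatever is still marked.
    for (auto it = infos.begin(); it != infos.end(); )
    {
        if (it->second.shouldDelete) it = infos.erase( it );
        else ++it;
    }
}
```

The benefit of this approach is that the receiver never needs an explicit list of removed cells: absence from the snapshot is enough.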
To better understand type and how each value is handled, compare it with the data that Space::addCell sends down on the CellAppMgr side:
/**
* This method creates a cell and adds it to this space.
*/
CellData * Space::addCell( CellApp & cellApp, CellData * pCellToSplit )
{
INFO_MSG( "Space::addCell: Space %u. CellApp %u (%s)\n",
id_, cellApp.id(), cellApp.addr().c_str() );
// cell-creation code covered earlier omitted
Mercury::Bundle & bundle = cellApp.bundle();
bundle.startRequest( CellAppInterface::addCell,
new AddCellReplyHandler( cellApp.addr(), id_ ) );
this->addToStream( bundle );
bundle << isFirstCell_;
isFirstCell_ = false;
bundle << isFromDB_;
//prepare space data for sending
{
// bundle.startMessage( CellAppInterface::allSpaceData );
// bundle << id_;
bundle << (uint32)dataEntries_.size();
DataEntries::const_iterator iter = dataEntries_.begin();
while (iter != dataEntries_.end())
{
bundle << iter->first <<
iter->second.key << iter->second.data;
++iter;
}
}
cellApp.send();
return pCellData;
}
Note the call this->addToStream( bundle ); it effectively sends down the full data of the current Space:
/**
* This method adds this space to the input stream.
*
* @param stream The stream to add the space to.
* @param isForViewer Indicates whether the stream is being sent to CellApps or
* to SpaceViewer.
*/
void Space::addToStream( BinaryOStream & stream, bool isForViewer ) const
{
stream << id_;
if (pRoot_)
{
if (isForViewer)
{
stream << CellAppMgr::instance().numCellApps();
stream << CellAppMgr::instance().numEntities();
}
pRoot_->addToStream( stream, isForViewer );
}
}
/*
* Override from BSPNode.
*/
void CellData::addToStream( BinaryOStream & stream, bool isForViewer )
{
float smoothedLoad =
pCellApp_ ? pCellApp_->smoothedLoad() : 0.f;
stream << uint8( CM::BSP_NODE_LEAF ) << this->addr() << smoothedLoad;
if (!isForViewer)
{
// Preserve backwards compatibility for SpaceViewer.
stream << uint8( this->hasBeenCreated() );
}
else
{
if (pCellApp_)
{
stream << this->cellApp().id();
stream << this->cellApp().viewerPort();
}
else
{
stream << CellAppID( 0 ) << uint16( 0 );
}
entityBoundLevels_.addToStream( stream );
stream << chunkBounds_;
stream << (int8)this->isRetiring();
stream << isOverloaded_;
}
}
/**
* This method adds this subtree to the input stream.
*/
void InternalNode::addToStream( BinaryOStream & stream, bool isForViewer )
{
stream << uint8( isHorizontal_ ? BSP_NODE_HORIZONTAL : BSP_NODE_VERTICAL );
stream << position_;
if (isForViewer)
{
stream << this->avgLoad() << balanceAggression_;
}
pLeft_->addToStream( stream, isForViewer );
pRight_->addToStream( stream, isForViewer );
}
As shown, the serialised form of every node starts with its node type: 0 and 1 are internal nodes with different split directions, and 2 is a leaf node:
enum BSPNodeType
{
BSP_NODE_HORIZONTAL,
BSP_NODE_VERTICAL,
BSP_NODE_LEAF
};
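Serialising a node is recursive and stops only at leaf nodes. Given that, we would expect SpaceBranch, which Space::readTree constructs for types 0 and 1, to be recursive as well. Its implementation confirms this: it calls readTree again, which gives indirect recursion: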
/**
* Constructor.
*/
SpaceBranch::SpaceBranch( Space & space,
const BW::Rect & rect,
BinaryIStream & stream, bool isHorizontal ) :
isHorizontal_( isHorizontal )
{
stream >> position_;
BW::Rect leftRect = rect;
BW::Rect rightRect = rect;
if (isHorizontal_)
{
leftRect.yMax_ = position_;
rightRect.yMin_ = position_;
}
else
{
leftRect.xMax_ = position_;
rightRect.xMin_ = position_;
}
pLeft_ = space.readTree( stream, leftRect );
pRight_ = space.readTree( stream, rightRect );
}
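The preorder layout (type tag, then split position for a branch or cell data for a leaf) and the way each child rectangle is derived from its parent can be sketched without the Mercury stream classes. The code below is a simplified illustration: Token, Leaf, and decode are hypothetical, the stream is a vector of tokens, a leaf carries only a numeric cell id, and the root rectangle is a fixed finite box instead of the float limits used by the real code.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct Rect { float xMin, yMin, xMax, yMax; };

enum NodeType : std::uint8_t { HORIZONTAL = 0, VERTICAL = 1, LEAF = 2 };

// Hypothetical stream element: a type tag plus one value, which is the
// split position for a branch or the cell id for a leaf.
struct Token { std::uint8_t type; float value; };

struct Leaf { int cellId; Rect rect; };

// Reads one subtree in preorder and collects its leaves with their rects.
inline void readTree( const std::vector< Token > & stream, std::size_t & pos,
    const Rect & rect, std::vector< Leaf > & leaves )
{
    const Token token = stream.at( pos++ );
    if (token.type == LEAF)
    {
        leaves.push_back( Leaf{ int( token.value ), rect } );
        return;
    }
    Rect left = rect;
    Rect right = rect;
    if (token.type == HORIZONTAL)
    {
        left.yMax = token.value;    // split along y
        right.yMin = token.value;
    }
    else
    {
        left.xMax = token.value;    // split along x
        right.xMin = token.value;
    }
    readTree( stream, pos, left, leaves );
    readTree( stream, pos, right, leaves );
}

inline std::vector< Leaf > decode( const std::vector< Token > & stream )
{
    std::vector< Leaf > leaves;
    std::size_t pos = 0;
    readTree( stream, pos, Rect{ -1000.f, -1000.f, 1000.f, 1000.f }, leaves );
    return leaves;
}
```

Only split positions travel over the wire; every rectangle is recomputed on the receiving side from the root rectangle and the sequence of splits.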
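So every addCell sends the full, latest data of the Space to the target CellApp. addCell itself does not appear to notify the other Cells of the Space, however. That broadcast is in Space::informCellAppsOfGeometry, which iterates over all current cells and pushes the full data to each of them: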
/**
*
*/
void Space::informCellAppsOfGeometry( bool shouldSend )
{
Cells::iterator iter = cells_.begin();
while (iter != cells_.end())
{
Mercury::Bundle & bundle = (*iter)->cellApp().bundle();
bundle.startMessage( CellAppInterface::updateGeometry );
this->addToStream( bundle );
// TODO: This could be optimised so that we do not send this if it
// hasn't changed. This would be particularly true for single cell
// spaces.
// The send may be delayed so that all space updates are sent in one
// send.
if (shouldSend)
{
(*iter)->cellApp().send();
}
++iter;
}
}
The RPC sent here ends up in the Space::updateGeometry interface introduced earlier, that is, the full rebuild of the Space. Space::informCellAppsOfGeometry is called from the load-balancing function:
/**
* This method performs load balancing on each of the spaces.
*/
void CellAppMgr::loadBalance()
{
// Balance all spaces
{
Spaces::iterator iter = spaces_.begin();
while (iter != spaces_.end())
{
if (g_shouldLoadBalance)
{
iter->second->loadBalance();
}
else
{
iter->second->informCellAppsOfGeometry( /*shouldSend*/ false );
}
iter++;
}
}
// This is done after balancing each space so that the messages to each
// CellApp are aggregated into a single bundle.
cellApps_.sendToAll();
}
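Space::loadBalance() also calls informCellAppsOfGeometry(false) internally. shouldSend indicates whether to send immediately. When it is false, the message is only appended to the send buffer of the corresponding CellApp and goes out later in one batch (cellApps_.sendToAll() at the end of CellAppMgr::loadBalance), which avoids sending several times when a Space is modified several times.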
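The effect of deferring the send can be sketched with a toy channel. SketchChannel and informGeometry are hypothetical and only model the batching idea; they are not the Mercury bundle or channel API.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical channel that groups queued messages into packets.
class SketchChannel
{
public:
    void queue( const std::string & msg ) { pending_.push_back( msg ); }

    // Sends everything queued so far as one packet.
    // Returns false if there was nothing to send.
    bool send()
    {
        if (pending_.empty()) return false;
        packets_.push_back( pending_ );
        pending_.clear();
        return true;
    }

    std::size_t numPackets() const { return packets_.size(); }
    std::size_t messagesInPacket( std::size_t i ) const
    {
        return packets_.at( i ).size();
    }

private:
    std::vector< std::string > pending_;
    std::vector< std::vector< std::string > > packets_;
};

// Queues a geometry update and optionally flushes straight away.
inline void informGeometry( SketchChannel & channel, int spaceID,
    bool shouldSend )
{
    channel.queue( "updateGeometry:" + std::to_string( spaceID ) );
    if (shouldSend) channel.send();
}
```

Two updates queued with shouldSend set to false and followed by a single send() travel in one packet, whereas sending immediately would produce two.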
When a Cell is deleted, a broadcast distinct from updateGeometry is sent to all CellApps of the Space:
/**
* This method removes the input cell from this space.
*/
void Space::eraseCell( CellData * pCell, bool notifyCellApps )
{
cells_.erase( pCell );
if (notifyCellApps)
{
cells_.notifyOfCellRemoval( id_, *pCell );
}
if (pRoot_)
{
pRoot_ = (pRoot_ != pCell ) ? pRoot_->removeCell( pCell ) : NULL;
this->updateRanges();
}
}
/**
*
*/
void Cells::notifyOfCellRemoval( SpaceID spaceID, CellData & removedCell ) const
{
const Mercury::Address removedAddress = removedCell.addr();
Mercury::Bundle & bundleToRemoved = removedCell.cellApp().bundle();
bundleToRemoved.startMessage( CellAppInterface::removeCell );
bundleToRemoved << spaceID;
Container::const_iterator iter = cells_.begin();
while (iter != cells_.end())
{
CellData * pCell = *iter;
// Should have been removed already.
MF_ASSERT( pCell != &removedCell );
CellApp & cellApp = pCell->cellApp();
bundleToRemoved << cellApp.addr();
Mercury::Bundle & bundle = cellApp.bundle();
bundle.startMessage( CellAppInterface::notifyOfCellRemoval );
bundle << spaceID;
bundle << removedAddress;
cellApp.channel().delayedSend();
++iter;
}
removedCell.cellApp().channel().delayedSend();
}
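When the CellApp hosting the removed Cell receives the removeCell RPC, the message is routed to that Cell and marks it as removed. The addresses streamed with the message are the CellApps whose confirmations the Cell must wait for before it can be deleted: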
/**
* This method is called by the CellAppMgr to inform us that this cell should
* be removed. It also sends a list of CellApp addresses. This cell should not
* be deleted until all these CellApps have confirmed that no more entities
* are on their way.
*/
void Cell::removeCell( BinaryIStream & data )
{
INFO_MSG( "Cell::removeCell(%u)\n", this->spaceID() );
MF_ASSERT( !isRemoved_ );
isRemoved_ = true;
while (data.remainingLength())
{
Mercury::Address addr;
data >> addr;
RemovalAcks::iterator iAck = receivedAcks_.find( addr );
if (iAck != receivedAcks_.end())
{
receivedAcks_.erase( iAck );
}
else
{
pendingAcks_.insert( addr );
}
}
}
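Cell::removeCell keeps two sets because a confirmation from another CellApp may arrive before or after the removeCell message itself. A minimal sketch of that bookkeeping follows. RemovalTracker is hypothetical, addresses are reduced to strings, and the onAck and canDelete methods are assumptions about how the confirmations are consumed, since the source does not show that handler.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <vector>

// Hypothetical tracker for the confirmations a removed cell waits for.
class RemovalTracker
{
public:
    // Assumed handler: a confirmation arrived from addr.
    void onAck( const std::string & addr )
    {
        if (pending_.erase( addr ) == 0)
        {
            received_.insert( addr );   // arrived before removeCell
        }
    }

    // Mirrors Cell::removeCell: addrs lists the CellApps to wait for.
    void onRemoveCell( const std::vector< std::string > & addrs )
    {
        isRemoved_ = true;
        for (const std::string & addr : addrs)
        {
            if (received_.erase( addr ) == 0)
            {
                pending_.insert( addr );    // not confirmed yet
            }
        }
    }

    // Assumed condition: removed and no confirmation outstanding.
    bool canDelete() const { return isRemoved_ && pending_.empty(); }

private:
    bool isRemoved_ = false;
    std::set< std::string > pending_;
    std::set< std::string > received_;
};
```

Tracking early confirmations separately means the outcome does not depend on the order in which the two kinds of messages arrive.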
At this point we can see that Cell creation, Cell removal, and Cell boundary updates of a Space all push the latest Space data to every related CellApp.