BigWorld 的迁移
对于常见的非分布式游戏场景实例,一个Space肯定是被单一进程托管的,玩家、怪物等ActorEntity只需要考虑进入场景和离开场景这两个操作。但是对于分布式大世界场景,一个Space是由分布在多个进程中的CellSpace拼接而成的,每个CellSpace负责一个与其他CellSpace不重叠的矩形区域,每个ActorEntity都根据其位置坐标绑定到覆盖了这个位置的CellSpace。但是由于ActorEntity的位置坐标是动态的,同时CellSpace的覆盖区域也会被负载均衡所调整,所以一个ActorEntity所归属的CellSpace并不是固定的,而是不断的变化之中。当一个ActorEntity所属的CellSpace发生改变时,这个ActorEntity就需要从之前的CellSpace移动到新的CellSpace,这个移动的过程就叫做迁移Migration。整个迁移过程其实跟切换场景有很大的相似之处,都是需要在迁移之前打包好当前ActorEntity的所有数据,然后从当前CellSpace移除此ActorEntity,然后通过RPC将这个ActorEntity的数据发送到新CellSpace,利用打包数据对这个ActorEntity进行重建。
RealEntity的迁移
RealEntity的迁移有两种:同场景内的迁移和不同场景内的迁移。同场景内的迁移主要是当前RealEntity所归属的Cell发生变化时,触发向周围的Cell进行迁移。同场景内的迁移属于Real-Ghost管理相关内容, 这部分是引擎自动维护的,业务层不需要处理。另外一种是不同场景的迁移,这部分与业务逻辑强相关,引擎层只提供相关接口。
在BigWorld中, Real-Ghost的管理基本都被EntityGhostMaintainer这个类型负责,其逻辑入口是EntityGhostMaintainer::check,会在游戏主循环中被调用到。 其调用链为CellApp::handleGameTickTimeSlice => CellApp::checkOffloads => Cells::checkOffloads => Cell::checkOffloadsAndGhosts => OffloadChecker::run => EntityGhostMaintainer::check:
/**
* This method handles the game tick time slice.
*/
void CellApp::handleGameTickTimeSlice()
{
AUTO_SCOPED_PROFILE( "gameTick" );
// 暂时忽略一些无关代码
this->checkOffloads();
// 暂时忽略一些无关代码
}
/**
* This method checks whether any entities should be offloaded or ghosts
* created or destroyed.
*/
void CellApp::checkOffloads()
{
if ((time_ % Config::checkOffloadsPeriodInTicks()) == 0)
{
cells_.checkOffloads();
}
}
/**
* This method checks whether any cells should offload any entities or create
* or destroy any ghosts.
*
* It also destroys any cells that can now safely be destroyed.
*/
void Cells::checkOffloads()
{
Container toDelete;
Container::iterator iCell = container_.begin();
while (iCell != container_.end())
{
Cell * pCell = iCell->second;
bool isReadyForDeletion = pCell->checkOffloadsAndGhosts();
if (isReadyForDeletion)
{
toDelete[ iCell->first ] = pCell;
}
++iCell;
}
// 省略一些无关代码
}
/**
* We want to periodically update the CellApps that we have created ghosts
* for our reals on, and also offload them there if appropriate.
*
* @return True if this cell should be killed, otherwise false.
*/
bool Cell::checkOffloadsAndGhosts()
{
OffloadChecker offloadChecker( *this );
offloadChecker.run();
return this->isReadyForDeletion();
}
/**
* This method performs the offloads and ghosts check.
*/
void OffloadChecker::run()
{
// If this space is shutting down, don't offload any entities so that
// they are not lost when the cells are shut down.
if (cell_.space().isShuttingDown())
{
return;
}
static ProfileVal localProfile( "boundaryCheck" );
START_PROFILE( localProfile );
Cell::Entities::iterator iEntity = cell_.realEntities().begin();
while (iEntity != cell_.realEntities().end())
{
EntityPtr pEntity = *iEntity;
MF_ASSERT( &cell_ == &(pEntity->cell()) );
EntityGhostMaintainer entityGhostMaintainer( *this, pEntity );
entityGhostMaintainer.check();
++iEntity;
}
this->sendOffloads();
STOP_PROFILE_WITH_DATA( localProfile, offloadList_.size() );
}
CellApp::checkOffloads并不是每帧都会触发后续调用,而是以Config::checkOffloadsPeriodInTicks()为周期去检查。 Cells::checkOffloads负责触发每个Cell的检查,然后OffloadChecker::run负责遍历每个RealEntity来执行EntityGhostMaintainer::check。 在这个check函数中,checkEntityForOffload负责处理这个RealEntity的迁移, createOrUnmarkRequiredHaunts负责往周围的CellSpace创建GhostEntity,最后的deleteMarkedHaunts负责销毁已经不再需要的GhostEntity:
/**
* This method checks through this entity's ghosts and checks whether the real
* entity needs to be offloaded elsewhere.
*/
void EntityGhostMaintainer::check()
{
if (this->cell().shouldOffload())
{
this->checkEntityForOffload();
}
// We mark all the haunts for this entity, and then we unmark all the valid
// ones. The invalid ghosts are left marked and are deleted.
bool doesOffloadDestinationHaveGhost = this->markHaunts();
this->createOrUnmarkRequiredHaunts();
MF_ASSERT( (pOffloadDestination_ == NULL) ||
doesOffloadDestinationHaveGhost ||
(numGhostsCreated_ == 1) );
this->deleteMarkedHaunts();
}
我们继续跟进checkEntityForOffload的实现,这里会使用pCellAt函数来计算当前RealEntity的位置对应的CellSpace,如果这个CellSpace不是当前的CellSpace的话,就可以考虑加入到等待迁移列表中:
/**
* This method checks whether the given entity requires offloading to another
* cell.
*/
void EntityGhostMaintainer::checkEntityForOffload()
{
// Find out where we really want to live.
const Vector3 & position = pEntity_->position();
const CellInfo * pHomeCell = pEntity_->space().pCellAt(
position.x, position.z );
if ((pHomeCell == NULL) || (pHomeCell == &(this->cell().cellInfo())))
{
// Don't offload to ourselves.
return;
}
if (pHomeCell->isDeletePending())
{
// Don't offload to a cell that is being deleted.
return;
}
CellAppChannel * pOffloadDestination =
CellAppChannels::instance().get( pHomeCell->addr() );
if (!pOffloadDestination || !pOffloadDestination->isGood())
{
// Don't offload if other cell has failed or died.
return;
}
// OK to offload now.
pOffloadDestination_ = pOffloadDestination;
offloadChecker_.addToOffloads( pEntity_, pOffloadDestination_ );
}
/**
* This method adds an entity to the offload list.
*
* @param pEntity The entity to add.
* @param pOffloadDestination The destination channel.
*/
void OffloadChecker::addToOffloads( EntityPtr pEntity,
CellAppChannel * pOffloadDestination )
{
offloadList_.push_back( OffloadEntry( pEntity, pOffloadDestination ) );
}
这里的设计与我们之前设计的无缝迁移有点不同,之前的设计里是RealEntity在当前CellSpace的ghost_rect外时才触发迁移,而Bigworld里是只要在当前CellSpace的real_rect外就开始准备迁移,这种管理方式对于一些反复在边界上进行移动的情况很不友好,会触发频繁的迁移。
在OffloadChecker::run()中会调用OffloadChecker::sendOffloads()来遍历当前的等待迁移列表,来逐个通知迁移:
/**
* This method sends all the pending offloads.
*/
void OffloadChecker::sendOffloads()
{
OffloadList::iterator iOffload = offloadList_.begin();
while (iOffload != offloadList_.end())
{
this->sendOffload( iOffload );
++iOffload;
}
}
/**
* This method sends an individual offload, subject to sufficient ghosting
* capacity being present on the channels and the channels being in a good
* state.
*
* @param iOffload The iterator pointing to the entry in the offload list
* to process.
*/
void OffloadChecker::sendOffload( OffloadList::const_iterator iOffload )
{
EntityPtr pEntity = iOffload->first;
CellAppChannel * pOffloadDestination = iOffload->second;
cell_.offloadEntity( pEntity.get(), pOffloadDestination,
/* isTeleport: */ false );
}
最终会调用到Cell::offloadEntity这个接口来处理迁移逻辑。实际上跨场景传送的接口最终也是通过Cell::offloadEntity来驱动迁移的,这里我们来跟进一下传送时的具体流程,其入口是Base::teleportOther:
/**
* This method handles a message that requests to teleport another entity to
* the space of this entity.
*/
void Base::teleportOther( const Mercury::Address & srcAddr,
const Mercury::UnpackedMessageHeader & header,
const BaseAppIntInterface::teleportOtherArgs & args )
{
const EntityMailBoxRef & teleportingMB = args.cellMailBox;
if (pCellEntityMailBox_ == NULL)
{
ERROR_MSG( "Base::teleportOther( %u ): "
"pCellEntityMailBox_ is NULL while teleporting %u\n",
id_, teleportingMB.id );
return;
}
if (pCellEntityMailBox_->address().ip == 0)
{
ERROR_MSG( "Base::teleportOther( %u ): Cell mailbox has no address while "
"teleporting %u. isGetCellPending_ = %d\n",
id_, teleportingMB.id, this->isGetCellPending() );
return;
}
Mercury::Channel & channel = BaseApp::getChannel( teleportingMB.addr );
Mercury::Bundle & bundle = channel.bundle();
CellAppInterface::teleportArgs & rTeleportArgs =
CellAppInterface::teleportArgs::start( bundle, teleportingMB.id );
rTeleportArgs.dstMailBoxRef = pCellEntityMailBox_->ref();
channel.send();
}
比较神奇的是这里的Base并不是要传送的RealEntity的Base,而是传送目标场景里的一个RealEntity对应的Base。这里会使用中转Base对应的CellEntity的Cell地址来填充传送目标地址信息。按道理传送只需要目标场景的通信地址即可,但是这里却需要一个已经在目标场景的RealEntity来中转。具体为啥设计我也不怎么清楚,反正能用就行。
/**
* This method handles a message telling us to teleport to another space
* indicated by a cell mailbox.
*/
void Entity::teleport( const CellAppInterface::teleportArgs & args )
{
MF_ASSERT( pReal_ );
EntityPtr pThis = this;
pThis->callback( "onTeleport" );
if (pReal_)
{
pReal_->teleport( args.dstMailBoxRef );
}
else
{
ERROR_MSG( "Entity::teleport( %u ): "
"No longer real after onTeleport callback\n",
id_ );
}
}
/**
* This method handles a message telling us to teleport to another space
* indicated by a cell mailbox.
*/
void RealEntity::teleport( const EntityMailBoxRef & dstMailBoxRef )
{
Vector3 direction( 0.f, 0.f, 0.f );
if (!this->teleport( dstMailBoxRef, Entity::INVALID_POSITION, direction ))
{
ERROR_MSG( "RealEntity::teleport: Failed\n" );
PyErr_Print();
}
}
当要传送的RealEntity接收到这个teleport的RPC之后,会调用一个重载的teleport函数,这个函数实现比较长,处理了多种参数组合,这里我们只关心目前的跨场景传送,忽略掉其他分支的代码:
/**
* This method allows scripts to teleport this entity to another
* location in the world - possibly to another space.
* It sets a Python error if it fails.
*/
bool RealEntity::teleport( const EntityMailBoxRef & nearbyMBRef,
const Vector3 & position, const Vector3 & direction )
{
// 忽略一些异常处理
if (nearbyMBRef.component() != nearbyMBRef.CELL)
{
PyErr_SetString( PyExc_TypeError, "Entity.teleport() "
"Cannot teleport near to non-cell entities" );
return false;
}
// 忽略一些无关代码
// Note: This may actually be a channel that leads back to this CellApp, but
// this is OK. Having a circular channel works fine when single-threaded.
CellAppChannel * pChannel =
CellAppChannels::instance().get( nearbyMBRef.addr );
if (pChannel == NULL)
{
PyErr_SetString( PyExc_ValueError, "Entity.teleport() "
"Invalid destination mailbox" );
return false;
}
entity().relocated();
// 忽略一些无关代码
// Save our old pos and dir and temporarily set to the teleport ones
Vector3 oldPos = entity_.localPosition_;
Direction3D oldDir = entity_.localDirection_;
entity_.localPosition_ = position;
// XXX: This constructor takes its input as (roll, pitch, yaw)
entity_.localDirection_ = Direction3D( direction );
// Delete all ghosts
// 忽略一些无关代码
recordingSpaceEntryID_ = SpaceEntryID();
// Call this before the message is started so that the channel is in a good
// state. This should also be above the creation of the new ghost. If not,
// ghosted property changes would arrive before the ghost is created.
entity_.callback( "onLeavingCell" );
Mercury::ChannelSender sender( pChannel->channel() );
Mercury::Bundle & bundle = sender.bundle();
// Re-add haunt for destination.
this->addHaunt( *pChannel );
// We always have to create the ghost anew, even if there's already one
// there, because we have no idea which space the entity on the cellapp
// we're going to is in.
MemoryOStream ghostDataStream;
entity_.writeGhostDataToStream( ghostDataStream );
bundle.startMessage( CellAppInterface::onloadTeleportedEntity );
bundle << nearbyMBRef.id;
// so the receiver knows where the onload message starts
bundle << uint32( ghostDataStream.remainingLength() );
bundle.transfer( ghostDataStream, ghostDataStream.remainingLength() );
// And offload it
// Save a reference since offloadEntity decrefs 'this'!
EntityPtr pThisEntity = &entity_;
pThisEntity->population().rememberBaseChannel( *pThisEntity, pChannel->addr() );
pThisEntity->cell().offloadEntity( pThisEntity.get(), pChannel,
/* isTeleport: */ true );
// Restore our old pos and dir
pThisEntity->localPosition_ = oldPos;
pThisEntity->localDirection_ = oldDir;
return true;
}
可以看到这里的跨场景迁移的实现其实跟之前的Real-Ghost的迁移实现差不多,都会通过writeGhostDataToStream在目标Cell里创建一个GhostEntity,最终通过Cell::offloadEntity来调用RealEntity到GhostEntity的切换。
可以看到不管是同场景内迁移还是跨场景的迁移,最终都会调用到Cell::offloadEntity,其第二个参数代表要迁移的目标地址,第三个参数代表当前迁移是否是跨场景:
/**
* This method moves a real entity from this cell to an adjacent cell.
*
* @param pEntity The entity to offload.
* @param pChannel The channel to send on.
* @param isTeleport Indicates whether this is a teleport
*/
void Cell::offloadEntity( Entity * pEntity, CellAppChannel * pChannel,
bool isTeleport )
{
AUTO_SCOPED_PROFILE( "offloadEntity" );
SCOPED_PROFILE( TRANSIENT_LOAD_PROFILE );
// TRACE_MSG( "Cell::offloadEntity: id %d to cell %s\n", pEntity->id(),
// pChannel->address().c_str() );
// Make sure it's real.
MF_ASSERT( pEntity->pReal() != NULL );
// Make sure the entity doesn't have a zero refcount when between lists.
EntityPtr pCopy = pEntity;
// If teleporting, this has already been called so that the channel is not
// left with a partially streamed message.
if (!isTeleport)
{
pEntity->callback( "onLeavingCell" );
}
// Move the entity from being real to a ghost.
if (pEntity->isReal())
{
if (pReplayData_ && isTeleport)
{
pReplayData_->deleteEntity( pEntity->id() );
}
realEntities_.remove( pEntity );
pEntity->offload( pChannel, isTeleport );
pEntity->callback( "onLeftCell" );
}
}
offloadEntity的核心是Entity::offload,这个函数会开始将当前RealEntity的数据打包到Channel里:
/**
* This method offloads this real entity to the input adjacent cell. It should
* only be called on a real entity. This entity is converted into a ghost
* entity.
*
* @param pChannel The channel to the application to move to.
* @param isTeleport Indicates whether this is a teleport.
*
* @see onload
*/
void Entity::offload( CellAppChannel * pChannel, bool isTeleport )
{
#ifdef DEBUG_FAULT_TOLERANCE
if (g_crashOnOffload)
{
MF_ASSERT( !"Entity::offload: Crash on offload" );
}
#endif
MF_ASSERT( this->isReal() );
Mercury::Bundle & bundle = pChannel->bundle();
// if we are teleporting then we already have a message on the bundle
if (!isTeleport)
{
bundle.startMessage( CellAppInterface::onload );
}
this->convertRealToGhost( &bundle, pChannel, isTeleport );
// DEBUG_MSG( "Entity::offload( %d ): Offloading to %s\n",
// id_, pChannel->addr().c_str() );
}
Entity::offload的主要任务就是调用convertRealToGhost来执行数据流的打包工作,将当前RealEntity的所有重要数据都放入到bundle中:
/**
* This method converts a real entity into a ghost entity.
*
* @param pStream The stream to write the conversion data to.
* @param pChannel The application the real entity is being moved to. If
* NULL, the real entity is being destroyed.
* @param isTeleport Indicates whether this is a teleport.
*
* @see offload
* @see destroy
*/
void Entity::convertRealToGhost( BinaryOStream * pStream,
CellAppChannel * pChannel, bool isTeleport )
{
MF_ASSERT( this->isReal() );
MF_ASSERT( !pRealChannel_ );
Entity::callbacksPermitted( false );
Witness * pWitness = this->pReal()->pWitness();
if (pWitness != NULL)
{
pWitness->flushToClient();
}
if (pChannel != NULL)
{
// Offload the entity if we have a pChannel to the next real.
MF_ASSERT( pStream != NULL );
this->writeRealDataToStream( *pStream, isTeleport );
pRealChannel_ = pChannel;
// Once the real is created on the other CellApp, it will send a
// ghostSetReal back to this app, so we better be ready for it.
nextRealAddr_ = pRealChannel_->addr();
// Delete the real part (includes decrementing refs of haunts
// and notifying haunts of our nextRealAddr_)
this->offloadReal();
}
else
{
// Delete the real part (includes decrementing refs of haunts)
// as we're being destroyed.
this->destroyReal();
}
// 省略很多代码
}
上面的destroyReal内有个比较奇怪的操作,会用CellAppInterface::ghostSetNextRealArgs这个通知所有的GhostEntity当前的RealEntity要迁移到目标Cell的地址,来同步当前RealEntity的迁移中状态:
/**
* This method deletes this RealEntity. The pNextRealAddr parameter controls
* whether the Channel will be deleted immediately (for offloads) or
* condemned (all other cases).
*/
void RealEntity::destroy( const Mercury::Address * pNextRealAddr )
{
// Offloading
if (pNextRealAddr)
{
// Notify all ghosts that this real is about to be offloaded
for (Haunts::iterator iter = haunts_.begin();
iter != haunts_.end(); ++iter)
{
Haunt & haunt = *iter;
if (haunt.addr() != *pNextRealAddr)
{
CellAppInterface::ghostSetNextRealArgs & args =
CellAppInterface::ghostSetNextRealArgs::start(
haunt.bundle(), entity_.id() );
args.nextRealAddr = *pNextRealAddr;
}
}
// Clear out the channel's resend history so that when Channel::condemn() is
// called it is destroyed immediately. The resend history is now the
// responsibility of the channel that will be created on the dest app.
pChannel_->reset( Mercury::Address::NONE, false );
// delete pChannel_;
pChannel_->destroy();
}
// Destroying
else
{
pChannel_->condemn();
}
pChannel_ = NULL;
delete this;
}
这里会直接销毁pChannel_,无视剩下的还没有ACK的消息,因为在offload迁移时调用的writeRealDataToStreamInternal内部会让RealEntity把通道状态写到 offload流中:
/**
* This method is called by writeRealDataToStream once the decision whether or
* not to compress has been made.
*/
void Entity::writeRealDataToStreamInternal( BinaryOStream & data,
bool isTeleport ) const
{
//this->pType()->dumpRealScript( this, data );
// 暂时省略所有脚本属性的打包
TOKEN_ADD( data, "RealProps" );
pReal_->writeOffloadData( data, isTeleport );
this->writeBasePropertiesExposedForReplayToStream( data );
}
/**
* This method should put the relevant data into the input BinaryOStream so
* that this entity can be onloaded to another cell. It is mostly read off
* in the readOffloadData except for a bit done in our constructor above.
*
* @param data The stream to place the data on.
* @param isTeleport Indicates whether this is a teleport.
*/
void RealEntity::writeOffloadData( BinaryOStream & data, bool isTeleport )
{
StreamHelper::addRealEntity( data );
// -------- above here read off in our constructor above
pChannel_->addToStream( data );
// 省略后续的所有代码
}
这行代码pChannel_->addToStream( data )会把channel的待发送数据和重传历史等必要状态序列化到offload数据流,具体细节这里就不贴了。
等到RealEntity在目标CellApp上重建的时候,在其init函数中会判断当前是从迁移数据里构造数来的,因此会将通道执行恢复:
/**
* This method initialise the RealEntity. Must be called immediately after
* RealEntity is constructed. Return true if ghost position of this entity
* needs to be updated.
*/
bool RealEntity::init( BinaryIStream & data, CreateRealInfo createRealInfo,
Mercury::ChannelVersion channelVersion,
const Mercury::Address * pBadHauntAddr )
{
// The following could've been put on by:
// - py_createEntity
// - py_createEntityFromFile
// - eLoad
// - Base.createCellEntity
// - offloading
//
// This is usually added to the stream using StreamHelper::addRealEntity.
// Set the channel version if we have one
if (channelVersion != Mercury::SEQ_NULL)
{
pChannel_->version( channelVersion );
pChannel_->creationVersion( channelVersion );
}
bool requireGhostPosUpdate = false;
bool hasChangedSpace = false;
bool needsPhysicsCorrection = false;
switch( createRealInfo )
{
case CREATE_REAL_FROM_OFFLOAD:
needsPhysicsCorrection =
this->readOffloadData( data, pBadHauntAddr, &hasChangedSpace );
break;
case CREATE_REAL_FROM_RESTORE:
this->readBackupData( data );
break;
case CREATE_REAL_FROM_INIT:
requireGhostPosUpdate = true;
break;
}
// 省略后续代码
}
/**
* This method is used to stream off data that was added to a stream using
* writeOffloadData. Return true if the ghost position of the entity needs
* to be updated.
*
* @return Whether a physics correction should be sent to the client. This
* needs to be performed by the caller after it gets a witness.
*
* @see writeOffloadData
*/
bool RealEntity::readOffloadData( BinaryIStream & data,
const Mercury::Address * pBadHauntAddr, bool * pHasChangedSpace )
{
pChannel_->initFromStream( data, entity_.baseAddr() );
// 省略后续代码
}
这样执行完成之后,pChannel_就恢复到了迁移之前的状态,可以继续维护可靠消息的确认和重传。有了这个pChannel的序列化与反序列化之后,我们就不需要担心在迁移过程中的下发数据的可靠性。
每次RealEntity在迁移结束之后,在进入新的Cell时又会执行Cell::addRealEntity这个操作,这样就会将最新地址通知回Base,这样Base就有了最新的RealEntity地址。相信大家很快都发现了问题,在迁移期间从Base发往RealEntity的消息使用的仍然是RealEntity迁移前的Cell地址,那消息不就会丢了吗?
这个问题我们在mosaic_game里也遇到过,在介绍mosaic_game里的迁移时,我们为了避免迁移期间的消息丢失,会在迁移之前让actor_entity主动的通知RealEntity迁移的开始,之后所有通过RealEntity发往actor_entity的转发消息都会缓存住,直到actor_entity迁移完成之后通知RealEntity新的地址再重新开始发送。这么明显的问题BigWorld当然也考虑了,他的解决方案与mosaic_game中的很不一样,他是在老的Cell上将这些消息临时缓存起来,等到迁移完成后再执行转发。接下来我们来详细的剖析这个缓存然后等待转发的流程,首先回到之前提到的Entity处理消息的EntityMessageHandler:
/**
* This method handles this message. It is called from the InputMessageHandler
* override and from handling of buffered messages.
*/
void EntityMessageHandler::handleMessage( const Mercury::Address & srcAddr,
Mercury::UnpackedMessageHeader & header,
BinaryIStream & data,
EntityID entityID )
{
CellApp & app = ServerApp::getApp< CellApp >( header );
Entity * pEntity = app.findEntity( entityID );
AUTO_SCOPED_ENTITY_PROFILE( pEntity );
BufferedGhostMessages & bufferedMessages = app.bufferedGhostMessages();
bool shouldBufferGhostMessage =
!pEntity ||
pEntity->shouldBufferMessagesFrom( srcAddr ) ||
bufferedMessages.isDelayingMessagesFor( entityID, srcAddr );
bool isForDestroyedGhost = false;
// Message is for a destroyed ghost if it is out of subsequence order.
if (reality_ == GHOST_ONLY)
{
// 省略
}
// Drop GHOST_ONLY messages for destroyed ghost.
if (isForDestroyedGhost)
{
// 省略
}
// Buffer GHOST_ONLY messages that are out of sender order.
else if (reality_ == GHOST_ONLY && shouldBufferGhostMessage)
{
// 省略
}
// REAL_ONLY messages should be forwarded if we don't have the real.
else if (reality_ >= REAL_ONLY && (!pEntity || !pEntity->isReal()))
{
// We only try to look up the cached channel for the entity if it
// doesn't exist, since calling findRealChannel() for ghosts will
// cause an assertion.
CellAppChannel * pChannel = pEntity ?
pEntity->pRealChannel() :
Entity::population().findRealChannel( entityID );
if (pChannel)
{
Entity::forwardMessageToReal( *pChannel, entityID,
header.identifier, data, srcAddr, header.replyID );
}
else
{
ERROR_MSG( "EntityMessageHandler::handleMessage( %s [id: %d] ): "
"Dropped real message for unknown entity %u\n",
header.msgName(), int( header.identifier ), entityID );
this->sendFailure( srcAddr, header, data, entityID );
}
}
// 省略后续分支
}
这里我们重点关注reality_ >= REAL_ONLY && (!pEntity || !pEntity->isReal())这个分支,代表这个消息应该是Real去处理,但是本地只有Ghost的情况。此时会从GhostEntity上找到这个pRealChannel,然后执行转发。这个转发逻辑依赖于我们的GhostEntity知道RealEntity的下一个Cell地址是什么,这个很好设置,因为当前的GhostEntity就是RealEntity迁移之后形成的,迁移开始的时候本来就知道迁移的目标Cell
void Entity::offload( CellAppChannel * pChannel, bool isTeleport )这里的pChannel代表的就是要迁移的目标Cell的Channel,这样迁移开始的时候Entity::pRealChannel_就指向了要迁移的目标Cell,所以Entity::forwardMessageToReal就可以使用这个pRealChannel_就可以正确的将数据转发过去:
/**
* This static message is used to forward a message to another CellApp.
*
* @param realChannel The channel on which the message will be forwarded.
* @param entityID The id of the entity to send the message to.
* @param messageID The id of the message to forward.
* @param data The message data to forward.
* @param srcAddr This is used if the message is a request. The reply will be
* forwarded to this address.
* @param replyID If not REPLY_ID_NONE, the message is a
* request. The reply will be forwarded to srcAddr via this application.
*/
void Entity::forwardMessageToReal(
CellAppChannel & realChannel,
EntityID entityID,
uint8 messageID, BinaryIStream & data,
const Mercury::Address & srcAddr, Mercury::ReplyID replyID )
{
AUTO_SCOPED_PROFILE( "forwardToReal" );
Mercury::ChannelSender sender( realChannel.channel() );
Mercury::Bundle & bundle = sender.bundle();
const Mercury::InterfaceElement & ie =
CellAppInterface::gMinder.interfaceElement( messageID );
if (replyID == Mercury::REPLY_ID_NONE)
{
bundle.startMessage( ie );
}
else
{
bundle.startRequest( ie, new ReplyForwarder( srcAddr, replyID ) );
}
bundle << entityID;
bundle.transfer( data, data.remainingLength() );
}
在Base里还有一个比较重要的机制来辅助迁移,在Base创建与RealEntity之间通信的pChannel_时会设置shouldAutoSwitchToSrcAddr为true,其作用是如果收到任意消息时,这个Channel的目标地址会立即更新为这个消息的源地址:
// Base channels must auto switch to the incoming address, because if
// packets are lost before a long teleport (i.e. so old ghost will not hang
// around), incoming packets (with the setCurrentCell message) will be
// buffered and the address switch might never happen.
pChannel_->shouldAutoSwitchToSrcAddr( true );
根据其注释,大概解释一下这个选项想要解决的问题:在长时间迁移的情况下,如果RealEntity发之前发出的部分包丢失导致后面发出的setCurrentCell这个包一直在缓冲区里不会被处理,而处理包丢失的消息重传又依赖于Base知道最新的RealEntity的地址,这样就会出现一种诡异的逻辑死锁。所以这里把这个自动切换目标地址为新包源地址的功能会被打开,来自动更新为最新RealEntity的地址。
这个选项说起来简单,但是实现上考虑的是比较严谨的,并不是任意到达的数据包都会更新当前pChannel_的目标地址。在收包处理函数里UDPChannel::addToReceiveWindow会使用一个版本号系统,只有在传入数据的地址版本号大于本地记录的地址版本号的时候才会执行地址更新:
/**
* This method is called when a packet is received. It is responsible for
* adding the packet to the receive window and queueing an ACK to the next
* outgoing bundle on this channel.
*/
UDPChannel::AddToReceiveWindowResult UDPChannel::addToReceiveWindow(
Packet * p, const Address & srcAddr, PacketReceiverStats & stats )
{
const SeqNum seq = p->seq();
const bool isDebugVerbose = this->networkInterface().isDebugVerbose();
// Make sure the sequence number is valid
if (seqMask( seq ) != seq)
{
if (this->networkInterface().isVerbose())
{
ERROR_MSG( "UDPChannel::addToReceiveWindow( %s ): "
"Got out-of-range incoming seq #%u (inSeqAt: #%u)\n",
this->c_str(), seq, inSeqAt_ );
}
return PACKET_IS_CORRUPT;
}
if (shouldAutoSwitchToSrcAddr_)
{
// We switch address if the version number is acceptable. We switch on
// equal version numbers because the first packet from a cell entity
// sets the address and is version 0.
if (!seqLessThan( p->channelVersion(), version_ ))
{
version_ = p->channelVersion();
this->setAddress( srcAddr );
}
}
// 省略后续的分支处理
}
这个功能依赖于每个Packet在发送时都会带上自身的地址版本号:
/**
* This method writes this channel's state to the provided stream so that it
* can be reconstructed with initFromStream().
*/
void UDPChannel::addToStream( BinaryOStream & data )
{
// Avoid having to stream this with the channel.
if (this->hasUnsentData())
{
this->send();
}
// Increment version number for peer
data << seqMask( version_ + 1 );
// 省略后续的处理
}
当这个Base第一次创建Entity的时候,会带上自身的channel版本号作为参数:
/**
* This method creates the cell entity associated with this entity into the
* input space.
*/
bool Base::createInSpace( SpaceID spaceID, const char * pyErrorPrefix )
{
BaseApp & app = BaseApp::instance();
// TODO:
// As an optimisation, try to find a cell entity mailbox for an existing
// base entity that is in the same space.
//
// This is currently not implemented as there is a potential race-condition.
// The entity may currently be in the same space but may be in a different
// space by the time the createCellEntity message arrives.
std::auto_ptr< Mercury::ReplyMessageHandler > pHandler(
this->prepareForCellCreate( pyErrorPrefix ) );
if (!pHandler.get())
{
return false;
}
Mercury::Channel & channel = BaseApp::getChannel( app.cellAppMgrAddr() );
// We don't use the channel's own bundle here because the streaming might
// fail and the message might need to be aborted halfway through.
std::auto_ptr< Mercury::Bundle > pBundle( channel.newBundle() );
pBundle->startRequest( CellAppMgrInterface::createEntity, pHandler.get() );
*pBundle << spaceID;
// stream on the entity channel version
*pBundle << this->channel().version();
*pBundle << false; /* isRestore */
// See if we can add the necessary data to the bundle
if (!this->addCellCreationData( *pBundle, pyErrorPrefix ))
{
isCreateCellPending_ = false;
isGetCellPending_ = false;
return false;
}
channel.send( pBundle.get() );
pHandler.release(); // Handler deletes itself on callback.
return true;
}
当cellApp接收到这个createEntity请求之后,会一路传递这个channelVersion到RealEntity上:
EntityPtr Cell::createEntityInternal( BinaryIStream & data,
const ScriptDict & properties,
bool isRestore, Mercury::ChannelVersion channelVersion,
EntityPtr pNearbyEntity )
{
// 省略很多代码
// Build up the Entity structure
EntityPtr pNewEntity = space_.newEntity( id, entityTypeID );
if (!pNewEntity)
{
return NULL;
}
MF_ASSERT( pNewEntity->nextInChunk() == NULL );
MF_ASSERT( pNewEntity->prevInChunk() == NULL );
MF_ASSERT( pNewEntity->pChunk() == NULL );
Entity::callbacksPermitted( false ); // {
if (!pNewEntity->initReal( data, properties, isRestore, channelVersion,
pNearbyEntity ))
{
pNewEntity->setShouldReturnID( shouldAllocateID );
pNewEntity->decRef();
// TODO: Make a callbacksPermitted lock class to help manage the pairing
// of these calls
Entity::callbacksPermitted( true );
return NULL;
}
// 省略很多代码
}
bool Entity::initReal( BinaryIStream & data, const ScriptDict & properties,
bool isRestore,
Mercury::ChannelVersion channelVersion,
EntityPtr pNearbyEntity )
{
// 省略很多代码
this->createReal();
bool shouldUpdateGhostPositions = pReal_->init( data,
isRestore ? CREATE_REAL_FROM_RESTORE : CREATE_REAL_FROM_INIT,
channelVersion );
// 省略很多的代码
}
这里的RealEntity::init就会使用channelVersion这个版本信息作为参数来初始化pChannel内部的版本号:
/**
* This method initialise the RealEntity. Must be called immediately after
* RealEntity is constructed. Return true if ghost position of this entity
* needs to be updated.
*/
bool RealEntity::init( BinaryIStream & data, CreateRealInfo createRealInfo,
Mercury::ChannelVersion channelVersion,
const Mercury::Address * pBadHauntAddr )
{
// The following could've been put on by:
// - py_createEntity
// - py_createEntityFromFile
// - eLoad
// - Base.createCellEntity
// - offloading
//
// This is usually added to the stream using StreamHelper::addRealEntity.
// Set the channel version if we have one
if (channelVersion != Mercury::SEQ_NULL)
{
pChannel_->version( channelVersion );
pChannel_->creationVersion( channelVersion );
}
// 省略后续代码
}
上述流程走完之后,保证了从Base发出的创建Entity请求成功之后, 这个RealEntity内的pChannel的地址版本号与Base里记录的版本号是一致的。
然后RealEntity每次迁移的时候,都会在pChannel数据打包的时候将内部的Version字段进行自增:
/**
* This method writes this channel's state to the provided stream so that it
* can be reconstructed with initFromStream().
*/
void UDPChannel::addToStream( BinaryOStream & data )
{
// Avoid having to stream this with the channel.
if (this->hasUnsentData())
{
this->send();
}
// Increment version number for peer
data << seqMask( version_ + 1 );
// 省略后续代码
}
然后在解包的时候,会原样的将这个version解析出来:
/**
* This method reconstructs this channel from streamed data. It is used for
* streaming the entity channel when the real cell entity is offloaded.
*
* This assumes that this object was constructed with the same arguments as
* the source channel.
*/
void UDPChannel::initFromStream( BinaryIStream & data,
const Address & addr )
{
uint64 timeNow = timestamp();
lastReceivedTime_ = timeNow;
addr_ = addr;
data >> version_;
// 省略后续的代码
}
这样的设计就可以让每次迁移后RealEntity的pChannel的version比迁移前多1,这样就可以保证迁移后的RealEntity发出的第一个包里面的version一定是比Base上记录的pChannel的version大。在这样的设计下就可以被动的让Base上记录的pChannel的地址更新到最新迁移后的RealEntity的地址。
Base的迁移
前面我们介绍的关于RealEntity消息投递的流程里,有一个非常强的依赖:Base对象在创建之后便不会移动。有了这个不会移动的Base对象之后,往动态的RealEntity投递消息就可以简化为往静态的Base投递消息,因为Base会自动对相关消息转发到最新的RealEntity上。但是其实这个Base不会移动的假设是不成立的,在某些情况下一个Base可能会从一个BaseApp移动到另外一个BaseApp,主要有这两种情况:
- 当前
BaseApp的负载太高了,执行负载均衡时需要将一些Base从当前的BaseApp移动到负载较低的BaseApp上 - 当前的
BaseApp由于进程崩溃导退出,引发相关的Base从数据库中重建,此时会被迫创建在一个新的Base上
由于进程奔溃推出时引发的Base重建会出现可能的消息丢失,所以我们就不去考虑这种容灾的情况,下面我们来重点研究一下Base迁移时如何保证消息的可靠投递的。
负载均衡导致的Base迁移逻辑入口在BaseApp::startOffloading:
/**
* Once we have started retiring, we wait for acknowledgement from the
* BaseAppMgr that it will no longer adjust the backup hash of this app.
*/
void BaseApp::startOffloading( BinaryIStream & stream )
{
MF_ASSERT( this->isRetiring() );
INFO_MSG( "BaseApp::startOffloading: Received confirmation of "
"retirement from BaseAppMgr, destroying %zu local service"
"fragments and starting to offload %zu entities\n",
localServiceFragments_.size(),
bases_.size() );
localServiceFragments_.discardAll( /*shouldDestroy*/ true );
pBackupSender_->restartBackupCycle( bases_ );
pBackupSender_->startOffloading();
}
有一个专门的BackupSender来负责Base的迁移,其内部有一个地址数组addrs_,作为Base的可选迁移目标,这个数组在当前BaseApp被创建的时候就会被填充好,填充的数据由BaseAppMgr指定。BackupSender其最重要的接口就是根据EntityID计算的Hash来获取这个EntityID要迁移的目标地址,通过Hash的随机性来达到平分负载到目标BaseApp的目的:
/**
* This method returns the address that the input id hashes to.
*/
Mercury::Address BackupHash::addressFor( EntityID id ) const
{
if (!addrs_.empty())
{
return addrs_[ this->hashFor( id ) ];
}
return Mercury::Address( 0, 0 );
}
后续的restartBackupCycle负责将当前BaseApp里的所有Base加入到待迁移列表basesToBackUp_中:
/**
* This method restarts the backup cycle.
*
* @param bases The collection of bases to consider for backing up.
*/
void BackupSender::restartBackupCycle( const Bases & bases )
{
basesToBackUp_.clear();
Bases::const_iterator iBase = bases.begin();
while (iBase != bases.end())
{
basesToBackUp_.push_back( (iBase++)->first );
}
// Randomise the backup so we do not load ourselves if contiguous
// blocks of large entities exist in the bases collection.
std::random_shuffle( basesToBackUp_.begin(), basesToBackUp_.end() );
// TODO: It would be nicer if we maintained the random order. Currently,
// it would be possible for an entity not to be backed up for twice the
// archive period.
}
这里填充好了之后会执行一次随机化,来避免可能出现的连续多个重负载的Base聚集在一起, 这样可以方便后续的分帧处理迁移时的负载平滑。在执行完restartBackupCycle之后会将当前的BackupSender设置为正在负载均衡:
void startOffloading() { isOffloading_ = true; }
这个标记位开启之后后续就会执行Base的分帧迁移流程,分帧迁移的逻辑在BackupSender::tick函数中,这个函数会在Tick里来从basesToBackUp_的尾部拿出numToBackUp个元素来处理,避免一帧内大量Base的迁移逻辑触发卡顿:
/**
* This method sends backups for as many base entities as we are supposed to
* each tick.
*
* @param bases The collection of base entities.
* @param networkInterface The network interface to use to send backups
* through.
*
*/
void BackupSender::tick( const Bases & bases,
Mercury::NetworkInterface & networkInterface )
{
int periodInTicks = BaseAppConfig::backupPeriodInTicks();
if (periodInTicks == 0)
return;
if (!isUsingNewBackup_ && entityToAppHash_.empty())
return;
Mercury::BundleSendingMap bundles( networkInterface );
// The number of entities to back up is calculated. A floating point
// remainder is kept so that the backup period is roughly correct.
float numToBackUpFloat =
float(bases.size())/periodInTicks + backupRemainder_;
int numToBackUp = int(numToBackUpFloat);
backupRemainder_ = numToBackUpFloat - numToBackUp;
if (isOffloading_)
{
if (offloadPerTick_ < numToBackUp)
{
offloadPerTick_ = numToBackUp;
INFO_MSG( "BackupSender::tick: "
"BaseApp is retiring, offloading at %d entities per tick\n",
offloadPerTick_ );
}
else
{
numToBackUp = offloadPerTick_;
}
}
if (basesToBackUp_.empty())
{
this->restartBackupCycle( bases );
}
bool madeProgress = false;
while ((numToBackUp > 0) && !basesToBackUp_.empty())
{
Base * pBase = bases.findEntity( basesToBackUp_.back() );
basesToBackUp_.pop_back();
if (pBase && this->autoBackupBase( *pBase, bundles ) )
{
madeProgress = true;
--numToBackUp;
}
}
// Check if at least one base was backed up.
if (madeProgress)
{
// Send all the backup data to the other baseapps.
bundles.sendAll();
ticksSinceLastSuccessfulOffload_ = 0;
}
else
{
// 省略一些容错代码
}
if (basesToBackUp_.empty() && isUsingNewBackup_)
{
// If we were updating a new backup, we are now finished. Inform the
// BaseAppMgr and start using it.
this->ackNewBackupHash();
}
}
这里处理单个Base迁移的函数为autoBackupBase,其函数体负责转发业务到backupBase函数上,这个函数负责查询当前Base对应的新BaseApp的地址,并执行数据打包,打包的数据会统一放在bundles这个Map里,根据BaseApp的地址进行聚合:
/**
* This method performs the backup operation for a single base entity.
*
* @param base The base entity to backup.
* @param bundles Bundle sending map to be used for sending.
* @param pHandler The request handler for the backupEntity request
* operation on the peer BaseApp.
*
* @return True if a base was actually backed up, false otherwise.
*/
bool BackupSender::backupBase( Base & base,
Mercury::BundleSendingMap & bundles,
Mercury::ReplyMessageHandler * pHandler )
{
Mercury::Address addr = entityToAppHash_.addressFor( base.id() );
if (isUsingNewBackup_)
{
// 暂时忽略重复触发负载均衡的情况
}
if (addr == Mercury::Address::NONE)
{
return false;
}
if (isOffloading_)
{
if (base.isProxy())
{
// 忽略客户端还没有连接上来时的处理
}
// If a baseapp is offloading, the hash is immutable, so if an address
// is dead, just find another baseapp to send stuff to.
addr = baseApp_.backupHashChain().addressFor( addr, base.id() );
}
Mercury::Bundle & bundle = bundles[ addr ];
base.backupTo( addr, bundle, isOffloading_, pHandler );
return true;
}
这里的Base::backupTo里负责构造一个BaseAppIntInterface::backupBaseEntity的RPC请求,填充数据里第一个字段代表是否是由于负载均衡导致的迁移,writeBackupData这个我们就不跟进入去了,唯一需要注意的一点是内部在发现正处于负载均衡的时候会将与RealEntity之间的消息通道pChannel_也序列化进去,等到迁移到新的BaseApp之后会利用打包在里面的数据重新创建一个pChannel_,这个过程类似于RealEntity::pChannel_在RealEntity迁移时的处理:
void Base::backupTo( const Mercury::Address & addr,
Mercury::Bundle & bundle,
bool isOffloading, Mercury::ReplyMessageHandler * pHandler )
{
if (pHandler)
{
bundle.startRequest( BaseAppIntInterface::backupBaseEntity,
pHandler );
}
else
{
bundle.startMessage( BaseAppIntInterface::backupBaseEntity );
}
bundle << isOffloading;
this->writeBackupData( bundle, /*isOnload*/ isOffloading );
if (isOffloading)
{
this->offload( addr );
}
hasBeenBackedUp_ = true;
}
数据打包好了之后,还会执行一个offload操作,这个操作比较重要,我们需要跟进一下:
/**
* This method offloads the base entity to the BaseApp at the destination
* address.
*
* @param dstAddr The destination BaseApp (internal address).
*/
void Base::offload( const Mercury::Address & dstAddr )
{
BaseApp & baseApp = BaseApp::instance();
baseApp.makeLocalBackup( *this );
baseApp.addForwardingMapping( id_, dstAddr );
if (this->cellAddr() != Mercury::Address::NONE)
{
Mercury::Channel & channel =
baseApp.intInterface().findOrCreateChannel( this->cellAddr() );
Mercury::Bundle & bundle = channel.bundle();
CellAppInterface::onBaseOffloadedArgs & rOnBaseOffloadedArgs =
CellAppInterface::onBaseOffloadedArgs::start( bundle, this->id() );
rOnBaseOffloadedArgs.newBaseAddr = dstAddr;
DEBUG_MSG( "Base( %s %u )::offload: Now on baseapp %s\n",
pType_->name(), id_, dstAddr.c_str() );
channel.send();
}
// Stop the Proxy trying to disable its Witness when it offloads
// its client.
pChannel_->reset( Mercury::Address::NONE, false );
if (this->isProxy())
{
static_cast< Proxy * >( this )->offload( dstAddr );
}
this->discard( /* isOffload */ true );
}
这里的重点是addForwardingMapping( id_, dstAddr ),这里的意思是为这个id_的Base提供新地址dstAddr的消息转发。实现原理是BaseApp上维护一个id到新地址的映射map,存储在pBaseMessageForwarder_中:
/**
* This method records and sets up the forwarding for an offloaded base
* entity.
*/
void BaseApp::addForwardingMapping( EntityID entityID,
const Mercury::Address & addr )
{
pBaseMessageForwarder_->addForwardingMapping( entityID, addr );
}
/**
* This method adds a forwarding mapping for the given entity ID to the given
* destination address.
*/
void BaseMessageForwarder::addForwardingMapping( EntityID entityID,
const Mercury::Address & destAddr )
{
map_[entityID] = destAddr;
}
然后CommonBaseMessageHandler在接收消息的时候,会优先检查这个id是否需要转发:
/**
* Objects of this type are used to handle base messages
*/
class CommonBaseMessageHandler : public Mercury::InputMessageHandler
{
public:
virtual void handleMessage( const Mercury::Address & srcAddr,
Mercury::UnpackedMessageHeader & header,
BinaryIStream & data )
{
BaseApp & app = ServerApp::getApp< BaseApp >( header );
if (app.forwardBaseMessageIfNecessary( 0, srcAddr, header, data ))
{
return;
}
// 省略后续的所有代码
}
}
/**
* This message forwards a message to a recently offloaded base entity.
*/
bool BaseApp::forwardBaseMessageIfNecessary( EntityID entityID,
const Mercury::Address & srcAddr,
const Mercury::UnpackedMessageHeader & header,
BinaryIStream & data )
{
return pBaseMessageForwarder_->forwardIfNecessary(
forwardingEntityIDForCall_, srcAddr, header, data );
}
如果在map_里找到了转发地址,则封装一个forwardedBaseMessage投递到新的目的地址:
/**
* This method forwards a message if a mapping exists for this entity ID.
* Returns true if a mapping exists, and forwarding occurred, otherwise it
* returns false.
*/
bool BaseMessageForwarder::forwardIfNecessary( EntityID entityID,
const Mercury::Address & srcAddr,
const Mercury::UnpackedMessageHeader & header,
BinaryIStream & data )
{
Map::iterator iMapping = map_.find( entityID );
if (iMapping != map_.end())
{
const Mercury::Address & destAddr = iMapping->second;
Mercury::Channel & channel =
networkInterface_.findOrCreateChannel( destAddr );
Mercury::Bundle & bundle = channel.bundle();
bundle.startMessage( BaseAppIntInterface::forwardedBaseMessage );
bundle << srcAddr << header.identifier << header.replyID ;
// A REAL handler would use this: bundle << entityID;
bundle.transfer( data, data.remainingLength() );
return true;
}
return false;
}
有了这个消息转发机制之后,对于一个RealEntity来说,所有用其老Base进行消息中转的RPC都会自动的在老BaseApp上中转到新的Base上,这样就避免了使用老Base作为通信地址时的消息丢失。
同时Base::offload这里判断了cellAddr()不为空的时候,会通过这个cellAddr给对应的RealEntity发送一个onBaseOffloaded的RPC,来通知其Base要迁移过去的新地址。
/**
* This method is called when this entity's base part has been offloaded to another
* BaseApp as part of BaseApp retirement.
*/
void Entity::onBaseOffloaded(
const CellAppInterface::onBaseOffloadedArgs & args )
{
// This message is REAL_ONLY
MF_ASSERT( pReal_ );
baseAddr_ = args.newBaseAddr;
pReal_->channel().setAddress( baseAddr_ );
DEBUG_MSG( "Entity( %s %u )::onBaseOffloaded: Now on baseapp %s\n",
pEntityType_->name(), id_, baseAddr_.c_str() );
for (RealEntity::Haunts::iterator iter = pReal_->hauntsBegin();
iter != pReal_->hauntsEnd();
++iter)
{
CellAppInterface::onBaseOffloadedForGhostArgs::start(
iter->bundle(), this->id() ).newBaseAddr = baseAddr_;
}
}
/**
* This method is called when this entity's real has been told that this
* entity's base part has been offloaded to another BaseApp as part of BaseApp
* retirement.
*/
void Entity::onBaseOffloadedForGhost(
const CellAppInterface::onBaseOffloadedForGhostArgs & args )
{
// This message is GHOST_ONLY
MF_ASSERT( !pReal_ );
baseAddr_ = args.newBaseAddr;
}
这样RealEntity上的channel就更新了最新的Base的地址,同时其所有的GhostEntity也会收到这个Base地址的更新。其他RealEntity可以使用这个新的Base地址与这个RealEntity进行通信,避免使用老Base地址时导致的多一层转发延迟。
这里还针对Proxy做了一个特殊的处理,通知客户端连接当前Base正在迁移:
/**
* This function is called when this proxy is offloaded in order to transfer
* the connected client.
*/
void Proxy::offload( const Mercury::Address & dstAddr )
{
AUTO_SCOPED_THIS_ENTITY_PROFILE;
if (this->hasClient())
{
// We don't wait for an acknowledgement of this
// transfer, since we're going to be destroyed now.
this->transferClient( dstAddr, /* shouldReset */ false );
this->detachFromClient( /* shouldCondemn */ true );
}
}
/**
* This function will transfer the connected client and optionally reset
* entities on the client if it has been transferred between two
* different proxies.
*/
void Proxy::transferClient( const Mercury::Address & dstAddr,
bool shouldReset, Mercury::ReplyMessageHandler * pHandler )
{
AUTO_SCOPED_THIS_ENTITY_PROFILE;
Mercury::Bundle & bundle = this->clientBundle();
Mercury::Address externalAddr =
BaseApp::instance().getExternalAddressFor( dstAddr );
MF_ASSERT( pClientChannel_ != NULL );
if (NATConfig::isExternalIP( pClientChannel_->addr().ip ))
{
externalAddr.ip = NATConfig::externalIPFor( externalAddr.ip );
}
// Either a cross-CellApp handoff, or an offload.
MF_ASSERT( isGivingClientAway_ || pHandler == NULL );
// Abort any in-progress downloads.
DownloadCallbacks callbacks;
dataDownloads_.abortDownloads( callbacks );
DEBUG_MSG( "Proxy::transferClient( %s %d ): "
"switching client %s to BaseApp %s (%s)\n",
this->pType()->name(),
id_,
pClientChannel_->c_str(),
externalAddr.c_str(),
shouldReset ? "should reset" : "no reset" );
ClientInterface::switchBaseAppArgs & rArgs = (pHandler ?
(ClientInterface::switchBaseAppArgs::startRequest( bundle, pHandler )) :
(ClientInterface::switchBaseAppArgs::start( bundle )));
rArgs.baseAddr = externalAddr;
rArgs.shouldResetEntities = shouldReset;
this->sendBundleToClient();
// If giving the client away, we need to know that we've aborted the
// downloads.
// Otherwise, we're offloading, and the new BaseApp's copy of us
// will note the downloads active in the backup and abort them.
// If we're offloading, our backup has already been sent, so it's
// too late to change state. So we _never_ call triggerCallbacks()
if (isGivingClientAway_)
{
callbacks.triggerCallbacks( this );
}
}
这里的transferClient的实现其实就是通知原来的客户端执行一次主动断线后再去连接到新的Base上,也就是一次顶号的流程。transferClient结束之后就主动调用detachFromClient来等待被动断线。
到这里,Base迁移的准备工作就都做完了,老的BaseApp已经添加了这个Base的转发映射,这个Base的RealEntity和对应的所有GhostEntity都收到了最新BaseApp地址,剩下的工作就是等待打包好的Base数据传递到新的BaseApp上,执行恢复流程,对应的接口为BaseApp::backupBaseEntity,这个接口负责恢复一个Base:
/**
* This method handles a message containing the backup information for a base
* entity on another BaseApp.
*/
void BaseApp::backupBaseEntity( const Mercury::Address & srcAddr,
const Mercury::UnpackedMessageHeader & header,
BinaryIStream & data )
{
bool isOffload;
data >> isOffload;
EntityID entityID;
data >> entityID;
if (isOffload)
{
BasePtr pBase = this->createBaseFromStream( entityID, data );
if (pBase == NULL)
{
WARNING_MSG( "BaseApp::backupBaseEntity: "
"failed to create base %d for onload\n", entityID );
return;
}
DEBUG_MSG( "BaseApp::backupBaseEntity: Onloaded %s %d\n",
pBase->pType()->name(), entityID );
// This just culls the old backup if appropriate
pBackedUpBaseApps_->onloadedEntity( srcAddr, entityID );
// 省略一些容错代码
}
else
{
pBackedUpBaseApps_->backUpEntity( srcAddr, entityID, data );
}
// 省略reply的处理
}
backupBaseEntity首先从数据流里取出开头的isOffload和entityID,这里的isOffload代表的是否因为负载均衡而迁移,如果是负载均衡则调用createBaseFromStream从数据流里读取数据来创建一个新的Base:
/**
* This method creates a base entity from the given backup data stream as a
* restore or offload.
*/
BasePtr BaseApp::createBaseFromStream( EntityID id, BinaryIStream & stream )
{
// This can happen when an offloading baseapp offloads a restored entity
// to the baseapp that has already restored it.
if (bases_.find( id ) != bases_.end())
{
NOTICE_MSG( "BaseApp::createBaseFromStream( %d ): "
"Entity already exists\n",
id );
stream.finish();
return NULL;
}
// This should match the Base::writeBackupData, with the exception that the
// entity ID has already been streamed off as the given EntityID parameter.
EntityTypeID typeID;
BW::string templateID;
DatabaseID databaseID;
stream >> typeID >> templateID >> databaseID;
EntityTypePtr pType = EntityType::getType( typeID );
if ((pType == NULL) || !pType->canBeOnBase())
{
ERROR_MSG( "BaseApp::createBaseFromStream: "
"Invalid entity type %d for entity %d\n",
typeID, id );
stream.finish();
return NULL;
}
BasePtr pBase = pType->newEntityBase( id, databaseID );
if (!pBase)
{
ERROR_MSG( "BaseApp::createBaseFromStream: "
"Failed to create entity %d of type %d\n",
id, typeID );
stream.finish();
return NULL;
}
if (!pBase->initDelegate( templateID ))
{
ERROR_MSG( "BaseApp::createBaseFromStream: "
"Failed to initialise delegate of entity %d of type '%s' "
"with template '%s'\n",
id, pType->name(), templateID.c_str() );
stream.finish();
return NULL;
}
pBase->readBackupData( stream );
return pBase;
}
最后的pBase->readBackupData里负责重建与RealEntity通信的Channel:
/**
* This method is called by readBackupData with a stream that handles
* compression.
*/
void Base::readBackupDataInternal( BinaryIStream & stream )
{
Mercury::Address cellAddr;
stream >> cellAddr;
pChannel_->setAddress( cellAddr );
bool hasChannel;
stream >> hasChannel;
if (hasChannel)
{
pChannel_->initFromStream( stream, cellAddr );
}
if (pCellEntityMailBox_)
pCellEntityMailBox_->address( this->cellAddr() );
stream >> isCreateCellPending_ >> isGetCellPending_ >>
isDestroyCellPending_ >> spaceID_ >>
shouldAutoBackup_ >> shouldAutoArchive_ >>
cellBackupData_;
Mercury::Address clientAddr;
if (this->isProxy())
{
clientAddr = static_cast< Proxy * >( this )->readBackupData(
stream, hasChannel );
}
this->restoreTimers( stream );
this->restoreAttributes( stream );
this->restoreCellData( stream ); // Must be last. Consumes rest of data.
// Holding onto ourselves to ensure that the entity isn't destroyed in
// onRestore()
// 忽略一些脚本层的回调
if (!this->isDestroyed() && this->isProxy())
{
static_cast< Proxy * >( this )->onRestored( hasChannel, clientAddr );
}
}
这里的Proxy::onRestored处理的是当前还没有创建RealEntity的情况,这个时候直接强制让客户端掉线,反正也没有必要维护之前的状态了:
/**
* This method is called after the Base entity has finished restoring from
* the backup stream.
*/
void Proxy::onRestored( bool hasChannel, const Mercury::Address & clientAddr )
{
AUTO_SCOPED_THIS_ENTITY_PROFILE;
if (!hasChannel && (clientAddr != Mercury::Address::NONE))
{
this->onClientDeath( CLIENT_DISCONNECT_BASE_RESTORE,
/*shouldExpectClient: */ false );
}
}
到这里整个Base在新的BaseApp上的重建流程就走完了,所有数据与信道都已经恢复了,同时老的BaseApp上依然保留着老Base到新Base的转发规则,以避免用老的Base地址进行通信时的消息丢失。
虽然保证了迁移前后的消息不会丢失,但是这里好像会出现消息不保序的问题,举个例子来说,RealEntity(A)给Base(B)发出了消息Msg(M),但是此时由于Base(B)正在迁移到Base(C),所以Msg(M)会通过BaseApp(B)转发到BaseApp(C)。但是如果在转发过程中,RealEntity(A)收到了其最新地址Base(C),并向Base(C)发送了一个消息Msg(N),则Msg(N)可能会在Msg(M)之前先到达Base(C)。因为A->B->C的延迟可能会比A->C的延迟大很多,从而出现这种乱序的现象。不过BigWorld的Channel里的所有消息应该都带上了版本号,估计会在可靠UDP里做一个消息排序,具体的需要对可靠UDP里的发包和收包实现做研究。
Ghost管理
在Cell::offloadEntity中直接将当前real_entity从当前的CellSpace::realEntities_集合中删除,然后通过Entity::offload将当前real_entity的全部数据执行打包,然后再将当前Entity切换为ghost_entity,加入到CellSpace:
/**
* This method converts a real entity into a ghost entity.
*
* @param pStream The stream to write the conversion data to.
* @param pChannel The application the real entity is being moved to. If
* NULL, the real entity is being destroyed.
* @param isTeleport Indicates whether this is a teleport.
*
* @see offload
* @see destroy
*/
void Entity::convertRealToGhost( BinaryOStream * pStream,
CellAppChannel * pChannel, bool isTeleport )
{
MF_ASSERT( this->isReal() );
MF_ASSERT( !pRealChannel_ );
Entity::callbacksPermitted( false );
Witness * pWitness = this->pReal()->pWitness();
if (pWitness != NULL)
{
pWitness->flushToClient();
}
if (pChannel != NULL)
{
// Offload the entity if we have a pChannel to the next real.
MF_ASSERT( pStream != NULL );
this->writeRealDataToStream( *pStream, isTeleport );
pRealChannel_ = pChannel;
// Once the real is created on the other CellApp, it will send a
// ghostSetReal back to this app, so we better be ready for it.
nextRealAddr_ = pRealChannel_->addr();
// Delete the real part (includes decrementing refs of haunts
// and notifying haunts of our nextRealAddr_)
this->offloadReal();
}
else
{
// Delete the real part (includes decrementing refs of haunts)
// as we're being destroyed.
this->destroyReal();
}
MF_ASSERT( !this->isReal() );
// make it a ghost script
//this->pType()->convertToGhostScript( this );
// .. by dropping all the properties of the real
MF_ASSERT( properties_.size() == pEntityType_->propCountGhostPlusReal() );
for (uint i = pEntityType_->propCountGhost(); i < properties_.size(); ++i)
{
if (properties_[i])
{
pEntityType_->propIndex(i)->dataType()->detach( properties_[i] );
}
}
properties_.erase( properties_.begin() + pEntityType_->propCountGhost(),
properties_.end() );
this->relocated();
Entity::callbacksPermitted( true );
}
在打包好real_entity的数据之后,本地的Entity的属性系统中就不再需要存储非ghost_entity数据了,因此会调用properties_.erase将这些不再需要的属性执行删除,这样就完成了real_entity到ghost_entity的转换。中间还有pEntity->callback( "onLeavingCell" )和 pEntity->callback( "onLeftCell" )来通过事件系统来通知其他逻辑这个Real-Ghost的状态改变。
介绍完了real_entity的迁移之后,我们再来看EntityGhostMaintainer::check剩余部分对ghost_entity的管理,markHaunts负责把所有现有的ghost_entity都标记为需要删除,然后这在EntityGhostMaintainer::createOrUnmarkRequiredHaunts中对需要存在的ghost_entity取消标记, 最后通过deleteMarkedHaunts对仍然带有删除标记的ghost_entity真正执行删除:
/**
* This method checks through this entity's ghosts and checks whether the real
* entity needs to be offloaded elsewhere.
*/
void EntityGhostMaintainer::check()
{
if (this->cell().shouldOffload())
{
this->checkEntityForOffload();
}
// We mark all the haunts for this entity, and then we unmark all the valid
// ones. The invalid ghosts are left marked and are deleted.
bool doesOffloadDestinationHaveGhost = this->markHaunts();
this->createOrUnmarkRequiredHaunts();
MF_ASSERT( (pOffloadDestination_ == NULL) ||
doesOffloadDestinationHaveGhost ||
(numGhostsCreated_ == 1) );
this->deleteMarkedHaunts();
}
createOrUnmarkRequiredHaunts这个函数的逻辑实现比较简单,就是根据当前的位置构造出一个特定半径的矩形,这个半径为由三个部分组成:
CellAppConfig::ghostDistance()默认的GhostRadiuspEntity_->pType()->description().appealRadius()这个实体类型所带的一个额外影响半径GHOST_FUDGE这个是一个额外的容差半径, 主要是为了避免AABB相交计算时的一些浮点比较判定的误差,
/**
* This method evaluates each cell in the space's tree for suitability for
* adding a ghost to that cell for the given entity. It creates a ghost if
* one is required but not present, and leaves the channel unmarked.
*
* If a ghost exists and the cell is still a suitable haunt for the entity,
* then the haunt's channel is unmarked.
*
* Those cells that no longer require a ghost for the given entity are left
* alone (they should be marked for removal).
*
*/
void EntityGhostMaintainer::createOrUnmarkRequiredHaunts()
{
// TODO: Make this configurable.
static const float GHOST_FUDGE = 20.f;
const Vector3 & position = pEntity_->position();
// Find all the haunts that we should have.
BW::Rect interestArea( position.x, position.z, position.x, position.z );
// Entities with an appeal raidus have to ghost more
interestArea.inflateBy( CellAppConfig::ghostDistance() +
pEntity_->pType()->description().appealRadius() );
hysteresisArea_ = interestArea;
interestArea.inflateBy( GHOST_FUDGE );
pEntity_->space().visitRect( interestArea, *this );
}
构造完这个矩形之后, 遍历当前space下所有与这个矩形相交的CellSpace,检查其是否需要创建ghost_entity:
/**
* Override from CellInfoVisitor.
*/
void EntityGhostMaintainer::visit( CellInfo & cellInfo )
{
const Mercury::Address & remoteAddress = cellInfo.addr();
// discard it if it is ourself
if (remoteAddress == ownAddress_)
{
return;
}
if (cellInfo.isDeletePending())
{
// Do not have ghosts on cells that are about to be deleted.
return;
}
// If it has been marked as an existing haunt then unmark it and bail.
CellAppChannel & channel = *CellAppChannels::instance().get(
remoteAddress );
if (channel.mark() == 1)
{
channel.mark( 0 );
return;
}
// Do not create a ghost if we are about to be offloaded. Let the
// destination do this. This helps with not creating CellAppChannels
// unnecessarily and also helps prevent race conditions. We still create
// the ghost on the destination cell.
if (pOffloadDestination_ && pOffloadDestination_->addr() != remoteAddress)
{
return;
}
// and if we are not far enough in then toss it too (hysteresis check)
if (!cellInfo.rect().intersects( hysteresisArea_ ))
{
return;
}
// Otherwise we should create a new ghost.
pEntity_->pReal()->addHaunt( channel );
pEntity_->createGhost( channel.bundle() );
++numGhostsCreated_;
}
这里可以看出,如果当前Entity已经标记了准备迁移的情况下,就不会在非迁移目标里创建ghost_entity。同时如果迁移目标里还没有ghost_entity,则会新建一个。由于Cell::sendOffloads是在所有的real_entity的EntityGhostMaintainer::check都执行完成之后才执行的,所以这样就保证了要迁移的目标CellSpace一定会有当前real_entity的ghost_entity, 所以会有下面的Assert:
MF_ASSERT( (pOffloadDestination_ == NULL) ||
doesOffloadDestinationHaveGhost ||
(numGhostsCreated_ == 1) );
对于所有仍然带有删除标记的ghost_entity,在deleteMarkedHaunts会执行删除操作,不过这里执行删除的条件并不仅仅是这个标记位,还有其他的考虑条件:
- 单次删除的
ghost_entity数量有上限 - 如果一个
ghost_entity的存在时间短于MINIMUM_GHOST_LIFESPAN,则不会被删除 - 如果这个
real_entity的存在时间短于NEW_REAL_KEEP_GHOST_PERIOD_IN_SECONDS,则不会被删除 - 如果是迁移目标的
ghost_entity,则也不会被删除
/**
* This method removes ghosts on haunts that have their channels marked. All
* the required haunts would have had their channels unmakred in
* createOrUnmarkRequiredHaunts().
*
* There are criteria for when a ghost should not be deleted:
*
* * Each iteration of the offload checker has a maximum number of ghosts that
* can be deleted (configurable).
* * A ghost will not be deleted if it is a new ghost (configurable).
* * A ghost will not be deleted if the real entity has been created recently
* (2 seconds).
*
*/
void EntityGhostMaintainer::deleteMarkedHaunts()
{
static const int NEW_REAL_KEEP_GHOST_PERIOD_IN_SECONDS = 2;
const GameTime NEW_REAL_KEEP_GHOST_PERIOD =
NEW_REAL_KEEP_GHOST_PERIOD_IN_SECONDS * CellAppConfig::updateHertz();
const GameTime MINIMUM_GHOST_LIFESPAN =
CellAppConfig::minGhostLifespanInTicks();
const GameTime gameTime = CellApp::instance().time();
real_entity * pReal = pEntity_->pReal();
real_entity::Haunts::iterator iHaunt = pReal->hauntsBegin();
while (iHaunt != pReal->hauntsEnd())
{
real_entity::Haunt & haunt = *iHaunt;
CellAppChannel & channel = haunt.channel();
const bool shouldDelGhost =
// Too many ghosts deleted in this iteration.
offloadChecker_.canDeleteMoreGhosts() &&
// Keep the ghost if we're offloading there.
(&channel != pOffloadDestination_) &&
// Keep the ghost if the real entity is new.
(gameTime - pReal->creationTime() > NEW_REAL_KEEP_GHOST_PERIOD) &&
// Keep the ghost if the ghost is new.
(gameTime - haunt.creationTime() > MINIMUM_GHOST_LIFESPAN);
if (channel.mark() && shouldDelGhost)
{
// only bother telling it if it hasn't failed
if (channel.isGood())
{
pReal->addDelGhostMessage( channel.bundle() );
offloadChecker_.addDeletedGhost();
}
iHaunt = pReal->delHaunt( iHaunt );
}
else
{
++iHaunt;
}
// always clear the mark for the next user
channel.mark( 0 );
}
}