BigWorld 的 Entity 通信

bigworld这种分布式大世界架构相对于常规的MMO服务器来说通信复杂度上升了很多。在常规的MMO中可能只需要维护好一个entity跨进程迁移时的消息投递,在分布式大世界中entity之间的消息投递由于real/ghost的存在变得复杂了很多。为了减少逻辑上处理这种分布式大世界的通信代码,Bigworld在框架上提供了很多机制,来隐藏底层的通信拓扑。下面我们将对框架层的一些封装进行拆解,以了解其所需要解决的具体问题。

RealEntity的数据下发流程

bigworld的设计中,能向客户端发送信息的只有这个客户端对应playerRealEntity。向自身客户端发送消息的入口在Entity::sendToClient这个函数中:

/**
 *	This method is exposed to scripting. It should only be called by avatars
 *	that have an associated client. It sends a message to that client.
 *	Assumes that 'args' are valid according to the MethodDescription.
 *
 *	@param description	The description of the method to send.
 *	@param argStream	A MemoryOStream containing the destination entity
 *						ID and arguments for the method.
 *	@param isForOwn		A boolean flag indicates whether message should be
 *						sent to the client associated with the entity.
 *	@param isForOthers	A boolean flag indicates whether message should be
 *						send to all other clients apart from the client
 *						associated with the entity.
 *	@param isExposedForReply
 *						This determines whether the method call will be
 *						recorded, if a recording is active.
 */
bool Entity::sendToClient( EntityID entityID,
		const MethodDescription & description, MemoryOStream & argStream,
		bool isForOwn, bool isForOthers,
		bool isExposedForReplay )
{
	if (pReal_ == NULL)
	{
		return false;
	}

	if (isForOthers)
	{
		g_publicClientStats.trackEvent( pEntityType_->name(),
			description.name(), argStream.size(),
			description.streamSize( true ) );

		pReal_->addHistoryEvent( description.exposedMsgID(), argStream,
				description, description.streamSize( true ),
				description.priority() );
	}

	if (isForOwn)
	{
		if (pReal_->pWitness() == NULL)
		{
			return false;
		}

		argStream.rewind();
		g_privateClientStats.trackEvent( pEntityType_->name(),
			description.name(), argStream.size(),
			description.streamSize( true ) );
		description.stats().countSentToOwnClient( argStream.size() );
		pEntityType_->stats().countSentToOwnClient( argStream.size() );

		pReal_->pWitness()->sendToClient( entityID,
			description.exposedMsgID(), argStream,
			description.streamSize( true ) );
	}

	if (isExposedForReplay && this->cell().pReplayData())
	{
		argStream.rewind();
		this->cell().pReplayData()->addEntityMethod( *this, description,
			argStream );
	}

	return true;
}

  1. 如果这里的isForOtherstrue,则代表这个消息是其他entity的状态变化通过aoi机制广播到当前的entity上了,这里会通过RealEntityaddHistoryEvent接口来处理。这个接口的具体实现这里先不讲,将在后面的aoi同步小节中介绍。
  2. 如果这里的isForOwntrue,则代表这个消息处理的是自身entity的状态改变,这里会使用RealEntity上的一个Witness结构来封装sendToClient调用,这个Witness结构等下我们会重点介绍。
  3. 如果这里的isExposedForReplaytrue,代表这个消息需要通知观战系统,这部分的内容我们将不会涉及

这个pReal_->pWitness()的作用等价于获取当前entity的客户端通信地址,pReal_->pWitness()->sendToClient作用就类似于mosaic_game中的call_client。其实Witness的作用远比一个简单的call_anchor大,他负责了所有需要往客户端发送的数据,大头是其他entity通过aoi传递过来的状态变化:

/**
 *	This class is a witness to the movements and perceptions of a RealEntity.
 *	It is created when a client is attached to this entity. Its main activity
 *	centres around the management of an Area of Interest list.
 */
class Witness : public Updatable
{
	// 省略很多代码
}

不过我们这个小节只关注非aoi同步的部分,因此先暂时聚焦在sendToClient这个接口:

/**
 *	This helper method is used to send data to the client associated with this
 *	object.
 */
bool Witness::sendToClient( EntityID entityID, Mercury::MessageID msgID,
		MemoryOStream & stream, int msgSize )
{
	Mercury::Bundle & bundle = this->bundle();

	int oldSize = bundle.size();

	if (!this->selectEntity( bundle, entityID ))
	{
		return false;
	}

	bundle.startMessage( BaseAppIntInterface::sendMessageToClient );
	bundle << msgID;
	MF_ASSERT( msgSize < 0 || msgSize == stream.size() );
 	bundle << (int16) msgSize;
	bundle.transfer( stream, stream.size() );

	int messageSize = bundle.size() - oldSize;

	bandwidthDeficit_ += messageSize;

	return true;
}

可以看出这个接口其实就是在构造一个sendMessageToClient的消息类型,内部会包裹传递过来的各种逻辑消息。而这里的boundle则是一个绑定在RealEntity上的一个channel里的临时待发送数据:

/**
 *	This method returns a reference to the next outgoing bundle destined
 *	for the proxy.
 */
Mercury::Bundle & Witness::bundle()
{
	return real_.channel().bundle();
}

每次调用这个sendToClient的时候,都只是将所要发送的数据添加到这个临时bundle里,并不触发直接发送。只有在flushToClient的时候才会真正的执行发送操作,这个flushToClient的调用时机则是每帧的末尾:

/**
 *	This method sends the bundle to the client associated with this entity.
 */
void Witness::flushToClient()
{
	// Tell the BaseApp to send to the client.
	this->bundle().startMessage( BaseAppIntInterface::sendToClient );

	g_downstreamBytes += this->bundle().size();
	g_downstreamPackets += this->bundle().numDataUnits();
	++g_downstreamBundles;

	// Send bundle via the channel
	real_.channel().send();
}
/**
 *	This method is called regularly to send data to the witnesses associated
 *	with this entity.
 */
void Witness::update()
{
	SCOPED_PROFILE( CLIENT_UPDATE_PROFILE );
	AUTO_SCOPED_ENTITY_PROFILE( &entity_ );
	// 省略很多代码 只保留最后的几行
	{
		AUTO_SCOPED_PROFILE( "updateClientSend" );

		this->flushToClient();
	}

	// Tell the proxy that anything else we send is from next tick
	BaseAppIntInterface::tickSyncArgs::start( this->bundle() ).tickByte =
		(uint8)(CellApp::instance().time() + 1);
}

现在我们来跟踪一下这个real_.channel到底是什么:

/**
 *	The constructor for RealEntity.
 *
 *	@param owner			The entity associated with this object.
 */
RealEntity::RealEntity( Entity & owner ) :
		entity_( owner ),
		pWitness_( NULL ),
		removalHandle_( NO_ENTITY_REMOVAL_HANDLE ),
		velocity_( 0.f, 0.f, 0.f ),
		positionSample_( owner.position() ),
		positionSampleTime_( CellApp::instance().time() ),
		creationTime_( CellApp::instance().time() ),
		shouldAutoBackup_( AutoBackupAndArchive::YES ),
		pChannel_(
			new Mercury::UDPChannel( CellApp::instance().interface(),
				owner.baseAddr(),
				Mercury::UDPChannel::INTERNAL,
				DEFAULT_INACTIVITY_RESEND_DELAY,
				/* filter: */ NULL,
				Mercury::ChannelID( owner.id() ) ) ),
		recordingSpaceEntryID_()
{
	++g_numRealEntities;
	++g_numRealEntitiesEver;

	pChannel_->isLocalRegular( false );
	pChannel_->isRemoteRegular( false );

	controlledBy_.init();
}

从这个RealEntity的构造函数可以看出,当前的pChannel_是一个基于可靠UDP实现的UDPChannel,这个Channel的投递地址是owner.baseAddr,执行send的时候就是把之前构造的数据全都投递到owner.baseAddr。我们通过全局搜索发出的消息包格式BaseAppIntInterface::sendMessageToClientBaseAppIntInterface::sendToClient定位到这些消息的接收者是BaseApp里的Proxy类型:

/*~ class BigWorld.Proxy
 *	@components{ base }
 *
 *	The Proxy is a special type of Base that has an associated Client. As such,
 *	it handles all the server updates for that Client. There is no direct script
 *	call to create a Proxy specifically.
 *
 */

/**
 *	This class is used to represent a proxy. A proxy is a special type of base.
 *	It has an associated client.
 */
class Proxy: public Base
{
	void sendMessageToClient( const Mercury::Address & srcAddr,
			Mercury::UnpackedMessageHeader & header,
			BinaryIStream & data );
	void sendToClient();
}

这里的sendMessageToClient消息的处理就是在执行了当前是否还有客户端的过滤之后,将这个消息转移到内部的pOutput这个输出缓冲区去,这里不会触发消息的真正发送:

/**
 *	This method handles a script message that should be forwarded to the client.
 */
void Proxy::sendMessageToClient( const Mercury::Address & srcAddr,
		Mercury::UnpackedMessageHeader & header,
		BinaryIStream & data )
{
	this->sendMessageToClientHelper( data, /*isReliable:*/ true );
}

/**
 *	This method handles a script message that should be forwarded to the client.
 */
void Proxy::sendMessageToClientUnreliable( const Mercury::Address & srcAddr,
		Mercury::UnpackedMessageHeader & header,
		BinaryIStream & data )
{
	this->sendMessageToClientHelper( data, /*isReliable:*/ false );
}

/**
 *	This method forwards this message to the client.
 */
void Proxy::sendMessageToClientHelper( BinaryIStream & data, bool isReliable )
{
	AUTO_SCOPED_THIS_ENTITY_PROFILE;

	if (this->hasOutstandingEnableWitness())
	{
		// Do nothing. It's for an old client.
		data.finish();
		return;
	}

	Mercury::MessageID msgID;
	data >> msgID;

	if (!this->hasClient())
	{
		WARNING_MSG( "Proxy::sendMessageToClientHelper(%u): "
				"No client. Cannot forward msgID %d\n",
			id_, msgID );

		data.finish();
		return;
	}

	int16 msgStreamSize;
	data >> msgStreamSize;
	MF_ASSERT( msgStreamSize < 0 || msgStreamSize == data.remainingLength() );

	BinaryOStream * pOutput = this->getStreamForEntityMessage(
		msgID, msgStreamSize, isReliable );

	MF_ASSERT( pOutput != NULL );

	pOutput->transfer( data, data.remainingLength() );
}

这里的getStreamForEntityMessage作用其实根据当前消息是否是可靠的消息将下发数据分别封装为Mercury::RELIABLE_DRIVERMercury::RELIABLE_NO类型。通过sendMessageToClientHelper接口发送的消息全都默认为Reliable的,通过sendMessageToClientUnreliable全都是非Reliable的。这里的是否Reliable主要处理的是UDP消息丢失时是否需要重传,这里就不去探究这个按需可靠UDP的具体实现了:

/**
 *  This method gets a suitable stream to the client for an entity property
 *	update or method call.
 *
 *	@param messageID	The messageID of the client-side event
 *	@param messageStreamSize 	The fixed size of the message or -1 if variable.
 *
 *	@return A BinaryOStream* to which the message can be written
 *		or NULL if no client is attached to this proxy.
 */
BinaryOStream * Proxy::getStreamForEntityMessage( Mercury::MessageID msgID,
	int methodStreamSize, bool isReliable /* = true */ )
{
	AUTO_SCOPED_THIS_ENTITY_PROFILE;

	if (!this->hasClient())
	{
		return NULL;
	}

	int8 lengthStyle = Mercury::FIXED_LENGTH_MESSAGE;
	int lengthParam = methodStreamSize;

	if (methodStreamSize < 0)
	{
		lengthStyle = Mercury::VARIABLE_LENGTH_MESSAGE;
		lengthParam = -methodStreamSize;
	}

	Mercury::InterfaceElement ie( "entityMessage", msgID,
			lengthStyle, lengthParam );

	Mercury::Bundle & bundle = this->clientBundle();

	bundle.startMessage( ie,
		(isReliable ? Mercury::RELIABLE_DRIVER : Mercury::RELIABLE_NO) );

	return &bundle;
}

而消息的真正发送,则依赖于Withness::flushToClient里构造的一个空消息sendToClient,在Proxy::sendToClient的处理时才会执行发送:

/**
 *	This message is the cell telling us that it has now sent us all the
 *	updates for the given tick, and we should forward them on to the client.
 */
void Proxy::sendToClient()
{
	// Do nothing. It's for an old client.
	if (!this->hasOutstandingEnableWitness())
	{
		this->sendBundleToClient();
	}
}

/**
 *	This method sends any messages queued by the internal interface (for the
 *	external interface). Returns true if the send occurred (or was attempted
 *	and failed) and the bundle was flushed.
 *
 *	@param irregular True if this send is not a regular, periodic send.
 */
bool Proxy::sendBundleToClient( bool expectData )
{
	// 省略开头检查是否有绑定客户端的代码
	Mercury::Bundle & bundle = pClientChannel_->bundle();

	avgClientBundleDataUnits_.sample( bundle.numDataUnits() );

	if (avgClientBundleDataUnits_.average() >=
			CLIENT_BUNDLE_DATA_UNITS_THRESHOLD)
	{
		// Complain only if enough bundles sent recently have been
		// multi-data-unit.
		WARNING_MSG( "Proxy::sendBundleToClient: "
				"Client %u has consistently sent multiple data units per "
				"update (moving average = %.01f)\n",
			id_,
			avgClientBundleDataUnits_.average() );
	}

	// see if we can squeeze any more data on the bundle
	this->addOpportunisticData( &bundle );

	// now actually send the packet!
	pClientChannel_->send();

	// 省略后面的一些延迟计算与时钟同步的代码
}

这里我们心满意足的看到了期望看到的pClientChannel_->send,也就是最终的消息发送的Channel。不过在send之前还会调用一个奇怪的函数addOpportunisticData,其作用是在这个bundle的数据量比较小的情况下将一些文件下载相关的数据分批的插入到当前需要下发的bundle中去。因为文件下载这种通信优先级不高,但是数据量一般来说就很大,直接在一个bundle中发送很容易占满发往客户端的下行带宽,导致后续的数据要延迟发送。而且整体发下去的话对于UDP的可靠性要求很高,很容易触发整包的重传。为了避免抢占到客户端之间的下行带宽,只能采取这个每帧检查夹带的形式发送下去。每一帧能夹带下去的数据大小计算处理的非常复杂,这里就不去贴这个函数的实现代码了,有兴趣的可以去看看。

RealEntity的数据下发流程

RealEntity的消息投递流程

在上一节内容中我们完整的介绍了RealEntity是如何往客户端发送消息的,重点是通过一个不会移动固定地址的对象Proxy来处理中转和流控。反过来客户端往RealEntity发送消息也需要通过Proxy,但是此时由于RealEntity是会迁移的,所以ProxyRealEntity投递消息的的流程就比RealEntityProxy投递消息的流程复杂很多。因为这里涉及到一个在分布式大世界里一个非常关键的问题,如何向一个可迁移的RealEntity发送消息。这里的RealEntity不仅仅包括我们之前提到过的玩家对象,也包括场景里可能创建的怪物、NPC等其他可迁移角色。BigWorld里为了解决这个问题引入了Base对象,之前提到的Proxy对象就是Base对象的直接子类。每个RealEntity都会有一个Base对象,在这个Base对象上会维护好对应的RealEntity的最新通信地址,所以这个Base对象的作用与我们在mosaic_game中创立的relay_entity作用是差不多的。接下来我们来重点剖析一下这个最新通信地址的维护过程。

根据我们之前对玩家进入场景的流程分析,我们可以知道任何一个玩家对应的RealEntity的第一次创建都是由其Proxy对象发起的,创建信息里会带上当前Proxy对象的通信地址。对于非玩家RealEntity来说,这个创建流程依然成立,只不过当前之前的Proxy对象是登录成功之后创建的,而现在的Base对象则是通过脚本逻辑调用DBAPP上的AutoLoadingEntityHandler来触发的:

/**
 *	Constructor.
 */
AutoLoadingEntityHandler::AutoLoadingEntityHandler( EntityTypeID typeID,
		DatabaseID dbID, EntityAutoLoader & mgr ) :
	state_(StateInit),
	ekey_( typeID, dbID ),
	createBaseBundle_(),
	mgr_( mgr ),
	isOK_( true )
{}


/**
 *	Start auto-loading the entity.
 */
void AutoLoadingEntityHandler::autoLoad()
{
	// Start create new base message even though we're not sure entity exists.
	// This is to take advantage of getEntity() streaming properties into the
	// bundle directly.
	DBApp::prepareCreateEntityBundle( ekey_.typeID, ekey_.dbID,
		Mercury::Address( 0, 0 ), this, createBaseBundle_ );

	// Get entity data into bundle
	DBApp::instance().getEntity( ekey_, &createBaseBundle_, false, *this );
	// When getEntity() completes onGetEntityCompleted() is called.
}

脚本逻辑负责填充好这个非玩家Entity在数据库中的唯一索引ekey,然后通过DBApp::prepareCreateEntityBundle这个接口来出发Entity数据的加载,并在加载完成之后向BaseAppMgr发起一个CreateEntity的请求。之前在解析玩家登录的时候,已经详细的提到了DBApp::prepareCreateEntityBundle这个接口的后续详细流程,所以这里就不再展开。后续流程里与之前的玩家登录流程很不一样的点就是创建的BasePtr指向的不再是Proxy类型,而是Base类型。

BasePtr pBase = this->createBaseFromStream( entityID, data );

BasePtr被创建好之后,脚本逻辑那边就可以手动的调用之前提到的玩家进入场景的接口,即向CellAppMgr发起一个createEntity的调用,参数里填充好Base对象的通信地址。这样当对应的RealEntity被创建的时候,同样的会触发下面这个函数的执行:

void Cell::addRealEntity( Entity * pEntity, bool shouldSendNow )

这个函数的完整流程我们在玩家进入场景流程里已经分析过了,最终会发起一个Base::currentCellRPC来通知当前RealEntity的最新通信地址:

/**
 *	This method is used to inform the base that the cell we send to has changed.
 */
void Base::currentCell( const Mercury::Address & srcAddr,
			const Mercury::UnpackedMessageHeader & header,
			const BaseAppIntInterface::currentCellArgs & args )
{
	this->setCurrentCell( args.newSpaceID, args.newCellAddr,
			&srcAddr );
}

至此,对于任何RealEntity,在其第一次进入场景时,都会有一个不可迁移的Base对象被创建,同时Base对象里会得到当前RealEntity创建时的通信地址。

由于Base对象是不会移动的,所以任何Entity想要往一个执行的RealEntity发送消息时,并不需要知道这个RealEntity的最新地址,只需要通过某种途径知道对应的Base的通信地址即可, 这个通信地址会被封装为一个CellViaBaseMailBox的对象:

// -----------------------------------------------------------------------------
// Section: CellViaBaseMailBox
// -----------------------------------------------------------------------------

/**
 *	This class is used to create a mailbox to a cell entity. Traffic for the
 *	entity is sent via the base entity instead of directly to the cell entity.
 *	This means that these mailboxes do not have the restrictions that normal
 *	cell entity mailboxes have.
 */
class CellViaBaseMailBox : public CommonBaseEntityMailBox
{
	Py_Header( CellViaBaseMailBox, CommonBaseEntityMailBox )

	public:
		CellViaBaseMailBox( EntityTypePtr pBaseType,
					const Mercury::Address & addr, EntityID id,
					PyTypeObject * pType = &s_type_ ):
			CommonBaseEntityMailBox( pBaseType, addr, id, pType )
		{}

		~CellViaBaseMailBox() { }

		virtual ScriptObject pyGetAttribute( const ScriptString & attrObj );
		virtual BinaryOStream * getStream( const MethodDescription & methodDesc,
			std::auto_ptr< Mercury::ReplyMessageHandler > pHandler );
		virtual EntityMailBoxRef::Component component() const;
		virtual const MethodDescription * findMethod( const char * attr ) const;
};

任意往这个CellViaBaseMailBox里发送的RPC都会被额外包裹一层新的RPC BaseAppIntInterface::callCellMethod, 原有的RPC数据则是作为这个外层RPC的参数来填充,同时开头会填充一下内层RPC的名字索引internalIndex:

BinaryOStream * CellViaBaseMailBox::getStream(
		const MethodDescription & methodDesc,
		std::auto_ptr< Mercury::ReplyMessageHandler > pHandler )
{
	Mercury::Bundle & bundle = this->bundle();

	// Not supporting return values
	if (pHandler.get())
	{
		PyErr_Format( PyExc_TypeError,
				"Cannot call two-way method '%s' from CellApp",
				methodDesc.name().c_str() );
		return NULL;
	}

	bundle.startMessage( BaseAppIntInterface::callCellMethod );
	bundle << methodDesc.internalIndex();

	return &bundle;
}

Base接收到这个BaseAppIntInterface::callCellMethod远程调用之后,就直接开始转发工作:

/**
 *  This method handles a message from a CellViaBaseMailBox. It calls
 *  the target method on the cell entity.
 */
void Base::callCellMethod( const Mercury::Address & srcAddr,
		   const Mercury::UnpackedMessageHeader & header,
		   BinaryIStream & data )
{

	if (pCellEntityMailBox_ == NULL)
	{
		// 省略一些RealEntity不存在的报错处理
		return;
	}

	MethodIndex methodIndex;
	data >> methodIndex;

	const MethodDescription * pDescription =
			this->pType()->description().cell().internalMethod( methodIndex );

	if (pDescription != NULL)
	{
		std::auto_ptr< Mercury::ReplyMessageHandler > pReplyHandler;

		if (header.replyID != Mercury::REPLY_ID_NONE)
		{
			pReplyHandler.reset( new TwoWayMethodForwardingReplyHandler(
					srcAddr, header.replyID ) );
		}

		BinaryOStream * pBOS = pCellEntityMailBox_->getStream( *pDescription,
				pReplyHandler );

	    if (pBOS == NULL)
		{
			// 省略一些错误处理
		}

		pBOS->transfer( data, data.remainingLength() );
		pCellEntityMailBox_->sendStream();
	}
	else
	{
		ERROR_MSG( "Base::callCellMethod(%u): "
					"Invalid method index (%d) on cell.\n", id_, methodIndex );

		sendTwoWayFailure( "BWInternalError", "Invalid method index",
				header.replyID, srcAddr );
	}
}

这里的callCellMethod会将内层的RPC解析出来,如果发现这个RPC需要Reply的话,这里还需要创建一个TwoWayMethodForwardingReplyHandler。 这个TwoWayMethodForwardingReplyHandler的作用就是如果RealEntity接收到这个消息并发起了Reply,这个Reply会直接发送到当前的Base上,然后Base再将这个Reply转发到原始RPC的发起者,起到了一个Reply的中转作用。

消息转发的时候使用的是pCellEntityMailBox_,这个也比较特殊,其内部会存储对应的RealEntityEntityId

/**
 *	This class implements a mailbox that can send to a server object. This
 *	object may be on a cell or may be a base.
 *
 *	@see CellEntityMailBox
 *	@see BaseEntityMailBox
 */
class ServerEntityMailBox: public PyEntityMailBox
{
	// 省略很多代码
	Mercury::Address			addr_;
	EntityID					id_;

	EntityTypePtr				pLocalType_;
};
/**
 *	This class is common to all mailboxes that send to the cell entity or via
 *	the cell entity.
 */
class CommonCellEntityMailBox : public ServerEntityMailBox
{
	// 省略很多代码
};
/**
 *	This class implements a mailbox that can send to an object on a cell.
 */
class CellEntityMailBox: public CommonCellEntityMailBox
{
	// 省略很多代码
};

当执行pCellEntityMailBox_->getStream这行代码时,调用链为ServerEntityMailBox::getStream通过CellEntityMailBox::getStreamEx中转到CommonCellEntityMailBox::getStreamCommon, 这个CommonCellEntityMailBox::getStreamCommon会在外层额外的包一层CellAppInterface::runScriptMethod这个RPC,同时会主动将对应RealEntityid填到数据的开头:

/**
 *	This method gets the stream to send a remote method call on.
 */
BinaryOStream * ServerEntityMailBox::getStream(
					const MethodDescription & methodDesc,
					std::auto_ptr< Mercury::ReplyMessageHandler > pHandler )
{
	if (!MainThreadTracker::isCurrentThreadMain())
	{
		ERROR_MSG( "ServerEntityMailBox::getStream: "
				"Cannot get stream in background thread for %s mailbox\n",
			this->componentName() );
		PyErr_Format( PyExc_TypeError,
			"Cannot get stream in background thread for %s mailbox\n",
			this->componentName() );
		return NULL;
	}

	return this->getStreamEx( methodDesc, pHandler );
}
/**
 *	This method gets a stream to send a message to the cell on.
 */
BinaryOStream * CellEntityMailBox::getStreamEx(
	const MethodDescription & methodDesc,
	std::auto_ptr< Mercury::ReplyMessageHandler > pHandler )
{
	return this->getStreamCommon( methodDesc,
		CellAppInterface::runScriptMethod, pHandler );
}

/**
 *	This method is used by derived classes to the initial part of their
 *	getStream methods.
 */
BinaryOStream * CommonCellEntityMailBox::getStreamCommon(
		const MethodDescription & methodDesc, 
		const Mercury::InterfaceElement & ie,
		std::auto_ptr< Mercury::ReplyMessageHandler > pHandler )
{
	Mercury::UDPChannel * pChannel = this->pChannel();

	if (!pChannel)
	{
		return NULL;
	}

	Mercury::Bundle & bundle = pChannel->bundle();

	if (pHandler.get())
	{
		bundle.startRequest( ie, pHandler.release() );
	}
	else
	{
		bundle.startMessage( ie );
	}

	bundle << id_;
	bundle << methodDesc.internalIndex();

	return &bundle;
}

执行pBOS->transfer( data, data.remainingLength() );就相当于把原本的RPC数据添加到当前CellAppInterface::runScriptMethod的参数之后。当CellApp接收到一个runScriptMethod的远程调用时,会将请求中转到RawEntityVarLenMessageHandler上去处理:

// cellapp_interface.cpp

#define MF_RAW_VARLEN_ENTITY_MSG( NAME, IS_REAL_ONLY )						\
	MERCURY_HANDLED_VARIABLE_MESSAGE( NAME, 2, 								\
			RawEntityVarLenMessageHandler,									\
			std::make_pair( &Entity::NAME, IS_REAL_ONLY) )

// Message to run cell script.
MF_RAW_VARLEN_ENTITY_MSG( runScriptMethod, REAL_ONLY )

这个RawEntityVerLenMessageHandler继承自EntityMessageHandler,其处理RPC调用的实现在EntityMessageHandler::handleMessage中:

/*
 *	Override from InputMessageHandler.
 */
void EntityMessageHandler::handleMessage( const Mercury::Address & srcAddr,
	Mercury::UnpackedMessageHeader & header,
	BinaryIStream & data )
{
	EntityID entityID;
	data >> entityID;

	this->handleMessage( srcAddr, header, data, entityID );
}

这里会首先解析出投递目标RealEntityEntityID,然后再调用handleMessage第二个变体:

/**
 *	This method handles this message. It is called from the InputMessageHandler
 *	override and from handling of buffered messages.
 */
void EntityMessageHandler::handleMessage( const Mercury::Address & srcAddr,
	Mercury::UnpackedMessageHeader & header,
	BinaryIStream & data,
	EntityID entityID )
{
	CellApp & app = ServerApp::getApp< CellApp >( header );
	Entity * pEntity = app.findEntity( entityID );

	AUTO_SCOPED_ENTITY_PROFILE( pEntity );

	BufferedGhostMessages & bufferedMessages = app.bufferedGhostMessages();

	bool shouldBufferGhostMessage =
		!pEntity ||
		pEntity->shouldBufferMessagesFrom( srcAddr ) ||
		bufferedMessages.isDelayingMessagesFor( entityID, srcAddr );

	bool isForDestroyedGhost = false;
	// 省略大部分的异常处理代码
	if(isForDestroyedGhost)
	{
		// 省略大部分的异常处理代码
	}
	else
	{
		this->callHandler( srcAddr, header, data, pEntity );
	}
}

这个变体里有很多细节分支,来处理各种情况,我们目前只关注无异常的情况,即我们通过EntityID找到了这个RealEntity,此时会调用callHandler:

/**
 *	Objects of this type are used to handle variable length messages destined
 *	for an entity. This also passes the source address and header to the
 *	handling function.
 */
class RawEntityVarLenMessageHandler : public EntityMessageHandler
{
public:
	/**
	 *	This type is the function pointer type that handles the incoming
	 *	message.
	 */
	typedef void (Entity::*Handler)( const Mercury::Address & srcAddr,
		const Mercury::UnpackedMessageHeader & header,
		BinaryIStream & stream );

	/**
	 *	Constructor.
	 */
	RawEntityVarLenMessageHandler( std::pair<Handler, EntityReality> args ) :
		EntityMessageHandler( args.second ),
		handler_( args.first )
	{}

private:
	virtual void callHandler( const Mercury::Address & srcAddr,
		Mercury::UnpackedMessageHeader & header,
		BinaryIStream & data, Entity * pEntity )
	{
		(pEntity->*handler_)( srcAddr, header, data );
	}

	Handler			handler_;
};

这里的RawEntityVarLenMessageHandler::callHandler实现非常简单,直接调用Entity上的runScriptMethod接口来处理,也就是这个接口,这里会将之前压入的methodID解析出来:

/**
 *	This method handles calls from other server component to run a method of
 *	this entity.
 */
void Entity::runScriptMethod( const Mercury::Address & srcAddr,
		const Mercury::UnpackedMessageHeader & header,
		BinaryIStream & data )
{
	uint16 methodID;
	data >> methodID;

	this->runMethodHelper( data, methodID, false, header.replyID, &srcAddr );
}

这个runMethodHelper则使用解析出来的methodID来查找对应的接口的pMethodDescription:

/**
 *	This method is used to run a method on this entity that has come from the
 *	network.
 *
 *	@param data			Contains the parameters for the method call.
 *	@param methodID		The index number of the method.
 *	@param isExposed	Whether the methodID refers to the exposed subset.
 */
void Entity::runMethodHelper( BinaryIStream & data, int methodID,
		bool isExposed, int replyID, const Mercury::Address * pReplyAddr )
{
	static ProfileVal localProfile( "scriptMessage" );
	START_PROFILE( localProfile );

	MF_ASSERT( Entity::callbacksPermitted() );

	EntityID sourceID = 0;
	const MethodDescription * pMethodDescription = NULL;

	if (isExposed)
	{
		data >> sourceID;

		const ExposedMethodMessageRange & range =
			BaseAppExtInterface::Range::cellEntityMethodRange;

		pMethodDescription =
			pEntityType_->description().cell().exposedMethodFromMsgID( methodID,
					data, range );
		// 忽略一些异常处理代码
		
	}
	else
	{
		pMethodDescription =
			pEntityType_->description().cell().internalMethod( methodID );

		if (pMethodDescription->isExposed())
		{
			data >> sourceID;
		}
	}

	if (pMethodDescription != NULL)
	{
		Entity::nominateRealEntity( *this );
		{
			if (pMethodDescription->isComponentised())
			{
				MF_ASSERT( pEntityDelegate_ != NULL );
				pEntityDelegate_->handleMethodCall( *pMethodDescription,
						data, sourceID );
			}
			else
			{
				SCOPED_PROFILE( SCRIPT_CALL_PROFILE );
				pMethodDescription->callMethod(
						ScriptObject( this, ScriptObject::FROM_BORROWED_REFERENCE ),
						data, sourceID, replyID, pReplyAddr,
						&CellApp::instance().interface() );
			}
		}
		Entity::nominateRealEntityPop();
	}
	else
	{
		// 省略一些异常处理代码
	}
}

当找到pMethodDescription之后,就会使用当前Entity所绑定的RealEntity来执行脚本函数的调用,这样完成的远程调用就执行完毕了,当然这里我们忽略了处理Reply的部分。

RealEntity的消息投递流程