RPC Implementation in BigWorld

Interfaces and RPCs

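In the BigWorld engine, all RPCs supported by a given App are declared in one place, usually a header named xxxapp_interface.hpp (for example cellapp_interface.hpp). This header declares a namespace named XXXAppInterface and then declares the RPCs one by one inside it. The declarations rely on a set of helper macros. For example, BEGIN_MERCURY_INTERFACE below opens the namespace declaration for the current interface: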

#pragma pack( push, 1 )
BEGIN_MERCURY_INTERFACE( CellAppInterface )

This macro eventually expands to the following C++ code:

namespace CellAppInterface 
{ 
	extern Mercury::InterfaceMinder gMinder; 
	void registerWithInterface( Mercury::NetworkInterface & networkInterface ); 
	Mercury::Reason registerWithMachined( Mercury::NetworkInterface & networkInterface, int id ); 
	Mercury::Reason registerWithMachinedAs( const char * name, Mercury::NetworkInterface & networkInterface, int id );
};

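These three functions are mainly responsible for registering the interface that the current process provides, so that messages for it can be routed: registerWithInterface takes the local NetworkInterface, and the two registerWithMachined variants register the interface's address under an id (and optionally a name).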

After that, the individual RPCs are declared one after another. Here are the first few RPC declarations in CellAppInterface:

	// -------------------------------------------------------------------------
	// CellApp messages
	// -------------------------------------------------------------------------
	BW_STREAM_MSG_EX( CellApp, addCell )
		// SpaceID spaceID;

	BW_BEGIN_STRUCT_MSG( CellApp, startup )
		Mercury::Address baseAppAddr;
	END_STRUCT_MESSAGE()

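BW_STREAM_MSG_EX declares an RPC without structured parameters. This does not mean the RPC takes no arguments; rather, its arguments are treated as a variable-length byte stream, and parsing them is left to the handler logic. The expansion is a single line that declares a variable of type extern const Mercury::InterfaceElement & whose name is the name of the RPC: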

extern const Mercury::InterfaceElement & addCell;

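BW_BEGIN_STRUCT_MSG declares an RPC with a fixed set of parameters. Besides the same extern const Mercury::InterfaceElement & variable as before, expanding the snippet above also generates an xxxArgs struct that wraps all parameters of the RPC: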

struct startupArgs; 
extern const Mercury::InterfaceElement & startup;
Mercury::Bundle & operator<<( Mercury::Bundle & b, const struct startupArgs &s );
struct startupArgs 
{ 
	static startupArgs & start( Mercury::Bundle & b, Mercury::ReliableType reliable = Mercury::RELIABLE_DRIVER ) 
	{ 
		return *(startupArgs*)b.startStructMessage( startup, reliable ); 
	} 
	static startupArgs & startRequest( Mercury::Bundle & b, Mercury::ReplyMessageHandler * handler, void * arg = 0, int timeout = Mercury::DEFAULT_REQUEST_TIMEOUT, Mercury::ReliableType reliable = Mercury::RELIABLE_DRIVER ) 
	{ 
		return *(startupArgs*)b.startStructRequest( startup, handler, arg, timeout, reliable ); 
	} 
	static const Mercury::InterfaceElement & interfaceElement() { return startup; }
	Mercury::Address baseAppAddr;
};

startupArgs::start begins writing the data for a startup RPC into the given Bundle. After writing the basic metadata for startup, startStructMessage also reserves a buffer the size of startupArgs:

/**
 * Make using simple messages easier - returns a pointer the
 * size of the message (note: not all fixed length msgs will
 * be simple structs, so startMessage doesn't do it
 * automatically)
 */
INLINE void * Bundle::startStructMessage( const InterfaceElement & ie,
	ReliableType reliable )
{
	this->startMessage( ie, reliable );
	return this->reserve( ie.lengthParam() );
}

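start casts the start address of this buffer to a startupArgs pointer and returns a reference to the struct. With that reference, the caller can assign to the members of startupArgs directly, and these assignments are what fill in the RPC arguments.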
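The reserve-then-fill pattern described above can be sketched in isolation. This is only a minimal illustration, not Mercury code: DemoBundle, DemoStartupArgs, startDemoStartup, buildDemoMessage, and the one-byte message id are all hypothetical names and simplifications, and the sketch uses placement new where the generated code uses a plain pointer cast.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <new>
#include <vector>

// Hypothetical stand-in for an xxxArgs struct. Packing to 1 byte mirrors the
// #pragma pack( push, 1 ) at the top of the interface header.
#pragma pack( push, 1 )
struct DemoStartupArgs
{
	std::uint32_t ip;
	std::uint16_t port;
};
#pragma pack( pop )

static_assert( sizeof( DemoStartupArgs ) == 6, "packed layout expected" );

// Hypothetical, much simplified bundle: one id byte per message plus payload.
class DemoBundle
{
public:
	// Grows the buffer by n bytes and returns the start of the new region.
	// The pointer is only valid until the next call that grows the buffer.
	void * reserve( std::size_t n )
	{
		const std::size_t oldSize = data_.size();
		data_.resize( oldSize + n );
		return data_.data() + oldSize;
	}

	// Writes the message id, then reserves room for the argument struct.
	void * startStructMessage( std::uint8_t msgID, std::size_t length )
	{
		data_.push_back( msgID );
		return this->reserve( length );
	}

	const std::vector< std::uint8_t > & data() const { return data_; }

private:
	std::vector< std::uint8_t > data_;
};

// Counterpart of startupArgs::start(): hand back a reference into the bundle.
inline DemoStartupArgs & startDemoStartup( DemoBundle & b )
{
	void * p = b.startStructMessage( 7, sizeof( DemoStartupArgs ) );
	return *new (p) DemoStartupArgs(); // placement new instead of a raw cast
}

// Builds one message; assigning to the members fills in the RPC arguments.
inline std::vector< std::uint8_t > buildDemoMessage()
{
	DemoBundle bundle;
	DemoStartupArgs & args = startDemoStartup( bundle );
	args.ip = 0x0100007f;
	args.port = 20013;
	return bundle.data();
}
```

The returned reference points into the bundle's own storage, so no copy is made when the arguments are set. The price is that the reference must not be used after anything else is appended to the same bundle.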

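startRequest works similarly, but it corresponds to an RPC request at the logic layer rather than a plain write of RPC data, so it also has to take the reply handler, timeout, reliability, and related parameters into account: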

/**
 * Make using simple requests easier - returns a pointer the
 * size of the request message.
 */
INLINE void * Bundle::startStructRequest( const InterfaceElement & ie,
	ReplyMessageHandler * handler, void * arg,
	int timeout, ReliableType reliable)
{
	this->startRequest( ie, handler, arg, timeout, reliable );
	return this->reserve( ie.lengthParam() );
}

Once xxxapp_interface.hpp has declared all RPCs, a matching xxxapp_interface.cpp file provides the definitions of the declared variables and functions. Here is the entire content of cellapp_interface.cpp:

// This file should be linked against by those wishing to use our interface

#include "cellapp/cellapp_interface.hpp"
#include "network/network_interface.hpp"

#define DEFINE_INTERFACE_HERE
#include "cellapp/cellapp_interface.hpp"

// cellapp_interface.cpp

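The header cellapp_interface.hpp is included twice here. The first inclusion brings in all the declarations; the second, with DEFINE_INTERFACE_HERE defined, generates their definitions, because the helper macros switch their expansion depending on whether DEFINE_INTERFACE_HERE is defined. For example, once DEFINE_INTERFACE_HERE is defined, the opening BEGIN_MERCURY_INTERFACE( CellAppInterface ) expands to the following code, which defines gMinder, a global manager of RPC metadata: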

namespace CellAppInterface 
{ 
	Mercury::InterfaceMinder gMinder( "CellAppInterface" ); 
	void registerWithInterface( Mercury::NetworkInterface & networkInterface ) 
	{ 
		gMinder.registerWithInterface( networkInterface ); 
	} 
	Mercury::Reason registerWithMachined( Mercury::NetworkInterface & networkInterface, int id ) 
	{ 
		return gMinder.registerWithMachined( networkInterface.address(), id ); 
	} 
	Mercury::Reason registerWithMachinedAs( const char * name, Mercury::NetworkInterface & networkInterface, int id ) 
	{ 
		return gMinder.registerWithMachinedAs( name, networkInterface.address(), id ); 
	}
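The declare-then-define trick can be reproduced in a single file. The sketch below is an assumption-laden stand-in rather than the real macros: it replaces the double #include with a list macro (DEMO_INTERFACE_BODY) that is expanded twice, and DemoMinder, DemoElement, and DemoInterface are hypothetical names. A std::deque is used so that references returned by add stay valid as more elements are appended.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

struct DemoElement
{
	std::string name;
	int id;
};

// Hypothetical minder. A deque keeps references to earlier elements valid
// when later elements are appended.
class DemoMinder
{
public:
	const DemoElement & add( const char * name )
	{
		// The id is simply the number of elements registered so far.
		const int id = static_cast< int >( elements_.size() );
		elements_.push_back( DemoElement{ name, id } );
		return elements_.back();
	}

	std::size_t size() const { return elements_.size(); }

private:
	std::deque< DemoElement > elements_;
};

// Stands in for the body of the interface header.
#define DEMO_INTERFACE_BODY( MSG ) \
	MSG( addCell ) \
	MSG( startup )

namespace DemoInterface
{
// Pass 1: what the header yields without DEFINE_INTERFACE_HERE (declarations).
extern DemoMinder gMinder;
#define DEMO_DECLARE_MSG( NAME ) extern const DemoElement & NAME;
DEMO_INTERFACE_BODY( DEMO_DECLARE_MSG )

// Pass 2: what it yields with DEFINE_INTERFACE_HERE defined (definitions).
// gMinder is defined first, so it is initialised before the elements.
DemoMinder gMinder;
#define DEMO_DEFINE_MSG( NAME ) \
	const DemoElement & NAME = gMinder.add( #NAME );
DEMO_INTERFACE_BODY( DEMO_DEFINE_MSG )
} // namespace DemoInterface
```

After static initialisation, DemoInterface::addCell has id 0 and DemoInterface::startup has id 1, matching the order in which they appear in the interface body.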

Likewise, BW_STREAM_MSG_EX( CellApp, addCell ) simply expands to code that registers an RPC with gMinder:

 StreamMessageHandlerEx< CellApp > gHandler_addCell(&CellApp::addCell); 
 const Mercury::InterfaceElement & addCell = gMinder.add( "addCell", Mercury::VARIABLE_LENGTH_MESSAGE, 2, &gHandler_addCell );

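add constructs an InterfaceElement from its arguments and appends it to gMinder's internal array elements_. Each registered RPC is given a unique id here, which is simply the length of elements_ at that moment: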

/**
 * 	This method adds an interface element (Mercury method) to the interface minder.
 *  @param name             Name of the interface element.
 * 	@param lengthStyle		Specifies whether the message is fixed or variable.
 *	@param lengthParam		This depends on lengthStyle.
 *	@param pHandler			The message handler for this interface.
 */
InterfaceElement & InterfaceMinder::add( const char * name,
	int8 lengthStyle, int lengthParam, InputMessageHandler * pHandler )
{
	const MessageID id = static_cast<MessageID>(elements_.size());

	// Set up the new bucket and add it to the list
	InterfaceElement element( name, id, lengthStyle, lengthParam,
		pHandler );

	elements_.push_back( element );
	return elements_.back();
}

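lengthStyle indicates whether the size of the RPC's arguments is fixed, and the meaning of lengthParam depends on the value of lengthStyle:

  1. If lengthStyle is FIXED_LENGTH_MESSAGE = 0 (a fixed-length message), lengthParam is the fixed size of the message in bytes.
  2. If lengthStyle is VARIABLE_LENGTH_MESSAGE = 1 (a variable-length message), lengthParam is the number of bytes reserved in the message header for storing the actual message length. BW_STREAM_MSG_EX sets lengthParam to 2 by default, meaning the header uses 2 bytes for the message length.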
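To make the two meanings of lengthParam concrete, here is a minimal framing sketch. It is not the Mercury wire format: the one-byte id, the little-endian length field, and the names DemoElement, DemoLengthStyle, and frameMessage are assumptions chosen only for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <vector>

enum DemoLengthStyle { DEMO_FIXED_LENGTH = 0, DEMO_VARIABLE_LENGTH = 1 };

struct DemoElement
{
	std::uint8_t id;
	DemoLengthStyle lengthStyle;
	int lengthParam;
};

// Frames one message: [id][optional length field][payload].
inline std::vector< std::uint8_t > frameMessage( const DemoElement & ie,
	const std::vector< std::uint8_t > & payload )
{
	std::vector< std::uint8_t > out;
	out.push_back( ie.id );

	if (ie.lengthStyle == DEMO_FIXED_LENGTH)
	{
		// Fixed length: lengthParam IS the payload size, nothing is written.
		if (payload.size() != static_cast< std::size_t >( ie.lengthParam ))
		{
			throw std::invalid_argument( "payload size != lengthParam" );
		}
	}
	else
	{
		// Variable length: lengthParam bytes hold the payload size
		// (little-endian here, an arbitrary choice for this sketch).
		std::size_t remaining = payload.size();
		for (int i = 0; i < ie.lengthParam; ++i)
		{
			out.push_back( static_cast< std::uint8_t >( remaining & 0xff ) );
			remaining >>= 8;
		}
		if (remaining != 0)
		{
			throw std::length_error( "payload too large for length field" );
		}
	}

	out.insert( out.end(), payload.begin(), payload.end() );
	return out;
}
```

With lengthParam set to 2, a 300-byte payload produces a 303-byte frame whose length field holds 0x2c 0x01, while a fixed-length element with lengthParam 4 adds only the id byte in front of its 4-byte payload.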

For an RPC with struct parameters, the expansion is more involved:

typedef StructMessageHandler< CellApp, CellAppInterface::startupArgs > CellApp_startup_Handler;
CellApp_startup_Handler gHandler_startup(&CellApp::startup);

const Mercury::InterfaceElement & startup =
    gMinder.add( "startup", Mercury::FIXED_LENGTH_MESSAGE,
        sizeof(struct startupArgs), &gHandler_startup );

Mercury::Bundle & operator<<( Mercury::Bundle & b, const struct startupArgs &s ) 
{ 
	b.startMessage( startup ); (*(BinaryOStream*)( &b )) << s; 
	return b; 
} 
struct __Garbage__startupArgs 
{ 
	static startupArgs & start( Mercury::Bundle & b, Mercury::ReliableType reliable = Mercury::RELIABLE_DRIVER ) 
	{ 
		return *(startupArgs*)b.startStructMessage( startup, reliable ); 
	} 
	static startupArgs & startRequest( Mercury::Bundle & b, Mercury::ReplyMessageHandler * handler, void * arg = 0, int timeout = Mercury::DEFAULT_REQUEST_TIMEOUT, Mercury::ReliableType reliable = Mercury::RELIABLE_DRIVER ) 
	{ 
		return *(startupArgs*)b.startStructRequest( startup, handler, arg, timeout, reliable ); 
	} 
	static const Mercury::InterfaceElement & interfaceElement() { return startup; }
	Mercury::Address baseAppAddr;
};

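It is not really that complicated, though. struct __Garbage__startupArgs exists only to consume the line Mercury::Address baseAppAddr; and nothing outside ever references it. The meaningful parts of the expansion are just the registration at the top and the operator<< that follows it. The registration adds a fixed-length RPC to gMinder, with lengthParam set to the size of the argument struct startupArgs. The serialization function operator<< then writes startupArgs into the Bundle as raw binary.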

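Notably, the argument serialization here just converts the struct to raw bytes and appends them. This only works when every member of the struct is a POD type; it does not apply when the struct contains dynamic containers such as String or Vector. If an RPC has such non-POD arguments, BW_BEGIN_STRUCT_MSG cannot be used and BW_STREAM_MSG has to be used instead.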
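The POD restriction can be expressed as a compile-time check. The following sketch is an illustration under assumptions, not BigWorld's BinaryOStream: appendRaw, readRaw, roundTrip, and the two demo structs are hypothetical, and std::is_trivially_copyable is used as the modern approximation of "safe to copy byte by byte".

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <type_traits>
#include <vector>

#pragma pack( push, 1 )
struct DemoPodArgs
{
	std::uint32_t spaceID;
	std::uint8_t flag;
};
#pragma pack( pop )

// Owns heap memory, so copying its bytes would only copy a pointer.
struct DemoNonPodArgs
{
	std::string name;
};

// Appends the raw bytes of args; refuses to compile for unsafe types.
template < class ARGS >
void appendRaw( std::vector< std::uint8_t > & out, const ARGS & args )
{
	static_assert( std::is_trivially_copyable< ARGS >::value,
		"raw byte copy needs a trivially copyable struct" );
	const std::uint8_t * p = reinterpret_cast< const std::uint8_t * >( &args );
	out.insert( out.end(), p, p + sizeof( ARGS ) );
}

// Rebuilds a struct from the first sizeof( ARGS ) bytes of in.
template < class ARGS >
ARGS readRaw( const std::vector< std::uint8_t > & in )
{
	static_assert( std::is_trivially_copyable< ARGS >::value,
		"raw byte copy needs a trivially copyable struct" );
	assert( in.size() >= sizeof( ARGS ) );
	ARGS args;
	std::memcpy( &args, in.data(), sizeof( ARGS ) );
	return args;
}

static_assert( !std::is_trivially_copyable< DemoNonPodArgs >::value,
	"a struct holding std::string must go through a stream instead" );

// Serialises and deserialises one struct through a byte buffer.
inline DemoPodArgs roundTrip( const DemoPodArgs & in )
{
	std::vector< std::uint8_t > buffer;
	appendRaw( buffer, in );
	return readRaw< DemoPodArgs >( buffer );
}
```

Calling appendRaw with a DemoNonPodArgs fails at compile time, which is the same constraint the struct message macros impose implicitly.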

Sending, Receiving, and Routing RPCs

To issue an RPC, the business layer first gets a message bundle for the destination and then calls startRequest to fill in the metadata of the RPC. Here is a sample call to CellAppInterface::addCell:

Mercury::Bundle & bundle = cellApp.bundle();
bundle.startRequest( CellAppInterface::addCell,
		new AddCellReplyHandler( cellApp.addr(), id_ ) );
this->addToStream( bundle );

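Once the bundle has been set up, the arguments need to be filled in. Because addCell is declared as a variable-length RPC, there is no addCellArgs struct to help with this. BigWorld fills in such variable-length RPC arguments in an iostream-like style: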

bundle << isFirstCell_;
isFirstCell_ = false;

bundle << isFromDB_;
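As a rough picture of this iostream-like style, the sketch below writes a few values with operator<< and reads them back with operator>> in the same order. DemoOStream, DemoIStream, demoRoundTrip, and the one-byte string length prefix are hypothetical; the real BinaryOStream and BinaryIStream encodings are not reproduced here.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

class DemoOStream
{
public:
	DemoOStream & operator<<( bool v )
	{
		data_.push_back( v ? 1 : 0 );
		return *this;
	}

	// Strings are written as a one-byte length followed by the characters.
	DemoOStream & operator<<( const std::string & s )
	{
		if (s.size() > 255) throw std::length_error( "string too long" );
		data_.push_back( static_cast< std::uint8_t >( s.size() ) );
		data_.insert( data_.end(), s.begin(), s.end() );
		return *this;
	}

	// Without this, a string literal would silently pick the bool overload.
	DemoOStream & operator<<( const char * ) = delete;

	const std::vector< std::uint8_t > & data() const { return data_; }

private:
	std::vector< std::uint8_t > data_;
};

class DemoIStream
{
public:
	explicit DemoIStream( const std::vector< std::uint8_t > & data ) :
		data_( data ), pos_( 0 ) {}

	DemoIStream & operator>>( bool & v )
	{
		v = (this->next() != 0);
		return *this;
	}

	DemoIStream & operator>>( std::string & s )
	{
		const std::size_t n = this->next();
		s.clear();
		for (std::size_t i = 0; i < n; ++i)
		{
			s.push_back( static_cast< char >( this->next() ) );
		}
		return *this;
	}

private:
	std::uint8_t next()
	{
		if (pos_ >= data_.size()) throw std::out_of_range( "stream underflow" );
		return data_[ pos_++ ];
	}

	std::vector< std::uint8_t > data_;
	std::size_t pos_;
};

// The sender and the handler must agree on the order of the fields.
inline std::string demoRoundTrip()
{
	DemoOStream out;
	out << true << std::string( "space-1" ) << false;

	DemoIStream in( out.data() );
	bool isFirstCell = false;
	bool isFromDB = true;
	std::string name;
	in >> isFirstCell >> name >> isFromDB;

	return name + (isFirstCell ? ":first" : ":later") +
		(isFromDB ? ":db" : ":fresh");
}
```

Nothing in the stream records field names or types, so the layout is a convention shared between the code that writes the bundle and the handler that reads it.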

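For an RPC with fixed parameters, filling in the arguments is much simpler. Taking startup from earlier as an example, it is enough to fill in a startupArgs, add it to the bundle, and send it: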

void CellApps::startAll( const Mercury::Address & baseAppAddr ) const
{
	CellAppInterface::startupArgs args;
	args.baseAppAddr = baseAppAddr;

	Map::const_iterator iter = map_.begin();

	while (iter != map_.end())
	{
		CellApp * pCellApp = iter->second;

		Mercury::Bundle & bundle = pCellApp->bundle();
		bundle << args;
		pCellApp->send();

		++iter;
	}
}

When startupArgs is added to the bundle, its operator<< automatically writes the metadata of the corresponding RPC:

Mercury::Bundle & operator<<( Mercury::Bundle & b, const struct startupArgs &s ) 
{ 
	b.startMessage( startup ); (*(BinaryOStream*)( &b )) << s; 
	return b; 
} 

Once all the arguments are in place, the Channel's send can be called to send the whole bundle over the network.

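When a process receives an RPC bundle, it has to find the right handler for each RPC in it; this process is RPC routing. Routing is also driven by the macros above, chiefly the registerWithInterface function generated by expanding BEGIN_MERCURY_INTERFACE. The snippet below is the beginning of the expanded CellAppInterface: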

namespace CellAppInterface 
{ 
	Mercury::InterfaceMinder gMinder( "CellAppInterface" ); 
	void registerWithInterface( Mercury::NetworkInterface & networkInterface ) 
	{ 
		gMinder.registerWithInterface( networkInterface ); 
	} 

Then, in the init function that runs after CellApp starts, CellAppInterface::registerWithInterface is called:

/**
 *	This method is used to initialise the application.
 */
bool CellApp::init( int argc, char * argv[] )
{
	// (much code omitted)
	// find the cell app manager.
	if (!cellAppMgr_.init( "CellAppMgrInterface", Config::numStartupRetries(),
			Config::maxMgrRegisterStagger() ))
	{
		NETWORK_DEBUG_MSG( "CellApp::init: Failed to find the CellAppMgr.\n" );
		return false;
	}

	// Register the fixed portion of our interface with the interface
	CellAppInterface::registerWithInterface( interface_ );
	// (much code omitted)
}

gMinder.registerWithInterface adds every RPC registered so far to the RPC routing table of the NetworkInterface:

/**
 * 	This method registers all the minded interface elements with an interface.
 *
 * 	@param networkInterface	The network interface to register with.
 */
void InterfaceMinder::registerWithInterface(
		NetworkInterface & networkInterface )
{
	for (uint i=0; i < elements_.size(); ++i)
	{
		const InterfaceElement & element = elements_[i];
		networkInterface.interfaceTable().serve( element, element.pHandler() );
	}
}

/**
 *  This method registers an interface element as the handler for the given
 *  message ID on this interface.
 */
void InterfaceTable::serve( const InterfaceElement & ie,
	InputMessageHandler * pHandler )
{
	InterfaceElement & element = table_[ ie.id() ];
	element	= ie;
	element.pHandler( pHandler );
}
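The effect of serve can be pictured as filling slots in an array indexed by message id. The sketch below is a simplified model, not the real InterfaceTable: the 256-slot size, std::function handlers, and the names DemoElement, DemoInterfaceTable, and demoRouting are assumptions made for illustration.

```cpp
#include <array>
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>

struct DemoElement
{
	std::string name;
	std::uint8_t id;
	std::function< void() > handler;
};

class DemoInterfaceTable
{
public:
	// Copies the element into the slot selected by its id.
	void serve( const DemoElement & ie ) { table_[ ie.id ] = ie; }

	// Looks up the slot and calls its handler; false if the slot is empty.
	bool dispatch( std::uint8_t id ) const
	{
		const DemoElement & ie = table_[ id ];
		if (!ie.handler)
		{
			return false;
		}
		ie.handler();
		return true;
	}

private:
	// One slot for every value a one-byte id can take (assumed size).
	std::array< DemoElement, 256 > table_{};
};

inline int demoRouting()
{
	int calls = 0;
	DemoInterfaceTable table;
	table.serve( DemoElement{ "addCell", 0, [&calls]() { calls += 1; } } );
	table.serve( DemoElement{ "startup", 1, [&calls]() { calls += 10; } } );

	table.dispatch( 1 );
	table.dispatch( 0 );
	const bool handled = table.dispatch( 200 ); // nothing served on id 200
	return handled ? -1 : calls;
}
```

Because the id doubles as the array index, a lookup is a single indexed access with no searching, and an unserved id shows up as an empty handler.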


element.pHandler here is the fourth argument that was passed to gMinder.add:



/**
 * 	This method adds an interface element (Mercury method) to the interface minder.
 *  @param name             Name of the interface element.
 * 	@param lengthStyle		Specifies whether the message is fixed or variable.
 *	@param lengthParam		This depends on lengthStyle.
 *	@param pHandler			The message handler for this interface.
 */
InterfaceElement & InterfaceMinder::add( const char * name,
	int8 lengthStyle, int lengthParam, InputMessageHandler * pHandler )
{
	const MessageID id = static_cast<MessageID>(elements_.size());

	// Set up the new bucket and add it to the list
	InterfaceElement element( name, id, lengthStyle, lengthParam,
		pHandler );

	elements_.push_back( element );
	return elements_.back();
}

Looking back at the registration code generated for startup, we can see that it supplies CellApp::startup as the handler for this RPC:

 typedef StructMessageHandler< CellApp, CellAppInterface::startupArgs > CellApp_startup_Handler;

CellApp_startup_Handler gHandler_startup(&CellApp::startup);

const Mercury::InterfaceElement & startup =
    gMinder.add( "startup", Mercury::FIXED_LENGTH_MESSAGE,
        sizeof(struct startupArgs), &gHandler_startup );

CellApp does declare this function, with a matching parameter list:

void startup( const CellAppInterface::startupArgs & args );

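From the code above, InterfaceTable::table_ must be the structure used for message routing, since it is where the mapping from each message id to its handler is stored. To see how InterfaceTable is used once a message has been received, a global text search for InterfaceElementWithStats points to UDPBundleProcessor::dispatchMessages as the place where the relevant logic lives: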

/**
 *	This method is responsible for dispatching the messages on this bundle to
 *	the appropriate handlers.
 *
 *	@param interfaceTable 	The interface table.
 *	@param addr 			The source address of the bundle.
 *	@param pChannel 		The channel.
 *	@param networkInterface The network interface.
 *	@param pStatsHelper 	The socket receive statistics.
 *
 *	@return 				REASON_SUCCESS on success, otherwise an appropriate
 *							Mercury::Reason describing the error.
 */
Reason UDPBundleProcessor::dispatchMessages( InterfaceTable & interfaceTable,
		const Address & addr, UDPChannel * pChannel,
		NetworkInterface & networkInterface, 
		ProcessSocketStatsHelper * pStatsHelper ) const
{
#	define SOURCE_STR (pChannel ? pChannel->c_str() : addr.c_str())
	bool breakLoop = pChannel ? pChannel->isDead() : false;
	Reason ret = REASON_SUCCESS;

	// NOTE: The channel may be destroyed while processing the messages so we
	// need to hold a local reference to keep pChannel valid. 
	ChannelPtr pChannelHolder = pChannel;
	MessageFilterPtr pMessageFilter =
		pChannel ? pChannel->pMessageFilter() : NULL;

	// now we simply iterate over the messages in that bundle
	iterator iter	= this->begin();
	iterator end	= this->end();

	interfaceTable.onBundleStarted( pChannel );

	while (iter != end && !breakLoop)
	{
		// find out what this message looks like
		InterfaceElementWithStats & ie = interfaceTable[ iter.msgID() ];
		if (ie.pHandler() == NULL)
		{
			// If there aren't any interfaces served on this nub
			// then don't print the warning (slightly dodgy I know)
			ERROR_MSG( "UDPBundleProcessor::dispatchMessages( %s ): "
					"Discarding bundle after hitting unhandled message ID "
					"%u\n",
				SOURCE_STR, iter.msgID() );

			// Note: Early returns are OK because the bundle will
			// release the packets it owns for us!
			ret = REASON_NONEXISTENT_ENTRY;
			break;
		}

		ie.pHandler()->processingEarlyMessageNow( isEarly_ );
		// (code omitted here)
		ie.startProfile();

		{
			PROFILER_SCOPED_DYNAMIC_STRING( ie.c_str() );
			if (!pMessageFilter)
			{
				// and call the handler
				ie.pHandler()->handleMessage( addr, header, mis );
			}
			else
			{
				// or pass to our channel's message filter if it has one
				pMessageFilter->filterMessage( addr, header, mis, ie.pHandler() );
			}
		}
	}
	// (remaining code omitted)
}

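UDPBundleProcessor::dispatchMessages routes the consecutive messages carried by a received bundle, which is why it uses a while loop to walk through them one at a time. For each message it first reads the msgID, which is the interface-unique id assigned in InterfaceMinder::add, that is, the index into the array. With the msgID, the matching element ie of table_ can be fetched from interfaceTable: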

class InterfaceTable : public TimerHandler
{
public:
	InterfaceElementWithStats & operator[]( int id )				{ return table_[ id ]; }
	const InterfaceElementWithStats & operator[]( int id ) const	{ return table_[ id ]; }
};

With ie in hand, ie.pHandler() returns the message handler that was registered:

class InterfaceElement
{
public:
	InputMessageHandler * pHandler() const { return pHandler_; }
};
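The overall shape of this dispatch loop can be sketched as follows. This is a simplified model under stated assumptions: the [msgID][length][payload] layout with one-byte fields, the DemoReason values, and the names dispatchDemoMessages and demoDispatch are hypothetical, and channels, filters, and profiling are left out.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

enum DemoReason { DEMO_SUCCESS, DEMO_NONEXISTENT_ENTRY, DEMO_CORRUPTED };

typedef std::function< void( const std::uint8_t * data, std::size_t length ) >
	DemoHandler;

// Assumed wire format: [msgID:1][length:1][payload:length], repeated.
inline DemoReason dispatchDemoMessages( const std::vector< DemoHandler > & table,
	const std::vector< std::uint8_t > & bundle )
{
	std::size_t pos = 0;
	while (pos < bundle.size())
	{
		if (bundle.size() - pos < 2) return DEMO_CORRUPTED;
		const std::uint8_t msgID = bundle[ pos ];
		const std::size_t length = bundle[ pos + 1 ];
		pos += 2;
		if (bundle.size() - pos < length) return DEMO_CORRUPTED;

		// The id is the index into the table; an empty slot means that the
		// rest of the bundle is discarded, as in the original loop.
		if (msgID >= table.size() || !table[ msgID ])
		{
			return DEMO_NONEXISTENT_ENTRY;
		}

		table[ msgID ]( bundle.data() + pos, length );
		pos += length;
	}
	return DEMO_SUCCESS;
}

inline std::string demoDispatch()
{
	std::string log;
	std::vector< DemoHandler > table( 2 );
	table[ 0 ] = [&log]( const std::uint8_t *, std::size_t n )
		{ log += "addCell(" + std::to_string( n ) + ")"; };
	table[ 1 ] = [&log]( const std::uint8_t *, std::size_t n )
		{ log += "startup(" + std::to_string( n ) + ")"; };

	// Three messages: startup with 2 bytes, addCell with 0, unknown id 9.
	const std::vector< std::uint8_t > bundle = { 1, 2, 0xaa, 0xbb, 0, 0, 9, 0 };
	const DemoReason reason = dispatchDemoMessages( table, bundle );
	return log + (reason == DEMO_NONEXISTENT_ENTRY ? "!unhandled" : "");
}
```

In this model the first two messages reach their handlers in order, and the third stops the loop because nothing is registered under its id.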

ie.pHandler()->handleMessage then deserializes the argument struct ARGS_TYPE from data via operator>> and invokes the final callback (pObject->*handler_)( args ), whose only parameter is the argument struct ARGS_TYPE:

 typedef StructMessageHandler< CellApp, CellAppInterface::startupArgs > CellApp_startup_Handler;

 /**
 *	Objects of this type are used to handle structured messages.
 */
template <class OBJECT_TYPE, class ARGS_TYPE,
		 class FIND_POLICY = MessageHandlerFinder< OBJECT_TYPE > >
class StructMessageHandler : public Mercury::InputMessageHandler
{
	public:
		/**
		 *	This type is the function pointer type that handles the incoming
		 *	message.
		 */
		typedef void (OBJECT_TYPE::*Handler)( const ARGS_TYPE & args );

		/**
		 *	Constructor.
		 */
		StructMessageHandler( Handler handler ) : handler_( handler ) {}

		// Override
		void handleMessage( const Mercury::Address & srcAddr,
				Mercury::UnpackedMessageHeader & header,
				BinaryIStream & data ) /* override */
		{
			OBJECT_TYPE * pObject = FIND_POLICY::find( srcAddr, header, data );

			if (pObject != NULL)
			{
				ARGS_TYPE args;
				data >> args;

				(pObject->*handler_)( args );
			}
			else
			{
				ERROR_MSG( "StructMessageHandler::handleMessage(%s): "
					"%s (id %d). Could not find object\n",
					srcAddr.c_str(), header.msgName(), header.identifier );

				data.finish();
			}
		}

		Handler handler_;
};

StructMessageHandler above handles RPCs whose argument type is a fixed-size POD struct. For the other kind of RPC, with stream-based arguments, the message handler is StreamMessageHandlerEx:

 StreamMessageHandlerEx< CellApp > gHandler_addCell(&CellApp::addCell); 
 const Mercury::InterfaceElement & addCell = gMinder.add( "addCell", Mercury::VARIABLE_LENGTH_MESSAGE, 2, &gHandler_addCell );

StreamMessageHandlerEx dispatches an RPC differently from StructMessageHandler: it passes the three incoming parameters srcAddr, header, and data through unchanged:

/**
 *	Objects of this type are used to handle variable length messages. This
 *	version supplies the source address and header.
 */
template <class OBJECT_TYPE,
		 class FIND_POLICY = MessageHandlerFinder< OBJECT_TYPE > >
class StreamMessageHandlerEx : public Mercury::InputMessageHandler
{
	public:
		/**
		 *	This type is the function pointer type that handles the incoming
		 *	message.
		 */
		typedef void (OBJECT_TYPE::*Handler)(
			const Mercury::Address & addr,
			const Mercury::UnpackedMessageHeader & header,
			BinaryIStream & stream );

		/**
		 *	Constructor.
		 */
		StreamMessageHandlerEx( Handler handler ) : handler_( handler ) {}

		// Override
		virtual void handleMessage( const Mercury::Address & srcAddr,
				Mercury::UnpackedMessageHeader & header,
				BinaryIStream & data )
		{
			OBJECT_TYPE * pObject = FIND_POLICY::find( srcAddr, header, data );

			if (pObject != NULL)
			{
				(pObject->*handler_)( srcAddr, header, data );
			}
			else
			{
				// OK we give up then
				ERROR_MSG( "StreamMessageHandlerEx::handleMessage: "
						"Do not have object for message from %s\n",
					srcAddr.c_str() );
			}
		}

		Handler handler_;
};

So the handler for the addCell RPC on CellApp is declared as follows, taking the BinaryIStream:

void addCell( const Mercury::Address & srcAddr,
		const Mercury::UnpackedMessageHeader & header,
		BinaryIStream & data );
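The difference between the two handler kinds can be condensed into one small sketch. It is a simplified model, not the BigWorld templates: the source address, header, and FIND_POLICY lookup are dropped (each handler just holds a reference to its object), and DemoIStream, DemoInputHandler, DemoStructHandler, DemoStreamHandler, DemoApp, and demoHandlers are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <type_traits>
#include <vector>

// Hypothetical minimal input stream over a byte buffer.
class DemoIStream
{
public:
	explicit DemoIStream( const std::vector< std::uint8_t > & data ) :
		data_( data ), pos_( 0 ) {}

	template < class T >
	DemoIStream & operator>>( T & value )
	{
		static_assert( std::is_trivially_copyable< T >::value, "POD only" );
		if (this->remaining() < sizeof( T ))
		{
			throw std::out_of_range( "stream underflow" );
		}
		std::memcpy( &value, data_.data() + pos_, sizeof( T ) );
		pos_ += sizeof( T );
		return *this;
	}

	std::size_t remaining() const { return data_.size() - pos_; }

private:
	std::vector< std::uint8_t > data_;
	std::size_t pos_;
};

class DemoInputHandler
{
public:
	virtual ~DemoInputHandler() {}
	virtual void handleMessage( DemoIStream & data ) = 0;
};

// Struct style: decode ARGS first, then call the member function with it.
template < class OBJECT, class ARGS >
class DemoStructHandler : public DemoInputHandler
{
public:
	typedef void (OBJECT::*Handler)( const ARGS & args );
	DemoStructHandler( OBJECT & object, Handler handler ) :
		object_( object ), handler_( handler ) {}

	void handleMessage( DemoIStream & data ) override
	{
		ARGS args;
		data >> args;
		(object_.*handler_)( args );
	}

private:
	OBJECT & object_;
	Handler handler_;
};

// Stream style: pass the stream through and let the member function parse it.
template < class OBJECT >
class DemoStreamHandler : public DemoInputHandler
{
public:
	typedef void (OBJECT::*Handler)( DemoIStream & data );
	DemoStreamHandler( OBJECT & object, Handler handler ) :
		object_( object ), handler_( handler ) {}

	void handleMessage( DemoIStream & data ) override
	{
		(object_.*handler_)( data );
	}

private:
	OBJECT & object_;
	Handler handler_;
};

struct DemoStartupArgs { std::uint32_t value; };

struct DemoApp
{
	std::uint32_t total = 0;
	void startup( const DemoStartupArgs & args ) { total += args.value; }
	void addCell( DemoIStream & data )
	{
		std::uint8_t byte = 0;
		while (data.remaining() > 0) { data >> byte; total += byte; }
	}
};

inline std::uint32_t demoHandlers()
{
	DemoApp app;
	DemoStructHandler< DemoApp, DemoStartupArgs > onStartup( app, &DemoApp::startup );
	DemoStreamHandler< DemoApp > onAddCell( app, &DemoApp::addCell );
	DemoInputHandler * handlers[] = { &onStartup, &onAddCell };

	const DemoStartupArgs args = { 100 };
	std::vector< std::uint8_t > structBytes( sizeof( args ) );
	std::memcpy( structBytes.data(), &args, sizeof( args ) );
	DemoIStream structData( structBytes );
	handlers[ 0 ]->handleMessage( structData );

	DemoIStream streamData( std::vector< std::uint8_t >{ 1, 2, 3 } );
	handlers[ 1 ]->handleMessage( streamData );
	return app.total;
}
```

Both handlers sit behind the same virtual handleMessage, so the dispatch loop does not need to know which style a message uses; only the signature of the member function on the receiving object differs.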