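RPC Implementation in BigWorld
Interfaces and RPCs
In the BigWorld engine, all RPCs supported by a given App are declared in one place, usually a file named xxxApp_Interface.hpp. This header declares a namespace named XXXAppInterface and then declares the RPCs one by one inside it. Various macros assist with these declarations. For example, BEGIN_MERCURY_INTERFACE below opens the namespace declaration for the current interface: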
#pragma pack( push, 1 )
BEGIN_MERCURY_INTERFACE( CellAppInterface )
This macro ultimately expands to the following C++ code:
namespace CellAppInterface
{
extern Mercury::InterfaceMinder gMinder;
void registerWithInterface( Mercury::NetworkInterface & networkInterface );
Mercury::Reason registerWithMachined( Mercury::NetworkInterface & networkInterface, int id );
Mercury::Reason registerWithMachinedAs( const char * name, Mercury::NetworkInterface & networkInterface, int id );
};
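These three functions are mainly responsible for registering the interface that the current process can serve with the global routing.
The individual RPCs are then declared one after another. Here are the first few RPC declarations in CellAppInterface: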
// -------------------------------------------------------------------------
// CellApp messages
// -------------------------------------------------------------------------
BW_STREAM_MSG_EX( CellApp, addCell )
// SpaceID spaceID;
BW_BEGIN_STRUCT_MSG( CellApp, startup )
Mercury::Address baseAppAddr;
END_STRUCT_MESSAGE()
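BW_STREAM_MSG_EX declares an RPC without a fixed parameter structure. This does not mean that the RPC takes no parameters; rather, its parameters are treated as a variable-length byte stream, and parsing them is left to the handler logic. The macro expands to a single line that declares an extern const Mercury::InterfaceElement & variable named after the RPC: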
extern const Mercury::InterfaceElement & addCell;
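BW_BEGIN_STRUCT_MSG declares an RPC with fixed parameters. When the macros in the snippet above are expanded, they generate the same kind of extern const Mercury::InterfaceElement & variable as before, plus an xxxArgs struct that wraps all of the RPC's parameters: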
struct startupArgs;
extern const Mercury::InterfaceElement & startup;
Mercury::Bundle & operator<<( Mercury::Bundle & b, const struct startupArgs &s );
struct startupArgs
{
static startupArgs & start( Mercury::Bundle & b, Mercury::ReliableType reliable = Mercury::RELIABLE_DRIVER )
{
return *(startupArgs*)b.startStructMessage( startup, reliable );
}
static startupArgs & startRequest( Mercury::Bundle & b, Mercury::ReplyMessageHandler * handler, void * arg = 0, int timeout = Mercury::DEFAULT_REQUEST_TIMEOUT, Mercury::ReliableType reliable = Mercury::RELIABLE_DRIVER )
{
return *(startupArgs*)b.startStructRequest( startup, handler, arg, timeout, reliable );
}
static const Mercury::InterfaceElement & interfaceElement() { return startup; }
Mercury::Address baseAppAddr;
};
startupArgs::start begins a startup RPC in the Bundle that is passed in. After writing the basic metadata for startup, startStructMessage also reserves a buffer the size of startupArgs:
/**
* Make using simple messages easier - returns a pointer the
* size of the message (note: not all fixed length msgs will
* be simple structs, so startMessage doesn't do it
* automatically)
*/
INLINE void * Bundle::startStructMessage( const InterfaceElement & ie,
ReliableType reliable )
{
this->startMessage( ie, reliable );
return this->reserve( ie.lengthParam() );
}
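The start function casts the start address of this buffer to a startupArgs pointer and returns a reference to the struct. With that reference, the caller can assign to the members of startupArgs, and those assignments are what fill in the RPC's parameters.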
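The reserve-then-fill pattern can be illustrated with a minimal sketch. DemoBundle, DemoStartupArgs, and startDemoStartup are hypothetical stand-ins, not BigWorld types, and the one-byte message id header is an assumption made for brevity. The sketch uses placement new instead of a raw cast so that it stays well-defined C++.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <new>
#include <vector>

// Hypothetical fixed-size argument struct, packed like the interface header.
#pragma pack(push, 1)
struct DemoStartupArgs {
    std::uint32_t ip;
    std::uint16_t port;
};
#pragma pack(pop)

// Hypothetical stand-in for a bundle: a growable byte buffer.
class DemoBundle {
public:
    // Writes the message metadata (assumed here to be a one-byte id).
    void startMessage(std::uint8_t id) { buffer_.push_back(id); }

    // Appends n zeroed bytes and returns a pointer to the first of them.
    void* reserve(std::size_t n) {
        const std::size_t offset = buffer_.size();
        buffer_.resize(offset + n);
        return buffer_.data() + offset;
    }

    // Metadata first, then room for the argument struct.
    void* startStructMessage(std::uint8_t id, std::size_t length) {
        startMessage(id);
        return reserve(length);
    }

    const std::vector<unsigned char>& bytes() const { return buffer_; }

private:
    std::vector<unsigned char> buffer_;
};

// Mirrors the role of startupArgs::start: the caller fills the returned
// struct in place. In this sketch the reference is only valid until the
// bundle grows again.
inline DemoStartupArgs& startDemoStartup(DemoBundle& bundle) {
    void* p = bundle.startStructMessage(1, sizeof(DemoStartupArgs));
    return *new (p) DemoStartupArgs();
}
```

A caller would write `DemoStartupArgs& args = startDemoStartup(bundle);` and then assign `args.ip` and `args.port`; the assignments land directly in the bundle's buffer, with no separate copy step.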
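startRequest does something similar, but it corresponds to an RPC request at the logic layer rather than a plain fill of RPC data, so it also has to take the reply handler, timeout, reliability, and other parameters into account: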
/**
* Make using simple requests easier - returns a pointer the
* size of the request message.
*/
INLINE void * Bundle::startStructRequest( const InterfaceElement & ie,
ReplyMessageHandler * handler, void * arg,
int timeout, ReliableType reliable)
{
this->startRequest( ie, handler, arg, timeout, reliable );
return this->reserve( ie.lengthParam() );
}
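Once all RPCs have been declared in xxxApp_Interface.hpp, a matching xxxApp_Interface.cpp file provides the definitions of the declared variables and functions. Here is the entire content of cellapp_interface.cpp: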
// This file should be linked against by those wishing to use our interface
#include "cellapp/cellapp_interface.hpp"
#include "network/network_interface.hpp"
#define DEFINE_INTERFACE_HERE
#include "cellapp/cellapp_interface.hpp"
// cellapp_interface.cpp
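The header cellapp_interface.hpp is included twice. The first inclusion brings in all the declarations; the second, with DEFINE_INTERFACE_HERE defined, generates their definitions, because the helper macros switch their expansion depending on whether DEFINE_INTERFACE_HERE is defined. For example, once DEFINE_INTERFACE_HERE is defined, the opening BEGIN_MERCURY_INTERFACE( CellAppInterface ) expands to the following implementation code, which defines gMinder, a global manager for RPC metadata: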
namespace CellAppInterface
{
Mercury::InterfaceMinder gMinder( "CellAppInterface" );
void registerWithInterface( Mercury::NetworkInterface & networkInterface )
{
gMinder.registerWithInterface( networkInterface );
}
Mercury::Reason registerWithMachined( Mercury::NetworkInterface & networkInterface, int id )
{
return gMinder.registerWithMachined( networkInterface.address(), id );
}
Mercury::Reason registerWithMachinedAs( const char * name, Mercury::NetworkInterface & networkInterface, int id )
{
return gMinder.registerWithMachinedAs( name, networkInterface.address(), id );
}
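The declare-then-define idea can be sketched in a single file. Note that this sketch swaps in an X-macro list that is expanded twice, rather than a header that is included twice; the effect (one list of messages, two different expansions) is the same. All names here (DEMO_INTERFACE_MESSAGES, demoRegistry, demoAdd, DemoInterface) are hypothetical.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for the body of an interface header: the message
// list is written once and expanded twice below.
#define DEMO_INTERFACE_MESSAGES(MSG) \
    MSG(addCell)                     \
    MSG(startup)

// Registry that plays the role of gMinder in this sketch.
inline std::vector<std::string>& demoRegistry() {
    static std::vector<std::string> names;
    return names;
}

// Registers a message name and returns its id (its index in the registry).
inline int demoAdd(const char* name) {
    demoRegistry().push_back(name);
    return static_cast<int>(demoRegistry().size()) - 1;
}

namespace DemoInterface {

// Pass 1: declarations only, as seen by every user of the interface.
#define DEMO_DECLARE(name) extern const int name;
DEMO_INTERFACE_MESSAGES(DEMO_DECLARE)
#undef DEMO_DECLARE

// Pass 2: definitions, which register each message during static
// initialization. In the real layout this pass lives in one .cpp file.
#define DEMO_DEFINE(name) const int name = demoAdd(#name);
DEMO_INTERFACE_MESSAGES(DEMO_DEFINE)
#undef DEMO_DEFINE

} // namespace DemoInterface
```

Keeping the list in one place means a message cannot be declared without also being registered, which is the main benefit of the double expansion.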
In this definition pass, BW_STREAM_MSG_EX( CellApp, addCell ) expands to code that simply registers an RPC with gMinder:
StreamMessageHandlerEx< CellApp > gHandler_addCell(&CellApp::addCell);
const Mercury::InterfaceElement & addCell = gMinder.add( "addCell", Mercury::VARIABLE_LENGTH_MESSAGE, 2, &gHandler_addCell );
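The add function builds an InterfaceElement from its arguments and appends it to gMinder's internal elements_ array. Each registered RPC is given a unique id here, which is simply the size of elements_ at the moment of registration: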
/**
* This method adds an interface element (Mercury method) to the interface minder.
* @param name Name of the interface element.
* @param lengthStyle Specifies whether the message is fixed or variable.
* @param lengthParam This depends on lengthStyle.
* @param pHandler The message handler for this interface.
*/
InterfaceElement & InterfaceMinder::add( const char * name,
int8 lengthStyle, int lengthParam, InputMessageHandler * pHandler )
{
const MessageID id = static_cast<MessageID>(elements_.size());
// Set up the new bucket and add it to the list
InterfaceElement element( name, id, lengthStyle, lengthParam,
pHandler );
elements_.push_back( element );
return elements_.back();
}
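lengthStyle indicates whether the size of the RPC's parameters is fixed, and the meaning of lengthParam depends on the value of lengthStyle:
- If lengthStyle is FIXED_LENGTH_MESSAGE = 0 (a fixed-length message), lengthParam is the fixed size of the message in bytes.
- If lengthStyle is VARIABLE_LENGTH_MESSAGE = 1 (a variable-length message), lengthParam is the number of bytes reserved in the message header to store the actual message length. BW_STREAM_MSG_EX sets lengthParam to 2 by default, so the header uses 2 bytes for the message length.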
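The id assignment and the two meanings of lengthParam can be sketched together. DemoMinder, DemoElement, and demoPayloadLength are hypothetical names, and the little-endian byte order of the length field is an assumption of this sketch; the text above does not state how BigWorld encodes it.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

enum DemoLengthStyle : std::int8_t {
    DEMO_FIXED_LENGTH = 0,
    DEMO_VARIABLE_LENGTH = 1
};

struct DemoElement {
    std::string name;
    std::uint8_t id;
    DemoLengthStyle lengthStyle;
    int lengthParam;
};

class DemoMinder {
public:
    // The id of a new element is the number of elements registered so far.
    // Returns a copy so the result stays valid when the vector grows.
    DemoElement add(const char* name, DemoLengthStyle style, int lengthParam) {
        const std::uint8_t id = static_cast<std::uint8_t>(elements_.size());
        elements_.push_back(DemoElement{name, id, style, lengthParam});
        return elements_.back();
    }

private:
    std::vector<DemoElement> elements_;
};

// Returns the payload size of a message. 'afterId' points at the bytes that
// follow the message id; it is only read for variable-length messages.
// Assumption: the length field is stored little-endian.
inline std::size_t demoPayloadLength(const DemoElement& e,
                                     const unsigned char* afterId) {
    if (e.lengthStyle == DEMO_FIXED_LENGTH) {
        return static_cast<std::size_t>(e.lengthParam);
    }
    std::size_t length = 0;
    for (int i = 0; i < e.lengthParam; ++i) {
        length |= static_cast<std::size_t>(afterId[i]) << (8 * i);
    }
    return length;
}
```

With a 2-byte length field, a single variable-length message in this sketch can describe at most 65535 payload bytes, which is the trade-off behind choosing lengthParam.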
For an RPC with struct parameters, the expansion is more involved:
typedef StructMessageHandler< CellApp, CellAppInterface::startupArgs > CellApp_startup_Handler;
CellApp_startup_Handler gHandler_startup(&CellApp::startup);
const Mercury::InterfaceElement & startup =
gMinder.add( "startup", Mercury::FIXED_LENGTH_MESSAGE,
sizeof(struct startupArgs), &gHandler_startup );
Mercury::Bundle & operator<<( Mercury::Bundle & b, const struct startupArgs &s )
{
b.startMessage( startup ); (*(BinaryOStream*)( &b )) << s;
return b;
}
struct __Garbage__startupArgs
{
static startupArgs & start( Mercury::Bundle & b, Mercury::ReliableType reliable = Mercury::RELIABLE_DRIVER )
{
return *(startupArgs*)b.startStructMessage( startup, reliable );
}
static startupArgs & startRequest( Mercury::Bundle & b, Mercury::ReplyMessageHandler * handler, void * arg = 0, int timeout = Mercury::DEFAULT_REQUEST_TIMEOUT, Mercury::ReliableType reliable = Mercury::RELIABLE_DRIVER )
{
return *(startupArgs*)b.startStructRequest( startup, handler, arg, timeout, reliable );
}
static const Mercury::InterfaceElement & interfaceElement() { return startup; }
Mercury::Address baseAppAddr;
};
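It is not as complicated as it looks, though. struct __Garbage__startupArgs exists only to consume the line Mercury::Address baseAppAddr;, and nothing outside ever refers to __Garbage__startupArgs. The meaningful parts of the expansion are just the registration at the top and the operator<< that follows it. The registration adds a fixed-length RPC to gMinder, with lengthParam set to the size of the parameter struct startupArgs. The serialization function operator<< then writes startupArgs into the Bundle as raw binary.
Note that this serialization mechanism simply converts the struct to binary and appends it. That only works when every member of the struct is a POD type; it does not work when the struct contains dynamic containers such as String or Vector. If an RPC's parameters include such non-POD types, BW_BEGIN_STRUCT_MSG cannot be used, and the RPC has to be declared with BW_STREAM_MSG instead.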
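The POD restriction can be made explicit with a compile-time check. The sketch below is not BigWorld code: demoWriteStruct, demoWriteString, and DemoAddr are hypothetical, and the 2-byte little-endian length prefix for strings is an assumption chosen only to show why dynamic data needs stream-style encoding.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <type_traits>
#include <vector>

using DemoBuffer = std::vector<unsigned char>;

// Raw-copy serialization: only valid for trivially copyable types, so the
// check rejects structs that contain std::string, std::vector, and so on.
template <class Args>
void demoWriteStruct(DemoBuffer& out, const Args& args) {
    static_assert(std::is_trivially_copyable<Args>::value,
                  "struct messages must be trivially copyable");
    const unsigned char* p = reinterpret_cast<const unsigned char*>(&args);
    out.insert(out.end(), p, p + sizeof(Args));
}

// Stream-style serialization for dynamic data: a length prefix followed by
// the characters (assumed format: 2-byte little-endian length).
inline void demoWriteString(DemoBuffer& out, const std::string& s) {
    assert(s.size() <= 0xFFFF);
    out.push_back(static_cast<unsigned char>(s.size() & 0xFF));
    out.push_back(static_cast<unsigned char>((s.size() >> 8) & 0xFF));
    out.insert(out.end(), s.begin(), s.end());
}

// Hypothetical fixed-size argument struct.
struct DemoAddr {
    std::uint32_t ip;
    std::uint16_t port;
    std::uint16_t salt;
};
```

Calling `demoWriteStruct` with a struct that has a `std::string` member fails to compile, whereas a raw copy without the check would silently send a pointer value that is meaningless on the receiving side.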
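Sending, Receiving, and Routing RPCs
To issue an RPC, the business layer first obtains the message bundle for the destination and then calls startRequest to fill in the metadata of the RPC. Here is a sample call of CellAppInterface::addCell: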
Mercury::Bundle & bundle = cellApp.bundle();
bundle.startRequest( CellAppInterface::addCell,
new AddCellReplyHandler( cellApp.addr(), id_ ) );
this->addToStream( bundle );
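With the bundle started, the parameters have to be filled in. Because addCell is declared as a variable-length RPC, there is no addCellArgs struct to help with this. BigWorld uses an iostream-like style to fill in the parameters of such variable-length RPCs: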
bundle << isFirstCell_;
isFirstCell_ = false;
bundle << isFromDB_;
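The iostream-like style boils down to chained operator<< calls on the sending side, matched by operator>> calls in the same order on the receiving side. The following sketch uses a hypothetical DemoStream that handles only arithmetic types and copies them in host byte order; it is not the BinaryOStream or BinaryIStream API.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <type_traits>
#include <vector>

// Hypothetical byte stream supporting both writing and reading.
class DemoStream {
public:
    // Appends the raw bytes of an arithmetic value.
    template <class T>
    DemoStream& operator<<(const T& value) {
        static_assert(std::is_arithmetic<T>::value,
                      "this sketch only streams arithmetic types");
        const unsigned char* p =
            reinterpret_cast<const unsigned char*>(&value);
        data_.insert(data_.end(), p, p + sizeof(T));
        return *this;
    }

    // Reads values back in exactly the order they were written.
    template <class T>
    DemoStream& operator>>(T& value) {
        static_assert(std::is_arithmetic<T>::value,
                      "this sketch only streams arithmetic types");
        assert(readPos_ + sizeof(T) <= data_.size());
        std::memcpy(&value, data_.data() + readPos_, sizeof(T));
        readPos_ += sizeof(T);
        return *this;
    }

    // Number of bytes that have been written but not yet read.
    std::size_t remaining() const { return data_.size() - readPos_; }

private:
    std::vector<unsigned char> data_;
    std::size_t readPos_ = 0;
};
```

Because nothing in the stream describes its own layout, the sender and the handler must agree on the order and types of the values; that agreement lives only in the code on both sides.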
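For an RPC with fixed parameters, filling in the parameters is much simpler. Taking the startup RPC introduced earlier as an example, it is enough to fill in a startupArgs, add it to the bundle, and send: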
void CellApps::startAll( const Mercury::Address & baseAppAddr ) const
{
CellAppInterface::startupArgs args;
args.baseAppAddr = baseAppAddr;
Map::const_iterator iter = map_.begin();
while (iter != map_.end())
{
CellApp * pCellApp = iter->second;
Mercury::Bundle & bundle = pCellApp->bundle();
bundle << args;
pCellApp->send();
++iter;
}
}
When startupArgs is added to the bundle, its operator<< automatically fills in the metadata of the corresponding RPC:
Mercury::Bundle & operator<<( Mercury::Bundle & b, const struct startupArgs &s )
{
b.startMessage( startup ); (*(BinaryOStream*)( &b )) << s;
return b;
}
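Once all parameters have been written, the Channel's send can be called to send the whole bundle over the network.
When a process receives an RPC bundle, it has to find the right handler function for each RPC in it. This process is RPC routing. Routing is also driven by the macros described earlier, chiefly the registerWithInterface function generated by BEGIN_MERCURY_INTERFACE. The following snippet is the beginning of the expanded CellAppInterface: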
namespace CellAppInterface
{
Mercury::InterfaceMinder gMinder( "CellAppInterface" );
void registerWithInterface( Mercury::NetworkInterface & networkInterface )
{
gMinder.registerWithInterface( networkInterface );
}
Then the init function that runs when CellApp starts calls this CellAppInterface::registerWithInterface function:
/**
* This method is used to initialise the application.
*/
bool CellApp::init( int argc, char * argv[] )
{
// (much code omitted)
// find the cell app manager.
if (!cellAppMgr_.init( "CellAppMgrInterface", Config::numStartupRetries(),
Config::maxMgrRegisterStagger() ))
{
NETWORK_DEBUG_MSG( "CellApp::init: Failed to find the CellAppMgr.\n" );
return false;
}
// Register the fixed portion of our interface with the interface
CellAppInterface::registerWithInterface( interface_ );
// (much code omitted)
}
gMinder.registerWithInterface adds every RPC registered so far to the RPC routing table of the NetworkInterface:
/**
* This method registers all the minded interface elements with an interface.
*
* @param networkInterface The network interface to register with.
*/
void InterfaceMinder::registerWithInterface(
NetworkInterface & networkInterface )
{
for (uint i=0; i < elements_.size(); ++i)
{
const InterfaceElement & element = elements_[i];
networkInterface.interfaceTable().serve( element, element.pHandler() );
}
}
/**
* This method registers an interface element as the handler for the given
* message ID on this interface.
*/
void InterfaceTable::serve( const InterfaceElement & ie,
InputMessageHandler * pHandler )
{
InterfaceElement & element = table_[ ie.id() ];
element = ie;
element.pHandler( pHandler );
}
The element.pHandler() used here is the fourth argument that was passed to gMinder.add:
/**
* This method adds an interface element (Mercury method) to the interface minder.
* @param name Name of the interface element.
* @param lengthStyle Specifies whether the message is fixed or variable.
* @param lengthParam This depends on lengthStyle.
* @param pHandler The message handler for this interface.
*/
InterfaceElement & InterfaceMinder::add( const char * name,
int8 lengthStyle, int lengthParam, InputMessageHandler * pHandler )
{
const MessageID id = static_cast<MessageID>(elements_.size());
// Set up the new bucket and add it to the list
InterfaceElement element( name, id, lengthStyle, lengthParam,
pHandler );
elements_.push_back( element );
return elements_.back();
}
Looking again at the registration code generated for startup, we can see that it supplies CellApp::startup as the handler for this RPC:
typedef StructMessageHandler< CellApp, CellAppInterface::startupArgs > CellApp_startup_Handler;
CellApp_startup_Handler gHandler_startup(&CellApp::startup);
const Mercury::InterfaceElement & startup =
gMinder.add( "startup", Mercury::FIXED_LENGTH_MESSAGE,
sizeof(struct startupArgs), &gHandler_startup );
The CellApp class does declare this function, with a matching parameter list:
void startup( const CellAppInterface::startupArgs & args );
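From the code above, InterfaceTable::table_ is clearly the structure used for message routing, since it is what stores the mapping from each message id to its handler. To see how InterfaceTable is used once a message is received, a global text search for InterfaceElementWithStats shows that the relevant logic is in UDPBundleProcessor::dispatchMessages: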
/**
* This method is responsible for dispatching the messages on this bundle to
* the appropriate handlers.
*
* @param interfaceTable The interface table.
* @param addr The source address of the bundle.
* @param pChannel The channel.
* @param networkInterface The network interface.
* @param pStatsHelper The socket receive statistics.
*
* @return REASON_SUCCESS on success, otherwise an appropriate
* Mercury::Reason describing the error.
*/
Reason UDPBundleProcessor::dispatchMessages( InterfaceTable & interfaceTable,
const Address & addr, UDPChannel * pChannel,
NetworkInterface & networkInterface,
ProcessSocketStatsHelper * pStatsHelper ) const
{
# define SOURCE_STR (pChannel ? pChannel->c_str() : addr.c_str())
bool breakLoop = pChannel ? pChannel->isDead() : false;
Reason ret = REASON_SUCCESS;
// NOTE: The channel may be destroyed while processing the messages so we
// need to hold a local reference to keep pChannel valid.
ChannelPtr pChannelHolder = pChannel;
MessageFilterPtr pMessageFilter =
pChannel ? pChannel->pMessageFilter() : NULL;
// now we simply iterate over the messages in that bundle
iterator iter = this->begin();
iterator end = this->end();
interfaceTable.onBundleStarted( pChannel );
while (iter != end && !breakLoop)
{
// find out what this message looks like
InterfaceElementWithStats & ie = interfaceTable[ iter.msgID() ];
if (ie.pHandler() == NULL)
{
// If there aren't any interfaces served on this nub
// then don't print the warning (slightly dodgy I know)
ERROR_MSG( "UDPBundleProcessor::dispatchMessages( %s ): "
"Discarding bundle after hitting unhandled message ID "
"%u\n",
SOURCE_STR, iter.msgID() );
// Note: Early returns are OK because the bundle will
// release the packets it owns for us!
ret = REASON_NONEXISTENT_ENTRY;
break;
}
ie.pHandler()->processingEarlyMessageNow( isEarly_ );
// (some code omitted)
ie.startProfile();
{
PROFILER_SCOPED_DYNAMIC_STRING( ie.c_str() );
if (!pMessageFilter)
{
// and call the handler
ie.pHandler()->handleMessage( addr, header, mis );
}
else
{
// or pass to our channel's message filter if it has one
pMessageFilter->filterMessage( addr, header, mis, ie.pHandler() );
}
}
}
// (subsequent code omitted)
}
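UDPBundleProcessor::dispatchMessages routes the consecutive messages contained in a received bundle, which is why it uses a while loop to walk through them one by one. For each message it first obtains the msgID, which is the id, unique within the interface, that was assigned in gMinder.add, that is, the index in the elements_ array. With the msgID, it looks up the corresponding element ie in the table_ of interfaceTable: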
class InterfaceTable : public TimerHandler
{
public:
InterfaceElementWithStats & operator[]( int id ) { return table_[ id ]; }
const InterfaceElementWithStats & operator[]( int id ) const { return table_[ id ]; }
};
With ie in hand, ie.pHandler() retrieves the message handler that was registered:
class InterfaceElement
{
public:
InputMessageHandler * pHandler() const { return pHandler_; }
};
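The registration and lookup steps covered so far amount to a table indexed by message id. The sketch below condenses them; DemoInterfaceTable, DemoMessage, and DemoHandler are hypothetical, the table size of 256 assumes a one-byte message id, and std::function is swapped in for the InputMessageHandler pointer to keep the example short.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

using DemoHandler = std::function<void(const std::vector<unsigned char>&)>;

// Hypothetical decoded message: an id plus its payload bytes.
struct DemoMessage {
    std::uint8_t id;
    std::vector<unsigned char> payload;
};

class DemoInterfaceTable {
public:
    // Installs the handler for one message id (the role of serve()).
    void serve(std::uint8_t id, DemoHandler handler) {
        table_[id] = std::move(handler);
    }

    // Dispatches the messages of a bundle in order. Stops at the first id
    // that has no handler and discards the rest, as dispatchMessages does.
    // Returns the number of messages that were handled.
    std::size_t dispatch(const std::vector<DemoMessage>& bundle) const {
        std::size_t handled = 0;
        for (const DemoMessage& msg : bundle) {
            const DemoHandler& handler = table_[msg.id];
            if (!handler) {
                break;
            }
            handler(msg.payload);
            ++handled;
        }
        return handled;
    }

private:
    std::array<DemoHandler, 256> table_;
};
```

An array indexed by id makes the lookup a constant-time operation with no hashing, which is why the ids are assigned as dense indices at registration time.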
ie.pHandler()->handleMessage then deserializes the parameter struct ARGS_TYPE from data with operator>>, and finally invokes the callback (pObject->*handler_)( args ), whose only parameter is the ARGS_TYPE struct:
typedef StructMessageHandler< CellApp, CellAppInterface::startupArgs > CellApp_startup_Handler;
/**
* Objects of this type are used to handle structured messages.
*/
template <class OBJECT_TYPE, class ARGS_TYPE,
class FIND_POLICY = MessageHandlerFinder< OBJECT_TYPE > >
class StructMessageHandler : public Mercury::InputMessageHandler
{
public:
/**
* This type is the function pointer type that handles the incoming
* message.
*/
typedef void (OBJECT_TYPE::*Handler)( const ARGS_TYPE & args );
/**
* Constructor.
*/
StructMessageHandler( Handler handler ) : handler_( handler ) {}
// Override
void handleMessage( const Mercury::Address & srcAddr,
Mercury::UnpackedMessageHeader & header,
BinaryIStream & data ) /* override */
{
OBJECT_TYPE * pObject = FIND_POLICY::find( srcAddr, header, data );
if (pObject != NULL)
{
ARGS_TYPE args;
data >> args;
(pObject->*handler_)( args );
}
else
{
ERROR_MSG( "StructMessageHandler::handleMessage(%s): "
"%s (id %d). Could not find object\n",
srcAddr.c_str(), header.msgName(), header.identifier );
data.finish();
}
}
Handler handler_;
};
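The core of this handler, a pointer to member function combined with a typed argument struct, can be reduced to a small sketch. DemoInputHandler, DemoStructHandler, DemoArgs, and DemoCellApp are hypothetical; unlike the original, the sketch binds the target object directly instead of locating it through a FIND_POLICY, and it uses memcpy in place of operator>>.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <type_traits>

// Type-erased base, so a table can hold handlers for any argument type.
struct DemoInputHandler {
    virtual ~DemoInputHandler() = default;
    virtual void handleMessage(const unsigned char* data,
                               std::size_t size) = 0;
};

template <class Object, class Args>
class DemoStructHandler : public DemoInputHandler {
public:
    using Handler = void (Object::*)(const Args&);

    DemoStructHandler(Object& object, Handler handler)
        : object_(object), handler_(handler) {}

    void handleMessage(const unsigned char* data, std::size_t size) override {
        static_assert(std::is_trivially_copyable<Args>::value,
                      "argument structs must be trivially copyable");
        if (size < sizeof(Args)) {
            return; // truncated message: drop it in this sketch
        }
        Args args;
        std::memcpy(&args, data, sizeof(Args)); // rebuild the typed struct
        (object_.*handler_)(args);              // call the bound member
    }

private:
    Object& object_;
    Handler handler_;
};

// Hypothetical argument struct and receiving application.
struct DemoArgs {
    std::uint32_t ip;
    std::uint16_t port;
};

struct DemoCellApp {
    std::uint16_t lastPort = 0;
    void startup(const DemoArgs& args) { lastPort = args.port; }
};
```

The template parameters carry the type information, so the dispatch loop only ever sees the DemoInputHandler base while each handler still calls a fully typed member function.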
StructMessageHandler above handles RPCs whose parameters are a fixed-size POD struct. For the other kind of RPC, whose parameters are a stream, the message handler is StreamMessageHandlerEx:
StreamMessageHandlerEx< CellApp > gHandler_addCell(&CellApp::addCell);
const Mercury::InterfaceElement & addCell = gMinder.add( "addCell", Mercury::VARIABLE_LENGTH_MESSAGE, 2, &gHandler_addCell );
StreamMessageHandlerEx dispatches an RPC differently from StructMessageHandler: it passes the three arguments srcAddr, header, and data through unchanged:
/**
* Objects of this type are used to handle variable length messages. This
* version supplies the source address and header.
*/
template <class OBJECT_TYPE,
class FIND_POLICY = MessageHandlerFinder< OBJECT_TYPE > >
class StreamMessageHandlerEx : public Mercury::InputMessageHandler
{
public:
/**
* This type is the function pointer type that handles the incoming
* message.
*/
typedef void (OBJECT_TYPE::*Handler)(
const Mercury::Address & addr,
const Mercury::UnpackedMessageHeader & header,
BinaryIStream & stream );
/**
* Constructor.
*/
StreamMessageHandlerEx( Handler handler ) : handler_( handler ) {}
// Override
virtual void handleMessage( const Mercury::Address & srcAddr,
Mercury::UnpackedMessageHeader & header,
BinaryIStream & data )
{
OBJECT_TYPE * pObject = FIND_POLICY::find( srcAddr, header, data );
if (pObject != NULL)
{
(pObject->*handler_)( srcAddr, header, data );
}
else
{
// OK we give up then
ERROR_MSG( "StreamMessageHandlerEx::handleMessage: "
"Do not have object for message from %s\n",
srcAddr.c_str() );
}
}
Handler handler_;
};
That is why the handler for the addCell RPC on CellApp is declared like this, taking the BinaryIStream:
void addCell( const Mercury::Address & srcAddr,
const Mercury::UnpackedMessageHeader & header,
BinaryIStream & data );
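For comparison with the struct case, a stream-style handler leaves all parsing to the target function. In the sketch below, DemoReader, DemoStreamHandler, and DemoApp are hypothetical, an int source id stands in for the address and header, and the two booleans read by addCell are an assumed payload for illustration only; the real contents of the addCell stream are not shown in the text above.

```cpp
#include <cassert>
#include <cstddef>

// Minimal read cursor over received bytes.
class DemoReader {
public:
    DemoReader(const unsigned char* data, std::size_t size)
        : data_(data), size_(size) {}

    // Reads one byte and interprets any non-zero value as true.
    bool readBool() {
        assert(pos_ < size_);
        return data_[pos_++] != 0;
    }

    // Number of bytes that have not been read yet.
    std::size_t remaining() const { return size_ - pos_; }

private:
    const unsigned char* data_;
    std::size_t size_;
    std::size_t pos_ = 0;
};

template <class Object>
class DemoStreamHandler {
public:
    using Handler = void (Object::*)(int srcId, DemoReader& data);

    DemoStreamHandler(Object& object, Handler handler)
        : object_(object), handler_(handler) {}

    // Forwards everything untouched; no argument struct is built here.
    void handleMessage(int srcId, DemoReader& data) {
        (object_.*handler_)(srcId, data);
    }

private:
    Object& object_;
    Handler handler_;
};

struct DemoApp {
    bool isFirstCell = false;
    bool isFromDB = true;

    // The handler parses the stream itself, in the order the sender wrote.
    void addCell(int /*srcId*/, DemoReader& data) {
        isFirstCell = data.readBool();
        isFromDB = data.readBool();
    }
};
```

The flexibility comes at a cost: the handler is responsible for consuming exactly the bytes the sender wrote, so a mismatch in order or type is only detected at run time, if at all.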